Skip to content

Commit

Permalink
Merge branch 'kaytu-io:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
ADorigi committed Jul 2, 2024
2 parents 718d942 + c7da9df commit 1d6895d
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 81 deletions.
5 changes: 1 addition & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@ require (
github.com/fatih/color v1.17.0
github.com/go-git/go-git/v5 v5.12.0
github.com/golang-jwt/jwt/v4 v4.5.0
github.com/golang/protobuf v1.5.4
github.com/google/go-github v17.0.0+incompatible
github.com/google/go-github/v62 v62.0.0
github.com/google/uuid v1.6.0
github.com/hashicorp/hcl/v2 v2.20.1
github.com/jedib0t/go-pretty/v6 v6.5.9
github.com/muesli/reflow v0.3.0
Expand All @@ -25,7 +23,7 @@ require (
golang.org/x/oauth2 v0.20.0
google.golang.org/grpc v1.64.0
google.golang.org/protobuf v1.34.1
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.1
)

require (
Expand Down Expand Up @@ -81,7 +79,6 @@ require (
golang.org/x/tools v0.21.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/spf13/cobra => github.com/spf13/cobra v1.4.0
Expand Down
5 changes: 0 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@ github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOW
github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
Expand All @@ -78,8 +76,6 @@ github.com/google/go-github/v62 v62.0.0 h1:/6mGCaRywZz9MuHyw9gD1CwsbmBX8GWsbFkwM
github.com/google/go-github/v62 v62.0.0/go.mod h1:EMxeUqGJq2xRu9DYBMwel/mr7kZrzUOfQmmpYrZn2a4=
github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD/fhyJ8=
github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17icRSOU623lUBU=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/hcl/v2 v2.20.1 h1:M6hgdyz7HYt1UN9e61j+qKJBqR3orTWbI1HKBJEdxtc=
github.com/hashicorp/hcl/v2 v2.20.1/go.mod h1:TZDqQ4kNKCbh1iJp99FdPiUaVDDUPivbqxZulxDYqL4=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
Expand Down Expand Up @@ -247,7 +243,6 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EV
gopkg.in/warnings.v0 v0.1.2 h1:wFXVbFY8DY5/xOe1ECiWdKCzZlxgshcYVNkBHstARME=
gopkg.in/warnings.v0 v0.1.2/go.mod h1:jksf8JmL6Qr/oQM2OXTHunEvvTAsrWBLb6OOjuVWRNI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
108 changes: 53 additions & 55 deletions pkg/plugin/proto/src/golang/plugin.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 3 additions & 7 deletions pkg/plugin/proto/src/golang/plugin_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 20 additions & 8 deletions pkg/plugin/sdk/job_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/kaytu-io/kaytu/pkg/plugin/proto/src/golang"
"github.com/kaytu-io/kaytu/pkg/utils"
"log"
"runtime/debug"
"sync/atomic"
Expand All @@ -29,15 +30,15 @@ type JobQueue struct {
pendingCounter atomic.Uint32
finishedCounter atomic.Uint32
onFinish func(ctx context.Context)
retryCount map[string]int
retryCount utils.ConcurrentMap[string, int]
}

func NewJobQueue(maxConcurrent int, stream *StreamController) *JobQueue {
return &JobQueue{
queue: make(chan Job, 10000),
maxConcurrent: maxConcurrent,
stream: stream,
retryCount: map[string]int{},
retryCount: utils.NewConcurrentMap[string, int](),

pendingCounter: atomic.Uint32{},
finishedCounter: atomic.Uint32{},
Expand Down Expand Up @@ -133,12 +134,23 @@ func (q *JobQueue) handleJob(ctx context.Context, job Job) {
log.Printf("Running job %s", props.ID)
if err := q.runJob(ctx, job); err != nil {
jobResult.FailureMessage = err.Error()
if q.retryCount[props.ID] < props.MaxRetry {
q.retryCount[props.ID]++

log.Printf("Failed job %s: %s, retrying[%d/%d]", props.ID, err.Error(), q.retryCount[props.ID], props.MaxRetry)
q.Push(job)
return
if v, ok := q.retryCount.Get(props.ID); v < props.MaxRetry {
if !ok {
v2, loaded := q.retryCount.LoadOrStore(props.ID, 0)
if loaded {
v = v2
}
}
for !q.retryCount.CompareAndSwap(props.ID, v, v+1) {
v, _ = q.retryCount.Get(props.ID)
}
if v+1 < props.MaxRetry {
log.Printf("Failed job %s: %s, retrying[%d/%d]", props.ID, err.Error(), v+1, props.MaxRetry)
q.Push(job)
return
} else {
log.Printf("Failed job %s: %s", props.ID, err.Error())
}
} else {
log.Printf("Failed job %s: %s", props.ID, err.Error())
}
Expand Down
57 changes: 57 additions & 0 deletions pkg/utils/concurrent_map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package utils

import (
"sync"
)

type ConcurrentMap[K comparable, V any] struct {
data sync.Map
}

func NewConcurrentMap[K comparable, V any]() ConcurrentMap[K, V] {
return ConcurrentMap[K, V]{
data: sync.Map{},
}
}

func (cm *ConcurrentMap[K, V]) Set(key K, value V) {
cm.data.Store(key, value)
}

func (cm *ConcurrentMap[K, V]) Delete(key K) {
cm.data.Delete(key)
}

func (cm *ConcurrentMap[K, V]) Get(key K) (V, bool) {
v, ok := cm.data.Load(key)
if !ok {
return *new(V), false
}
return v.(V), true
}

func (cm *ConcurrentMap[K, V]) Range(f func(key K, value V) bool) {
cm.data.Range(func(key, value any) bool {
return f(key.(K), value.(V))
})
}

func (cm *ConcurrentMap[K, V]) CompareAndSwap(key K, old, new V) bool {
return cm.data.CompareAndSwap(key, old, new)
}

func (cm *ConcurrentMap[K, V]) CompareAndDelete(key K, value V) bool {
return cm.data.CompareAndDelete(key, value)
}

func (cm *ConcurrentMap[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) {
v, loaded := cm.data.LoadOrStore(key, value)
actual = v.(V)
return actual, loaded
}

func (cm *ConcurrentMap[K, V]) LoadAndDelete(key K) (value V, loaded bool) {
v, loaded := cm.data.LoadAndDelete(key)
value = v.(V)
return value, loaded
}
Loading

0 comments on commit 1d6895d

Please sign in to comment.