Skip to content

Commit

Permalink
Merge pull request kaytu-io#260 from kaytu-io/feat-adds-summary-table
Browse files Browse the repository at this point in the history
fix: Adds retry for jobs
  • Loading branch information
salehkhazaei committed Jun 26, 2024
2 parents 79d5e8f + ce98a79 commit 2235dc4
Showing 1 changed file with 40 additions and 12 deletions.
52 changes: 40 additions & 12 deletions pkg/plugin/sdk/job_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,14 @@ import (
"time"
)

type JobProperties struct {
ID string
Description string
MaxRetry int
}

type Job interface {
Id() string
Description() string
Properties() JobProperties
Run(ctx context.Context) error
}

Expand All @@ -24,28 +29,31 @@ type JobQueue struct {
pendingCounter atomic.Uint32
finishedCounter atomic.Uint32
onFinish func(ctx context.Context)
retryCount map[string]int
}

func NewJobQueue(maxConcurrent int, stream *StreamController) *JobQueue {
return &JobQueue{
queue: make(chan Job, 10000),
maxConcurrent: maxConcurrent,
stream: stream,
retryCount: map[string]int{},

pendingCounter: atomic.Uint32{},
finishedCounter: atomic.Uint32{},
}
}

func (q *JobQueue) Push(job Job) {
log.Printf("Pushing job %s to queue", job.Id())
props := job.Properties()
log.Printf("Pushing job %s to queue", props.ID)
q.pendingCounter.Add(1)

q.stream.Send(&golang.PluginMessage{
PluginMessage: &golang.PluginMessage_Job{
Job: &golang.JobResult{
Id: job.Id(),
Description: job.Description(),
Id: props.ID,
Description: props.Description,
FailureMessage: "",
Done: false,
},
Expand Down Expand Up @@ -92,30 +100,50 @@ func (q *JobQueue) SetOnFinish(f func(ctx context.Context)) {
q.onFinish = f
}

func (q *JobQueue) runJob(ctx context.Context, job Job) (err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("job paniced: %v, stack: %v", r, string(debug.Stack()))
}
}()

return job.Run(ctx)
}

func (q *JobQueue) handleJob(ctx context.Context, job Job) {
props := job.Properties()

defer func() {
if r := recover(); r != nil {
log.Printf("Job queue handle job panic: %v, stack: %v", r, string(debug.Stack()))
q.stream.Send(&golang.PluginMessage{
PluginMessage: &golang.PluginMessage_Err{
Err: &golang.Error{Error: fmt.Sprintf("job %s paniced: %v", job.Id(), r)},
Err: &golang.Error{Error: fmt.Sprintf("job %s paniced: %v", props.ID, r)},
},
})
}
}()
defer q.finishedCounter.Add(1)

jobResult := &golang.JobResult{
Id: job.Id(),
Description: job.Description(),
Id: props.ID,
Description: props.Description,
Done: true,
}
log.Printf("Running job %s", job.Id())
if err := job.Run(ctx); err != nil {
log.Printf("Running job %s", props.ID)
if err := q.runJob(ctx, job); err != nil {
jobResult.FailureMessage = err.Error()
log.Printf("Failed job %s: %s", job.Id(), err.Error())
if q.retryCount[props.ID] < props.MaxRetry {
q.retryCount[props.ID]++

log.Printf("Failed job %s: %s, retrying[%d/%d]", props.ID, err.Error(), q.retryCount[props.ID], props.MaxRetry)
q.handleJob(ctx, job)
return
} else {
log.Printf("Failed job %s: %s", props.ID, err.Error())
}
} else {
log.Printf("Finished job %s", job.Id())
log.Printf("Finished job %s", props.ID)
}

q.stream.Send(&golang.PluginMessage{
Expand Down

0 comments on commit 2235dc4

Please sign in to comment.