diff --git a/core/scheduler/event.go b/core/scheduler/event.go index af0908b603..3464a8fa62 100644 --- a/core/scheduler/event.go +++ b/core/scheduler/event.go @@ -50,6 +50,11 @@ const ( SensorFailEvent JobEventType = "sensor_fail" SensorSuccessEvent JobEventType = "sensor_success" + OperatorStartEvent JobEventType = "operator_start" + OperatorRetryEvent JobEventType = "operator_retry" + OperatorFailEvent JobEventType = "operator_fail" + OperatorSuccessEvent JobEventType = "operator_success" + StatusFiring EventStatus = "firing" StatusResolved EventStatus = "resolved" ) @@ -101,6 +106,39 @@ func (s *SLAObject) String() string { return fmt.Sprintf("(job: %s,scheduledAt: %s)", s.JobName, s.JobScheduledAt.Format(time.RFC3339)) } +type OperatorRunInstance struct { + MaxTries int `json:"max_tries"` + OperatorName string `json:"task_id"` + StartTime time.Time `json:"start_date"` + OperatorKey string `json:"task_instance_key_str"` + TryNumber int `json:"attempt"` + EndTime *time.Time `json:"end_date,omitempty"` + LogURL string `json:"log_url"` +} + +type DagRun struct { + RunID string `json:"run_id"` + JobName string `json:"dag_id"` + ScheduledAt time.Time `json:"scheduled_at"` + ExecutionDate time.Time `json:"execution_date"` + StartTime time.Time `json:"start_date"` + EndTime *time.Time `json:"end_date,omitempty"` +} + +type OperatorObj struct { + DownstreamTaskIDs []string `json:"downstream_task_ids"` +} + +type EventContext struct { + Tenant tenant.Tenant + Type JobEventType `json:"event_type"` + OperatorType OperatorType `json:"operator_type"` + OperatorRunInstance OperatorRunInstance `json:"task_instance"` + DagRun DagRun `json:"dag_run"` + Task OperatorObj `json:"task"` + EventReason *string `json:"event_reason,omitempty"` +} + type Event struct { JobName JobName URN string @@ -112,6 +150,8 @@ type Event struct { JobScheduledAt time.Time Values map[string]any SLAObjectList []*SLAObject + + EventContext *EventContext } func (e Event) String() string { @@ -152,6 +192,23 @@ func (event JobEventType) String() string { return string(event) } +func EventContextFrom(rawEventContext any) (*EventContext, error) { + sc := EventContext{} + bytesArr, err := json.Marshal(rawEventContext) + if err != nil { + return nil, errors.InvalidArgument(EntityEvent, "unable to marshal event") + } + err = json.Unmarshal(bytesArr, &sc) + if err != nil { + return nil, errors.InvalidArgument(EntityEvent, "unable to unmarshal event, err:"+err.Error()) + } + sc.OperatorType, err = NewOperatorType(strings.ToLower(sc.OperatorType.String())) + if err != nil { + return nil, err + } + return &sc, nil +} + func EventFrom(eventTypeName string, eventValues map[string]any, jobName JobName, tenent tenant.Tenant) (*Event, error) { eventType, err := FromStringToEventType(eventTypeName) if err != nil { @@ -224,6 +281,13 @@ func EventFrom(eventTypeName string, eventValues map[string]any, jobName JobName return nil, errors.InvalidArgument(EntityEvent, "property 'scheduled_at' is not in appropriate format") } eventObj.JobScheduledAt = scheduledAtTimeStamp + + eventObj.EventContext, err = EventContextFrom(eventValues["event_context"]) + if err != nil { + return nil, err + } + eventObj.EventContext.Tenant = eventObj.Tenant } + return &eventObj, nil } diff --git a/core/scheduler/event_test.go b/core/scheduler/event_test.go index 8bbe8f7cdc..4ed2e6d0f6 100644 --- a/core/scheduler/event_test.go +++ b/core/scheduler/event_test.go @@ -190,12 +190,37 @@ func TestFromStringToEventType(t *testing.T) { "task_id": "some_txbq", "status": "running", "scheduled_at": "2022-01-02T15:04:05Z", + "event_context": map[string]any{ + "task_instance": map[string]any{ + "max_tries": 1, + "task_id": "mc2mc", + "task_instance_key_str": "campaign__mc2mc__20250926", + "attempt": 2, + "log_url": "http://localhost:8080/dags/campaign/grid?dag_run_id=scheduled__2025-09-26T03%3A40%3A00%2B00%3A00&task_id=mc2mc&map_index=-1&tab=logs", + "start_date": "2025-09-26T03:46:39Z", + }, + "dag_run": map[string]any{ + "dag_id": "campaign", + "scheduled_at": "2025-09-26T03:50:00Z", + "execution_date": "2025-09-26T03:40:00Z", + "run_id": "scheduled__2025-09-26T03:40:00+00:00", + "start_date": "2025-09-26T03:45:00Z", + }, + "task": map[string]any{"downstream_task_ids": []string{"hook_predator"}}, + "operator_type": "TASK", + "event_type": "operator_retry", + }, + "event_type": "TYPE_TASK_RETRY", } jobName := scheduler.JobName("some_job") tnnt, err := tenant.NewTenant("someProject", "someNamespace") eventTypeName := "TYPE_TASK_RETRY" assert.Nil(t, err) + startTime, _ := time.Parse(time.RFC3339, "2025-09-26T03:46:39Z") + jobScheduledAt, _ := time.Parse(time.RFC3339, "2025-09-26T03:50:00Z") + executionDate, _ := time.Parse(time.RFC3339, "2025-09-26T03:40:00Z") + jobstartDate, _ := time.Parse(time.RFC3339, "2025-09-26T03:45:00Z") outputObj := scheduler.Event{ JobName: jobName, Tenant: tnnt, @@ -205,6 +230,32 @@ func TestFromStringToEventType(t *testing.T) { Status: scheduler.StateRunning, JobScheduledAt: time.Date(2022, time.January, 2, 15, 0o4, 0o5, 0, time.UTC), Values: eventValues, + EventContext: &scheduler.EventContext{ + Type: scheduler.OperatorRetryEvent, + OperatorType: scheduler.OperatorTask, + OperatorRunInstance: scheduler.OperatorRunInstance{ + MaxTries: 1, + OperatorName: "mc2mc", + StartTime: startTime, + OperatorKey: "campaign__mc2mc__20250926", + TryNumber: 2, + EndTime: nil, + LogURL: "http://localhost:8080/dags/campaign/grid?dag_run_id=scheduled__2025-09-26T03%3A40%3A00%2B00%3A00&task_id=mc2mc&map_index=-1&tab=logs", + }, + DagRun: scheduler.DagRun{ + RunID: "scheduled__2025-09-26T03:40:00+00:00", + JobName: "campaign", + ScheduledAt: jobScheduledAt, + ExecutionDate: executionDate, + StartTime: jobstartDate, + EndTime: nil, + }, + Task: scheduler.OperatorObj{ + DownstreamTaskIDs: []string{"hook_predator"}, + }, + EventReason: nil, + Tenant: tnnt, + }, } output, err := scheduler.EventFrom(eventTypeName, eventValues, jobName, tnnt) assert.Nil(t, err) diff --git a/core/scheduler/handler/v1beta1/job_run_test.go b/core/scheduler/handler/v1beta1/job_run_test.go index bf28b8ae43..c7f5234efa 100644 --- a/core/scheduler/handler/v1beta1/job_run_test.go +++ b/core/scheduler/handler/v1beta1/job_run_test.go @@ -583,6 +583,10 @@ func TestJobRunHandler(t *testing.T) { "task_id": "wait_sample_select", "status": "success", "scheduled_at": "2022-01-02T15:04:05Z", + "event_context": map[string]any{ + "operator_type": "SENSOR", + "event_type": "operator_success", + }, }, ) req := &pb.RegisterJobEventRequest{ @@ -630,6 +634,10 @@ func TestJobRunHandler(t *testing.T) { "status": "success", "scheduled_at": "2022-01-02T15:04:05Z", "task_id": "wait_sample_select", + "event_context": map[string]any{ + "operator_type": "SENSOR", + "event_type": "operator_success", + }, }, ) req := &pb.RegisterJobEventRequest{ diff --git a/core/scheduler/job.go b/core/scheduler/job.go index 919b84e97d..5970a40da0 100644 --- a/core/scheduler/job.go +++ b/core/scheduler/job.go @@ -1,6 +1,8 @@ package scheduler import ( + "crypto/sha256" + "encoding/hex" "fmt" "strings" "time" @@ -24,6 +26,19 @@ func (o OperatorType) String() string { return string(o) } +func NewOperatorType(op string) (OperatorType, error) { + switch strings.ToLower(op) { + case OperatorTask.String(): + return OperatorTask, nil + case OperatorSensor.String(): + return OperatorSensor, nil + case OperatorHook.String(): + return OperatorHook, nil + default: + return OperatorType(op), errors.InvalidArgument(EntityEvent, "invalid operator type, supported : task, sensor, hook") + } +} + const ( EntityJobRun = "jobRun" @@ -76,6 +91,33 @@ func (j *Job) IsDryRun() bool { return false } +func (j *Job) GetTaskAlertConfig() *OperatorAlertConfig { + if j.Task == nil { + return nil + } + return j.Task.AlertConfig +} + +func (j *Job) GetHookAlertConfigByName(hookName string) *OperatorAlertConfig { + for _, hook := range j.Hooks { + if hook.Name == hookName { + return hook.AlertConfig + } + } + return nil +} + +func (j *Job) GetOperatorAlertConfigByName(operatorType OperatorType, operatorName string) *OperatorAlertConfig { + switch operatorType { + case OperatorTask: + return j.GetTaskAlertConfig() + case OperatorHook: + return j.GetHookAlertConfigByName(operatorName) + default: + return nil + } +} + func (j *Job) GetHook(hookName string) (*Hook, error) { for _, hook := range j.Hooks { if hook.Name == hookName { @@ -119,6 +161,12 @@ type SLAAlertConfig struct { Severity Severity `json:"severity,omitempty"` } +func (s SLAAlertConfig) Tag() string { + tagStr := fmt.Sprintf("%s:%s", s.Severity, s.DurationThreshold) + hash := sha256.Sum256([]byte(tagStr)) + return hex.EncodeToString(hash[:]) +} + type OperatorAlertConfig struct { SLAAlertConfigs []*SLAAlertConfig `json:"sla_alert_configs,omitempty"` Team string `json:"team,omitempty"` diff --git a/core/scheduler/service/deployment_service_test.go b/core/scheduler/service/deployment_service_test.go index 0c6c3a3d39..a678ce7101 100644 --- a/core/scheduler/service/deployment_service_test.go +++ b/core/scheduler/service/deployment_service_test.go @@ -84,7 +84,7 @@ func TestDeploymentService(t *testing.T) { defer jobRepo.AssertExpectations(t) runService := service.NewJobRunService(logger, - jobRepo, nil, nil, nil, nil, nil, nil, nil, nil, feats) + jobRepo, nil, nil, nil, nil, nil, nil, nil, nil, nil, feats) err := runService.UploadToScheduler(ctx, proj1Name) assert.NotNil(t, err) @@ -100,7 +100,7 @@ func TestDeploymentService(t *testing.T) { defer priorityResolver.AssertExpectations(t) runService := service.NewJobRunService(logger, - jobRepo, nil, nil, nil, nil, priorityResolver, nil, nil, nil, feats) + jobRepo, nil, nil, nil, nil, nil, priorityResolver, nil, nil, nil, feats) err := runService.UploadToScheduler(ctx, proj1Name) assert.NotNil(t, err) @@ -120,7 +120,7 @@ func TestDeploymentService(t *testing.T) { Return(fmt.Errorf("DeployJobs tnnt1 error")) defer mScheduler.AssertExpectations(t) - runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil, + runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil, nil, mScheduler, priorityResolver, nil, nil, nil, feats) err := runService.UploadToScheduler(ctx, proj1Name) @@ -148,7 +148,7 @@ func TestDeploymentService(t *testing.T) { mScheduler.On("DeleteJobs", mock.Anything, tnnt2, []string{"job4-to-delete"}).Return(nil) defer mScheduler.AssertExpectations(t) - runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil, + runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil, nil, mScheduler, priorityResolver, nil, nil, nil, feats) err := runService.UploadToScheduler(ctx, proj1Name) @@ -173,7 +173,7 @@ func TestDeploymentService(t *testing.T) { mScheduler.On("DeleteJobs", mock.Anything, tnnt2, []string{"job4-to-delete"}).Return(nil) defer mScheduler.AssertExpectations(t) - runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil, + runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil, nil, mScheduler, priorityResolver, nil, nil, nil, feats) err := runService.UploadToScheduler(ctx, proj1Name) @@ -197,7 +197,7 @@ func TestDeploymentService(t *testing.T) { mScheduler := new(mockScheduler) defer mScheduler.AssertExpectations(t) - runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil, + runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil, nil, mScheduler, priorityResolver, nil, nil, nil, feats) err := runService.UploadJobs(ctx, tnnt1, jobNamesToUpload, jobNamesToDelete) @@ -219,7 +219,7 @@ func TestDeploymentService(t *testing.T) { mScheduler := new(mockScheduler) defer mScheduler.AssertExpectations(t) - runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil, + runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil, nil, mScheduler, priorityResolver, nil, nil, nil, feats) err := runService.UploadJobs(ctx, tnnt1, jobNamesToUpload, jobNamesToDelete) @@ -242,7 +242,7 @@ func TestDeploymentService(t *testing.T) { mScheduler.On("DeployJobs", mock.Anything, tnnt1, jobsToUpload).Return(errors.New("internal error")) defer mScheduler.AssertExpectations(t) - runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil, + runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil, nil, mScheduler, priorityResolver, nil, nil, nil, feats) err := runService.UploadJobs(ctx, tnnt1, jobNamesToUpload, jobNamesToDelete) @@ -256,7 +256,7 @@ func TestDeploymentService(t *testing.T) { mScheduler.On("DeleteJobs", mock.Anything, tnnt1, jobNamesToDelete).Return(errors.New("internal error")) defer mScheduler.AssertExpectations(t) - runService := service.NewJobRunService(logger, nil, nil, nil, nil, + runService := service.NewJobRunService(logger, nil, nil, nil, nil, nil, mScheduler, nil, nil, nil, nil, feats) err := runService.UploadJobs(ctx, tnnt1, jobNamesToUpload, jobNamesToDelete) @@ -280,7 +280,7 @@ func TestDeploymentService(t *testing.T) { mScheduler.On("DeleteJobs", mock.Anything, tnnt1, jobNamesToDelete).Return(nil) defer mScheduler.AssertExpectations(t) - runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil, + runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil, nil, mScheduler, priorityResolver, nil, nil, nil, feats) err := runService.UploadJobs(ctx, tnnt1, jobNamesToUpload, jobNamesToDelete) @@ -303,7 +303,7 @@ func TestDeploymentService(t *testing.T) { mScheduler.On("DeployJobs", mock.Anything, tnnt1, jobsToUpload).Return(nil) defer mScheduler.AssertExpectations(t) - runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil, + runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil, nil, mScheduler, priorityResolver, nil, nil, nil, feats) err := runService.UploadJobs(ctx, tnnt1, jobNamesToUpload, jobNamesToDelete) @@ -317,7 +317,7 @@ func TestDeploymentService(t *testing.T) { mScheduler.On("DeleteJobs", mock.Anything, tnnt1, jobNamesToDelete).Return(nil) defer mScheduler.AssertExpectations(t) - runService := service.NewJobRunService(logger, nil, nil, nil, nil, + runService := service.NewJobRunService(logger, nil, nil, nil, nil, nil, mScheduler, nil, nil, nil, nil, feats) err := runService.UploadJobs(ctx, tnnt1, jobNamesToUpload, jobNamesToDelete) @@ -341,7 +341,7 @@ func TestDeploymentService(t *testing.T) { mScheduler.On("DeployJobs", mock.Anything, tnnt1, jobsToUpload).Return(nil) defer mScheduler.AssertExpectations(t) - runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil, + runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil, nil, mScheduler, priorityResolver, nil, nil, nil, feats) err := runService.UploadJobs(ctx, tnnt1, jobNamesToUpload, jobNamesToDelete) diff --git a/core/scheduler/service/job_run_service.go b/core/scheduler/service/job_run_service.go index 03896ea70a..0f763db4f1 100644 --- a/core/scheduler/service/job_run_service.go +++ b/core/scheduler/service/job_run_service.go @@ -41,7 +41,7 @@ const ( var jobRunEventsMetric = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "operator_stats", Help: "total job run events received", -}, []string{"operator_name", "event_type"}) +}, []string{"operator_name", "operator_type", "event_type"}) var jobRunDdurationsBreakdownSeconds = promauto.NewGaugeVec(prometheus.GaugeOpts{ Name: "jobrun_durations_breakdown_seconds", @@ -84,6 +84,12 @@ type OperatorRunRepository interface { UpdateOperatorRun(ctx context.Context, operator scheduler.OperatorType, jobRunID uuid.UUID, eventTime time.Time, state scheduler.State) error } +type SLARepository interface { + RegisterSLA(ctx context.Context, projectName tenant.ProjectName, jobName, operatorName, operatorType, runID string, slaTime time.Time, description string, scheduledAt, operatorStartTime time.Time) error + UpdateSLA(ctx context.Context, projectName tenant.ProjectName, jobName, operatorName, operatorType, runID string, slaTime, operatorStartTime time.Time) error + FinishSLA(ctx context.Context, projectName tenant.ProjectName, jobName, operatorName, operatorType, runID string, operatorEndTime time.Time) error +} + type JobInputCompiler interface { Compile(ctx context.Context, job *scheduler.JobWithDetails, config scheduler.RunConfig, executedAt time.Time) (*scheduler.ExecutorInput, error) } @@ -115,6 +121,7 @@ type JobRunService struct { l log.Logger repo JobRunRepository replayRepo JobReplayRepository + slaRepo SLARepository operatorRunRepo OperatorRunRepository eventHandler EventHandler scheduler Scheduler @@ -780,6 +787,59 @@ func (s *JobRunService) raiseJobRunStateChangeEvent(jobRun *scheduler.JobRun) { s.eventHandler.HandleEvent(schedulerEvent) } +func (s *JobRunService) registerConfiguredSLA(ctx context.Context, event *scheduler.Event) error { + job, err := s.jobRepo.GetJob(ctx, event.Tenant.ProjectName(), event.JobName) + if err != nil { + s.l.Error("error getting job by job name [%s]: %s", event.JobName, err) + return err + } + eventCtx := event.EventContext + + alertConfig := job.GetOperatorAlertConfigByName(eventCtx.OperatorType, eventCtx.OperatorRunInstance.OperatorName) + if alertConfig == nil { + return nil + } + return s.registerSLAs(ctx, eventCtx, alertConfig.SLAAlertConfigs) +} + +func (s *JobRunService) registerSLAs(ctx context.Context, eventCtx *scheduler.EventContext, slaAlertConfigs []*scheduler.SLAAlertConfig) error { + jobRunID := eventCtx.DagRun.RunID + operatorStartTime := eventCtx.OperatorRunInstance.StartTime + operatorType := eventCtx.OperatorType + operatorName := eventCtx.OperatorRunInstance.OperatorName + jobName := eventCtx.DagRun.JobName + + me := errors.NewMultiError("errorInRegisterSLA") + for _, slaAlertConfig := range slaAlertConfigs { + slaBoundary := operatorStartTime.Add(slaAlertConfig.DurationThreshold) + err := s.slaRepo.RegisterSLA(ctx, eventCtx.Tenant.ProjectName(), jobName, operatorName, operatorType.String(), + jobRunID, slaBoundary, slaAlertConfig.Tag(), eventCtx.DagRun.ScheduledAt, eventCtx.OperatorRunInstance.StartTime) + if err != nil { + errMsg := fmt.Sprintf("error registering sla for operator Run Id: %s, Type: %s, Sla: %s, err: %s", jobRunID, operatorType.String(), slaAlertConfig.Tag(), err.Error()) + me.Append(errors.Wrap(scheduler.EntityEvent, errMsg, err)) + s.l.Error(errMsg) + } + } + return me.ToErr() +} + +func (s *JobRunService) getExistingOperatorRun(ctx context.Context, event *scheduler.Event) (*scheduler.OperatorRun, error) { + jobRun, err := s.getJobRunByScheduledAt(ctx, event.Tenant, event.JobName, event.JobScheduledAt) + if err != nil { + s.l.Error("error getting job run by scheduled time [%s]: %s", event.JobScheduledAt, err) + return nil, err + } + existingOperatorRun, err := s.operatorRunRepo.GetOperatorRun(ctx, event.OperatorName, event.EventContext.OperatorType, jobRun.ID) + if err != nil { + if errors.IsErrorType(err, errors.ErrNotFound) { + return nil, nil //nolint:nilnil + } + s.l.Error("error checking existing operator run state [%s], operatorName [%s] : %s", jobRun.ID, event.OperatorName, err) + return nil, err + } + return existingOperatorRun, nil +} + func (s *JobRunService) createOperatorRun(ctx context.Context, event *scheduler.Event, operatorType scheduler.OperatorType) error { jobRun, err := s.getJobRunByScheduledAt(ctx, event.Tenant, event.JobName, event.JobScheduledAt) if err != nil { @@ -801,7 +861,13 @@ func (s *JobRunService) createOperatorRun(ctx context.Context, event *scheduler. s.raiseJobRunStateChangeEvent(jobRun) } - return s.operatorRunRepo.CreateOperatorRun(ctx, event.OperatorName, operatorType, jobRun.ID, event.EventTime) + err = s.operatorRunRepo.CreateOperatorRun(ctx, event.OperatorName, operatorType, jobRun.ID, event.EventTime) + if err != nil { + s.l.Error("error registering operator run job [%s], type [%s], operator [%s] : %s", event.JobName, operatorType, event.OperatorName, err) + return err + } + + return err } func (s *JobRunService) getOperatorRun(ctx context.Context, event *scheduler.Event, operatorType scheduler.OperatorType, jobRunID uuid.UUID) (*scheduler.OperatorRun, error) { @@ -829,6 +895,19 @@ func (s *JobRunService) getOperatorRun(ctx context.Context, event *scheduler.Eve return operatorRun, nil } +func (s *JobRunService) CleanupSLA(ctx context.Context, event *scheduler.Event) error { + if event.EventContext.Type == scheduler.OperatorSuccessEvent || event.EventContext.Type == scheduler.OperatorFailEvent { + operatorEndTime := *event.EventContext.OperatorRunInstance.EndTime + err := s.slaRepo.FinishSLA(ctx, event.Tenant.ProjectName(), event.JobName.String(), event.OperatorName, event.EventContext.OperatorType.String(), event.EventContext.DagRun.RunID, operatorEndTime) + if err != nil { + s.l.Error("error finishing sla update for operator run [%s:%s:%s]: %s", + event.JobName.String(), event.OperatorName, event.JobScheduledAt.Format(time.RFC3339), err) + return err + } + } + return nil +} + func (s *JobRunService) updateOperatorRun(ctx context.Context, event *scheduler.Event, operatorType scheduler.OperatorType) error { jobRun, err := s.getJobRunByScheduledAt(ctx, event.Tenant, event.JobName, event.JobScheduledAt) if err != nil { @@ -845,6 +924,7 @@ func (s *JobRunService) updateOperatorRun(ctx context.Context, event *scheduler. s.l.Error("error updating operator run id [%s]: %s", operatorRun.ID, err) return err } + jobRunDdurationsBreakdownSeconds.WithLabelValues( event.Tenant.ProjectName().String(), event.Tenant.NamespaceName().String(), @@ -867,15 +947,10 @@ func (s *JobRunService) trackEvent(event *scheduler.Event) { event.Type, event.EventTime.Format("01/02/06 15:04:05 MST"), event.JobName, event.OperatorName, event.JobScheduledAt.Format("01/02/06 15:04:05 MST"), event.Status) } - switch event.Type { - case scheduler.SensorSuccessEvent, scheduler.SensorRetryEvent, scheduler.SensorFailEvent: - jobRunEventsMetric.WithLabelValues( - scheduler.OperatorSensor.String(), - event.Type.String(), - ).Inc() - case scheduler.TaskSuccessEvent, scheduler.TaskRetryEvent, scheduler.TaskFailEvent, scheduler.HookSuccessEvent, scheduler.HookRetryEvent, scheduler.HookFailEvent: + if event.Type != scheduler.SLAMissEvent && event.Type != scheduler.JobSuccessEvent && event.Type != scheduler.JobFailureEvent { jobRunEventsMetric.WithLabelValues( event.OperatorName, + event.EventContext.OperatorType.String(), event.Type.String(), ).Inc() } @@ -889,31 +964,46 @@ func (s *JobRunService) UpdateJobState(ctx context.Context, event *scheduler.Eve return s.updateJobRunSLA(ctx, event) case scheduler.JobSuccessEvent, scheduler.JobFailureEvent: return s.updateJobRun(ctx, event) - case scheduler.TaskStartEvent: - return s.createOperatorRun(ctx, event, scheduler.OperatorTask) - case scheduler.TaskSuccessEvent, scheduler.TaskRetryEvent, scheduler.TaskFailEvent: - return s.updateOperatorRun(ctx, event, scheduler.OperatorTask) case scheduler.SensorStartEvent: return s.createOperatorRun(ctx, event, scheduler.OperatorSensor) case scheduler.SensorSuccessEvent, scheduler.SensorRetryEvent, scheduler.SensorFailEvent: return s.updateOperatorRun(ctx, event, scheduler.OperatorSensor) - case scheduler.HookStartEvent: - return s.createOperatorRun(ctx, event, scheduler.OperatorHook) - case scheduler.HookSuccessEvent, scheduler.HookRetryEvent, scheduler.HookFailEvent: - return s.updateOperatorRun(ctx, event, scheduler.OperatorHook) + case scheduler.TaskStartEvent, scheduler.HookStartEvent: + + existingOperatorRun, err := s.getExistingOperatorRun(ctx, event) + if err != nil { + return err + } + err = s.createOperatorRun(ctx, event, event.EventContext.OperatorType) + if err != nil { + return err + } + if existingOperatorRun != nil && existingOperatorRun.Status == scheduler.StateRetry { + return nil + } + return s.registerConfiguredSLA(ctx, event) + case scheduler.TaskSuccessEvent, scheduler.TaskFailEvent, scheduler.HookSuccessEvent, scheduler.HookFailEvent: + err := s.updateOperatorRun(ctx, event, event.EventContext.OperatorType) + if err != nil { + return err + } + return s.CleanupSLA(ctx, event) + case scheduler.TaskRetryEvent, scheduler.HookRetryEvent: + return s.updateOperatorRun(ctx, event, event.EventContext.OperatorType) default: return errors.InvalidArgument(scheduler.EntityEvent, "invalid event type: "+string(event.Type)) } } func NewJobRunService(logger log.Logger, jobRepo JobRepository, jobRunRepo JobRunRepository, replayRepo JobReplayRepository, - operatorRunRepo OperatorRunRepository, scheduler Scheduler, resolver PriorityResolver, compiler JobInputCompiler, eventHandler EventHandler, - projectGetter ProjectGetter, features config.FeaturesConfig, + operatorRunRepo OperatorRunRepository, slaRepo SLARepository, scheduler Scheduler, resolver PriorityResolver, + compiler JobInputCompiler, eventHandler EventHandler, projectGetter ProjectGetter, features config.FeaturesConfig, ) *JobRunService { return &JobRunService{ l: logger, repo: jobRunRepo, operatorRunRepo: operatorRunRepo, + slaRepo: slaRepo, scheduler: scheduler, eventHandler: eventHandler, replayRepo: replayRepo, diff --git a/core/scheduler/service/job_run_service_test.go b/core/scheduler/service/job_run_service_test.go index 76486959e5..a7a7dea0e7 100644 --- a/core/scheduler/service/job_run_service_test.go +++ b/core/scheduler/service/job_run_service_test.go @@ -69,13 +69,21 @@ func TestJobRunService(t *testing.T) { t.Run("UpdateJobState", func(t *testing.T) { t.Run("should reject unregistered events", func(t *testing.T) { runService := service.NewJobRunService(logger, - nil, nil, nil, nil, nil, nil, nil, nil, nil, feats) + nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, feats) event := &scheduler.Event{ JobName: jobName, Tenant: tnnt, Type: "UnregisteredEventTYpe", Values: map[string]any{}, + EventContext: &scheduler.EventContext{ + Type: "UnregisteredEventTYpe", + OperatorType: "UnregisteredOpType", + OperatorRunInstance: scheduler.OperatorRunInstance{}, + DagRun: scheduler.DagRun{}, + Task: scheduler.OperatorObj{}, + EventReason: nil, + }, } err := runService.UpdateJobState(ctx, event) assert.NotNil(t, err) @@ -92,7 +100,7 @@ func TestJobRunService(t *testing.T) { defer jobRepo.AssertExpectations(t) runService := service.NewJobRunService(logger, - jobRepo, jobRunRepository, nil, nil, nil, nil, nil, nil, nil, feats) + jobRepo, jobRunRepository, nil, nil, nil, nil, nil, nil, nil, nil, feats) event := &scheduler.Event{ JobName: jobName, @@ -102,6 +110,10 @@ func TestJobRunService(t *testing.T) { OperatorName: "taskBq2bq", JobScheduledAt: scheduledAtTimeStamp, Values: map[string]any{}, + EventContext: &scheduler.EventContext{ + Type: scheduler.OperatorStartEvent, + OperatorType: scheduler.OperatorTask, + }, } err := runService.UpdateJobState(ctx, event) assert.NotNil(t, err) @@ -137,7 +149,7 @@ func TestJobRunService(t *testing.T) { defer jobRunRepository.AssertExpectations(t) runService := service.NewJobRunService(logger, - jobRepo, jobRunRepository, nil, nil, nil, nil, nil, nil, nil, feats) + jobRepo, jobRunRepository, nil, nil, nil, nil, nil, nil, nil, nil, feats) event := &scheduler.Event{ JobName: jobName, @@ -145,6 +157,10 @@ func TestJobRunService(t *testing.T) { Type: scheduler.TaskStartEvent, JobScheduledAt: scheduledAtTimeStamp, Values: map[string]any{}, + EventContext: &scheduler.EventContext{ + Type: scheduler.OperatorStartEvent, + OperatorType: scheduler.OperatorTask, + }, } err := runService.UpdateJobState(ctx, event) @@ -189,6 +205,9 @@ func TestJobRunService(t *testing.T) { "status": "success", "monitoring": monitoring, }, + EventContext: &scheduler.EventContext{ + Type: scheduler.JobSuccessEvent, + }, } projectGetter := new(mockProjectGetter) @@ -222,7 +241,7 @@ func TestJobRunService(t *testing.T) { defer eventHandler.AssertExpectations(t) runService := service.NewJobRunService(logger, - jobRepo, jobRunRepo, nil, operatorRunRepo, nil, nil, nil, eventHandler, projectGetter, feats) + jobRepo, jobRunRepo, nil, operatorRunRepo, nil, nil, nil, nil, eventHandler, projectGetter, feats) err = runService.UpdateJobState(ctx, event) assert.Nil(t, err) @@ -265,7 +284,7 @@ func TestJobRunService(t *testing.T) { defer eventHandler.AssertExpectations(t) runService := service.NewJobRunService(logger, - nil, jobRunRepo, nil, nil, nil, nil, nil, eventHandler, nil, feats) + nil, jobRunRepo, nil, nil, nil, nil, nil, nil, eventHandler, nil, feats) err := runService.UpdateJobState(ctx, event) assert.Nil(t, err) @@ -326,7 +345,7 @@ func TestJobRunService(t *testing.T) { defer jobRunRepo.AssertExpectations(t) runService := service.NewJobRunService(logger, - nil, jobRunRepo, nil, nil, nil, nil, nil, nil, nil, feats) + nil, jobRunRepo, nil, nil, nil, nil, nil, nil, nil, nil, feats) err := runService.UpdateJobState(ctx, event) assert.NotNil(t, err) @@ -347,7 +366,7 @@ func TestJobRunService(t *testing.T) { projectGetter.On("Get", ctx, projName).Return(project, nil) runService := service.NewJobRunService(logger, - jobRepo, jobRunRepo, nil, nil, nil, nil, nil, nil, projectGetter, feats) + jobRepo, jobRunRepo, nil, nil, nil, nil, nil, nil, nil, projectGetter, feats) err := runService.UpdateJobState(ctx, event) assert.NotNil(t, err) @@ -368,7 +387,7 @@ func TestJobRunService(t *testing.T) { defer jobRepo.AssertExpectations(t) runService := service.NewJobRunService(logger, - jobRepo, jobRunRepo, nil, nil, nil, nil, nil, nil, projectGetter, feats) + jobRepo, jobRunRepo, nil, nil, nil, nil, nil, nil, nil, projectGetter, feats) err := runService.UpdateJobState(ctx, event) assert.NotNil(t, err) @@ -396,7 +415,7 @@ func TestJobRunService(t *testing.T) { defer jobRepo.AssertExpectations(t) runService := service.NewJobRunService(logger, - jobRepo, jobRunRepo, nil, nil, nil, nil, nil, eventHandler, projectGetter, feats) + jobRepo, jobRunRepo, nil, nil, nil, nil, nil, nil, eventHandler, projectGetter, feats) err := runService.UpdateJobState(ctx, event) assert.Nil(t, err) @@ -415,6 +434,10 @@ func TestJobRunService(t *testing.T) { JobScheduledAt: scheduledAtTimeStamp, EventTime: eventTime, Values: map[string]any{}, + EventContext: &scheduler.EventContext{ + Type: scheduler.OperatorStartEvent, + OperatorType: scheduler.OperatorTask, + }, } jobRunRepo := new(mockJobRunRepository) @@ -422,7 +445,7 @@ func TestJobRunService(t *testing.T) { defer jobRunRepo.AssertExpectations(t) runService := service.NewJobRunService(logger, - nil, jobRunRepo, nil, nil, nil, nil, nil, nil, nil, feats) + nil, jobRunRepo, nil, nil, nil, nil, nil, nil, nil, nil, feats) err := runService.UpdateJobState(ctx, event) assert.NotNil(t, err) @@ -438,6 +461,10 @@ func TestJobRunService(t *testing.T) { JobScheduledAt: scheduledAtTimeStamp, EventTime: eventTime, Values: map[string]any{}, + EventContext: &scheduler.EventContext{ + Type: scheduler.OperatorStartEvent, + OperatorType: scheduler.OperatorSensor, + }, } jobRunRepo := new(mockJobRunRepository) @@ -445,7 +472,7 @@ func TestJobRunService(t *testing.T) { defer jobRunRepo.AssertExpectations(t) runService := service.NewJobRunService(logger, - nil, jobRunRepo, nil, nil, nil, nil, nil, nil, nil, feats) + nil, jobRunRepo, nil, nil, nil, nil, nil, nil, nil, nil, feats) err := runService.UpdateJobState(ctx, event) assert.NotNil(t, err) @@ -461,6 +488,10 @@ func TestJobRunService(t *testing.T) { JobScheduledAt: scheduledAtTimeStamp, EventTime: eventTime, Values: map[string]any{}, + EventContext: &scheduler.EventContext{ + Type: scheduler.OperatorStartEvent, + OperatorType: scheduler.OperatorHook, + }, } jobRunRepo := new(mockJobRunRepository) @@ -468,7 +499,7 @@ func TestJobRunService(t *testing.T) { defer jobRunRepo.AssertExpectations(t) runService := service.NewJobRunService(logger, - nil, jobRunRepo, nil, nil, nil, nil, nil, nil, nil, feats) + nil, jobRunRepo, nil, nil, nil, nil, nil, nil, nil, nil, feats) err := runService.UpdateJobState(ctx, event) assert.NotNil(t, err) @@ -477,15 +508,20 @@ func TestJobRunService(t *testing.T) { t.Run("on TaskStartEvent", func(t *testing.T) { scheduledAtTimeStamp, _ := time.Parse(scheduler.ISODateFormat, "2022-01-02T15:04:05Z") eventTime := time.Unix(todayDate.Add(time.Hour).Unix(), 0) + operatorName := "task_bq3bq" event := &scheduler.Event{ JobName: jobName, Tenant: tnnt, Type: scheduler.TaskStartEvent, Status: scheduler.StateRunning, EventTime: eventTime, - OperatorName: "task_bq3bq", + OperatorName: operatorName, JobScheduledAt: scheduledAtTimeStamp, Values: map[string]any{}, + EventContext: &scheduler.EventContext{ + Type: scheduler.OperatorStartEvent, + OperatorType: scheduler.OperatorTask, + }, } jobRun := scheduler.JobRun{ @@ -503,14 +539,29 @@ func TestJobRunService(t *testing.T) { t.Run("should pass creating new operator run ", func(t *testing.T) { operatorRunRepository := new(mockOperatorRunRepository) operatorRunRepository.On("CreateOperatorRun", ctx, event.OperatorName, scheduler.OperatorTask, jobRun.ID, eventTime).Return(nil) + + operatorRunRepository.On("GetOperatorRun", ctx, event.OperatorName, scheduler.OperatorTask, jobRun.ID).Return(&scheduler.OperatorRun{}, nil) + defer operatorRunRepository.AssertExpectations(t) eventHandler := newEventHandler(t) eventHandler.On("HandleEvent", mock.Anything).Times(1) defer eventHandler.AssertExpectations(t) + job := scheduler.Job{ + Name: jobName, + Tenant: tnnt, + Task: &scheduler.Task{ + Config: map[string]string{}, + }, + } + + jobRepo := new(JobRepository) + jobRepo.On("GetJob", ctx, projName, jobName). + Return(&job, nil) + runService := service.NewJobRunService(logger, - nil, jobRunRepo, nil, operatorRunRepository, nil, nil, nil, eventHandler, nil, feats) + jobRepo, jobRunRepo, nil, operatorRunRepository, nil, nil, nil, nil, eventHandler, nil, feats) err := runService.UpdateJobState(ctx, event) assert.Nil(t, err) @@ -521,6 +572,7 @@ func TestJobRunService(t *testing.T) { t.Run("on TaskSuccessEvent should create task_run row", func(t *testing.T) { scheduledAtTimeStamp, _ := time.Parse(scheduler.ISODateFormat, "2022-01-02T15:04:05Z") eventTime := time.Unix(todayDate.Add(time.Hour).Unix(), 0) + dagRunID := "manual__2025-09-26T11:17:01+00:00" event := &scheduler.Event{ JobName: jobName, Tenant: tnnt, @@ -532,6 +584,18 @@ func TestJobRunService(t *testing.T) { Values: map[string]any{ "status": "success", }, + EventContext: &scheduler.EventContext{ + Type: scheduler.OperatorSuccessEvent, + OperatorType: scheduler.OperatorTask, + OperatorRunInstance: scheduler.OperatorRunInstance{ + OperatorName: "task_bq2bq", + EndTime: &eventTime, + }, + DagRun: scheduler.DagRun{ + RunID: dagRunID, + JobName: jobName.String(), + }, + }, } t.Run("scenario OperatorRun not found and new operator creation fails", func(t *testing.T) { @@ -556,7 +620,7 @@ func TestJobRunService(t *testing.T) { defer eventHandler.AssertExpectations(t) runService := service.NewJobRunService(logger, - nil, jobRunRepo, nil, operatorRunRepository, nil, nil, nil, eventHandler, nil, feats) + nil, jobRunRepo, nil, operatorRunRepository, nil, nil, nil, nil, eventHandler, nil, feats) err := runService.UpdateJobState(ctx, event) assert.NotNil(t, err) @@ -586,7 +650,7 @@ func TestJobRunService(t *testing.T) { defer eventHandler.AssertExpectations(t) runService := service.NewJobRunService(logger, - nil, jobRunRepo, nil, operatorRunRepository, nil, nil, nil, eventHandler, nil, feats) + nil, jobRunRepo, nil, operatorRunRepository, nil, nil, nil, nil, eventHandler, nil, feats) err := runService.UpdateJobState(ctx, event) assert.NotNil(t, err) @@ -616,8 +680,12 @@ func TestJobRunService(t *testing.T) { jobRunRepo.On("GetByScheduledAt", ctx, tnnt, jobName, scheduledAtTimeStamp).Return(&jobRun, nil) defer jobRunRepo.AssertExpectations(t) + slaRepo := new(mockSLARepository) + slaRepo.On("FinishSLA", ctx, tnnt.ProjectName(), jobName.String(), event.OperatorName, scheduler.OperatorTask.String(), dagRunID, event.EventTime).Return(nil) + defer slaRepo.AssertExpectations(t) + runService := service.NewJobRunService(logger, - nil, jobRunRepo, nil, operatorRunRepository, nil, nil, nil, nil, nil, feats) + nil, jobRunRepo, nil, operatorRunRepository, slaRepo, nil, nil, nil, nil, nil, feats) err := runService.UpdateJobState(ctx, event) assert.Nil(t, err) @@ -636,6 +704,10 @@ func TestJobRunService(t *testing.T) { Values: map[string]any{ "status": "success", }, + EventContext: &scheduler.EventContext{ + Type: scheduler.OperatorSuccessEvent, + OperatorType: scheduler.OperatorSensor, + }, } jobRunRepo := new(mockJobRunRepository) @@ -643,7 +715,7 @@ func TestJobRunService(t *testing.T) { defer jobRunRepo.AssertExpectations(t) runService := service.NewJobRunService(logger, - nil, jobRunRepo, nil, nil, nil, nil, nil, nil, nil, feats) + nil, jobRunRepo, nil, nil, nil, nil, nil, nil, nil, nil, feats) err := runService.UpdateJobState(ctx, event) assert.NotNil(t, err) @@ -662,6 +734,10 @@ func TestJobRunService(t *testing.T) { Values: map[string]any{ "status": "success", }, + EventContext: &scheduler.EventContext{ + Type: scheduler.OperatorSuccessEvent, + OperatorType: scheduler.OperatorHook, + }, } jobRun := scheduler.JobRun{ @@ -680,7 +756,7 @@ func TestJobRunService(t *testing.T) { // operatorRunRepository.On("UpdateOperatorRun", ctx, scheduler.OperatorSensor, operatorRun.ID, eventTime, "success").Return(nil) defer operatorRunRepository.AssertExpectations(t) runService := service.NewJobRunService(logger, - nil, jobRunRepo, nil, operatorRunRepository, nil, nil, nil, nil, nil, feats) + nil, jobRunRepo, nil, operatorRunRepository, nil, nil, nil, nil, nil, nil, feats) err := runService.UpdateJobState(ctx, event) assert.NotNil(t, err) @@ -747,7 +823,7 @@ func TestJobRunService(t *testing.T) { defer jobRunRepo.AssertExpectations(t) runService := service.NewJobRunService(logger, - nil, jobRunRepo, nil, nil, nil, nil, nil, nil, nil, feats) + nil, jobRunRepo, nil, nil, nil, nil, nil, nil, nil, nil, feats) err := runService.UpdateJobState(ctx, event) assert.Nil(t, err) @@ -770,7 +846,7 @@ func TestJobRunService(t *testing.T) { defer jobRepo.AssertExpectations(t) runService := service.NewJobRunService(logger, - jobRepo, nil, nil, nil, nil, nil, nil, nil, nil, feats) + jobRepo, nil, nil, nil, nil, nil, nil, nil, nil, nil, feats) executorInput, err := runService.JobRunInput(ctx, projName, jobName, scheduler.RunConfig{}) assert.Nil(t, executorInput) assert.NotNil(t, err) @@ -830,7 +906,7 @@ func TestJobRunService(t *testing.T) { defer jobInputCompiler.AssertExpectations(t) runService := service.NewJobRunService(logger, - jobRepo, jobRunRepo, jobReplayRepo, nil, nil, nil, jobInputCompiler, nil, nil, feats) + jobRepo, jobRunRepo, jobReplayRepo, nil, nil, nil, nil, jobInputCompiler, nil, nil, feats) executorInput, err := runService.JobRunInput(ctx, projName, jobName, runConfig) assert.Equal(t, &dummyExecutorInput, executorInput) @@ -892,7 +968,7 @@ func TestJobRunService(t *testing.T) { defer jobInputCompiler.AssertExpectations(t) runService := service.NewJobRunService(logger, - jobRepo, jobRunRepo, jobReplayRepo, nil, nil, nil, jobInputCompiler, nil, nil, feats) + jobRepo, jobRunRepo, jobReplayRepo, nil, nil, nil, nil, jobInputCompiler, nil, nil, feats) executorInput, err := runService.JobRunInput(ctx, projName, jobName, runConfig) assert.Equal(t, &dummyExecutorInput, executorInput) @@ -947,7 +1023,7 @@ func TestJobRunService(t *testing.T) { defer jobInputCompiler.AssertExpectations(t) runService := service.NewJobRunService(logger, - jobRepo, jobRunRepo, jobReplayRepo, nil, nil, nil, jobInputCompiler, nil, nil, feats) + jobRepo, jobRunRepo, jobReplayRepo, nil, nil, nil, nil, jobInputCompiler, nil, nil, feats) executorInput, err := runService.JobRunInput(ctx, projName, jobName, runConfig) assert.Equal(t, &dummyExecutorInput, executorInput) @@ -1001,7 +1077,7 @@ func TestJobRunService(t *testing.T) { defer jobInputCompiler.AssertExpectations(t) runService := service.NewJobRunService(logger, - jobRepo, jobRunRepo, jobReplayRepo, nil, nil, nil, jobInputCompiler, nil, nil, feats) + jobRepo, jobRunRepo, jobReplayRepo, nil, nil, nil, nil, jobInputCompiler, nil, nil, feats) executorInput, err := runService.JobRunInput(ctx, projName, jobName, runConfig) assert.Nil(t, err) @@ -1036,7 +1112,7 @@ func TestJobRunService(t *testing.T) { defer jobRepo.AssertExpectations(t) runService := service.NewJobRunService(logger, - jobRepo, nil, nil, nil, nil, nil, nil, nil, nil, feats) + jobRepo, nil, nil, nil, nil, nil, nil, nil, nil, nil, feats) returnedRuns, _, err := runService.GetJobRuns(ctx, projName, jobName, criteria) assert.Error(t, err) assert.ErrorContains(t, err, "unable to get job details for jobName: sample_select, project:proj") @@ -1083,7 +1159,7 @@ func TestJobRunService(t *testing.T) { defer jobRunRepo.AssertExpectations(t) runService := service.NewJobRunService(logger, - jobRepo, jobRunRepo, nil, nil, sch, nil, nil, nil, projectGetter, feats) + jobRepo, jobRunRepo, nil, nil, nil, sch, nil, nil, nil, projectGetter, feats) var err error var returnedRuns []*scheduler.JobRunStatus returnedRuns, _, err = runService.GetJobRuns(ctx, projName, jobName, criteria) @@ -1231,7 +1307,7 @@ func TestJobRunService(t *testing.T) { defer jobRunRepo.AssertExpectations(t) runService := service.NewJobRunService(logger, - jobRepo, jobRunRepo, nil, nil, sch, nil, nil, nil, projectGetter, feats) + jobRepo, jobRunRepo, nil, nil, nil, sch, nil, nil, nil, projectGetter, feats) returnedRuns, _, err := runService.GetJobRuns(ctx, projName, jobName, scenario.input) assert.Nil(t, err) assert.Equal(t, scenario.expectedResult, returnedRuns) @@ -1312,7 +1388,7 @@ func TestJobRunService(t *testing.T) { sch.On("GetJobRuns", ctx, tnnt, criteria, jobCron).Return(runs, nil) defer sch.AssertExpectations(t) - runService := service.NewJobRunService(logger, jobRepo, jobRunRepo, nil, nil, sch, nil, nil, nil, projectGetter, feats) + runService := service.NewJobRunService(logger, jobRepo, jobRunRepo, nil, nil, nil, sch, nil, nil, nil, projectGetter, feats) returnedRuns, _, err := runService.GetJobRuns(ctx, projName, jobName, jobQuery) assert.Nil(t, err) assert.Equal(t, 2, len(returnedRuns)) @@ -1346,7 +1422,7 @@ func TestJobRunService(t *testing.T) { jobRepo.On("GetJobDetails", ctx, projName, jobName).Return(&jobWithDetails, nil) defer jobRepo.AssertExpectations(t) - runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil, nil, nil, nil, nil, nil, feats) + runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil, nil, nil, nil, nil, nil, nil, feats) returnedRuns, _, err := runService.GetJobRuns(ctx, projName, jobName, jobQuery) assert.Error(t, err) assert.ErrorContains(t, err, "unable to parse job cron interval: expected exactly 5 fields, found 2: [invalid interval]") @@ -1380,7 +1456,7 @@ func TestJobRunService(t *testing.T) { jobRepo.On("GetJobDetails", ctx, projName, jobName).Return(&jobWithDetails, nil) defer jobRepo.AssertExpectations(t) - runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil, nil, nil, nil, nil, nil, feats) + runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil, nil, nil, nil, nil, nil, nil, feats) returnedRuns, _, err := runService.GetJobRuns(ctx, projName, jobName, jobQuery) assert.Error(t, err) assert.ErrorContains(t, err, "cannot get job runs, job interval is empty") @@ -1416,7 +1492,7 @@ func TestJobRunService(t *testing.T) { jobRepo := new(JobRepository) jobRepo.On("GetJobDetails", ctx, projName, jobName).Return(&jobWithDetails, nil) defer jobRepo.AssertExpectations(t) - runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil, nil, nil, nil, nil, projectGetter, feats) + runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil, nil, nil, nil, nil, nil, projectGetter, feats) returnedRuns, _, err := runService.GetJobRuns(ctx, projName, jobName, jobQuery) assert.Error(t, err) assert.ErrorContains(t, err, "job schedule startDate not found in job") @@ -1469,7 +1545,7 @@ func TestJobRunService(t *testing.T) { jobRepo.On("GetJobDetails", ctx, projName, jobName).Return(&jobWithDetails, nil) defer jobRepo.AssertExpectations(t) - runService := service.NewJobRunService(logger, jobRepo, jobRunRepo, nil, nil, sch, nil, nil, nil, nil, feats) + runService := service.NewJobRunService(logger, jobRepo, jobRunRepo, nil, nil, nil, sch, nil, nil, nil, nil, feats) returnedRuns, _, err := runService.GetJobRuns(ctx, projName, jobName, criteria) assert.Nil(t, err) assert.Equal(t, runs, returnedRuns) @@ -1488,7 +1564,7 @@ func TestJobRunService(t *testing.T) { defer jobRunRepo.AssertExpectations(t) runService := service.NewJobRunService(logger, - nil, jobRunRepo, nil, nil, nil, nil, nil, nil, nil, feats) + nil, jobRunRepo, nil, nil, nil, nil, nil, nil, nil, nil, feats) returnedRuns, count := runService.FilterRunsV3(ctx, tnnt, criteria) assert.Equal(t, -1, count) assert.Nil(t, returnedRuns) @@ -1512,7 +1588,7 @@ func TestJobRunService(t *testing.T) { defer jobRunRepo.AssertExpectations(t) runService := service.NewJobRunService(logger, - nil, jobRunRepo, nil, nil, nil, nil, nil, nil, nil, feats) + nil, jobRunRepo, nil, nil, nil, nil, nil, nil, nil, nil, feats) returnedRuns, count := runService.FilterRunsV3(ctx, tnnt, criteria) assert.Equal(t, 1, count) assert.Equal(t, 1, len(returnedRuns)) @@ -1536,7 +1612,7 @@ func TestJobRunService(t *testing.T) { defer jobRunRepo.AssertExpectations(t) runService := service.NewJobRunService(logger, - nil, jobRunRepo, nil, nil, nil, nil, nil, nil, nil, feats) + nil, jobRunRepo, nil, nil, nil, nil, nil, nil, nil, nil, feats) returnedRuns, count := runService.FilterRunsV3(ctx, tnnt, criteria) assert.Equal(t, 0, count) assert.Equal(t, 1, len(returnedRuns)) @@ -1567,7 +1643,7 @@ func TestJobRunService(t *testing.T) { defer jobRunRepo.AssertExpectations(t) runService := service.NewJobRunService(logger, - nil, jobRunRepo, nil, nil, nil, nil, nil, nil, nil, feats) + nil, jobRunRepo, nil, nil, nil, nil, nil, nil, nil, nil, feats) returnedRuns, count := runService.FilterRunsV3(ctx, tnnt, criteria) assert.Equal(t, 1, count) assert.Equal(t, 3, len(returnedRuns)) @@ -1614,7 +1690,7 @@ func TestJobRunService(t *testing.T) { defer jobRunRepo.AssertExpectations(t) runService := service.NewJobRunService(logger, - nil, jobRunRepo, nil, nil, nil, nil, nil, nil, nil, feats) + nil, jobRunRepo, nil, nil, nil, nil, nil, nil, nil, nil, feats) returnedRuns, count := runService.FilterRunsV3(ctx, tnnt, criteria) assert.Equal(t, 1, count) assert.Equal(t, 5, len(returnedRuns)) @@ -1656,7 +1732,7 @@ func TestJobRunService(t *testing.T) { defer jobRunRepo.AssertExpectations(t) runService := service.NewJobRunService(logger, - nil, jobRunRepo, nil, nil, nil, nil, nil, nil, nil, feats) + nil, jobRunRepo, nil, nil, nil, nil, nil, nil, nil, nil, feats) returnedRuns, count := runService.FilterRunsV3(ctx, tnnt, criteria) assert.Equal(t, 24, count) assert.Equal(t, 24, len(returnedRuns)) @@ -1673,7 +1749,7 @@ func TestJobRunService(t *testing.T) { projectGetter.On("Get", ctx, projName).Return(nil, errors.NewError(errors.ErrInternalError, tenant.EntityProject, "unexpected error")) - service := service.NewJobRunService(logger, nil, nil, nil, nil, nil, nil, nil, nil, projectGetter, feats) + service := service.NewJobRunService(logger, nil, nil, nil, nil, nil, nil, nil, nil, nil, projectGetter, feats) actualInterval, actualError := service.GetInterval(ctx, projName, jobName, referenceTime) @@ -1692,7 +1768,7 @@ func TestJobRunService(t *testing.T) { jobRepo.On("GetJobDetails", ctx, projName, jobName).Return(nil, errors.NewError(errors.ErrInternalError, job.EntityJob, "unexpected error")) - service := service.NewJobRunService(logger, jobRepo, nil, nil, nil, nil, nil, nil, nil, projectGetter, feats) + service := service.NewJobRunService(logger, jobRepo, nil, nil, nil, nil, nil, nil, nil, nil, projectGetter, feats) actualInterval, actualError := service.GetInterval(ctx, projName, jobName, referenceTime) @@ -1727,7 +1803,7 @@ func TestJobRunService(t *testing.T) { } jobRepo.On("GetJobDetails", ctx, projName, jobName).Return(job, nil) - service := service.NewJobRunService(logger, jobRepo, nil, nil, nil, nil, nil, nil, nil, projectGetter, feats) + service := service.NewJobRunService(logger, jobRepo, nil, nil, nil, nil, nil, nil, nil, nil, projectGetter, feats) actualInterval, actualError := service.GetInterval(ctx, projName, jobName, referenceTime) @@ -1760,7 +1836,7 @@ func TestJobRunService(t *testing.T) { jobRepo.On("GetJobDetails", ctx, projName, jobName).Return(job, nil) - service := service.NewJobRunService(logger, jobRepo, nil, nil, nil, nil, nil, nil, nil, projectGetter, feats) + service := service.NewJobRunService(logger, jobRepo, nil, nil, nil, nil, nil, nil, nil, nil, projectGetter, feats) actualInterval, actualError := service.GetInterval(ctx, projName, jobName, referenceTime) @@ -1787,7 +1863,7 @@ func TestJobRunService(t *testing.T) { }, nil) defer jobRunRepo.AssertExpectations(t) - runService := service.NewJobRunService(logger, nil, jobRunRepo, nil, nil, nil, nil, nil, nil, nil, feats) + runService := service.NewJobRunService(logger, nil, jobRunRepo, nil, nil, nil, nil, nil, nil, nil, nil, feats) filters := []filter.FilterOpt{ filter.WithTime(filter.StartDate, startDate), @@ -1816,7 +1892,7 @@ func TestJobRunService(t *testing.T) { jobRunRepo.On("GetLatestRun", ctx, projName, jobName, &runState).Return(jobRun, nil) defer jobRunRepo.AssertExpectations(t) - runService := service.NewJobRunService(logger, nil, jobRunRepo, nil, nil, nil, nil, nil, nil, nil, feats) + runService := service.NewJobRunService(logger, nil, jobRunRepo, nil, nil, nil, nil, nil, nil, nil, nil, feats) filters := []filter.FilterOpt{ filter.WithString(filter.RunState, runState.String()), @@ -1839,7 +1915,7 @@ func TestJobRunService(t *testing.T) { jobRunRepo.On("GetRunsByTimeRange", ctx, projName, jobName, &runState, startDate, endDate).Return(runsByTimeRange, fmt.Errorf("some error")) defer jobRunRepo.AssertExpectations(t) - runService := service.NewJobRunService(logger, nil, jobRunRepo, nil, nil, nil, nil, nil, nil, nil, feats) + runService := service.NewJobRunService(logger, nil, jobRunRepo, nil, nil, nil, nil, nil, nil, nil, nil, feats) filters := []filter.FilterOpt{ filter.WithTime(filter.StartDate, startDate), @@ -1861,7 +1937,7 @@ func TestJobRunService(t *testing.T) { jobRunRepo.On("GetLatestRun", ctx, projName, jobName, &runState).Return(jobRun, fmt.Errorf("some error")) defer jobRunRepo.AssertExpectations(t) - runService := service.NewJobRunService(logger, nil, jobRunRepo, nil, nil, nil, nil, nil, nil, nil, feats) + runService := service.NewJobRunService(logger, nil, jobRunRepo, nil, nil, nil, nil, nil, nil, nil, nil, feats) filters := []filter.FilterOpt{ filter.WithString(filter.RunState, runState.String()), @@ -1904,6 +1980,25 @@ func (m *mockJobInputCompiler) Compile(ctx context.Context, job *scheduler.JobWi return args.Get(0).(*scheduler.ExecutorInput), args.Error(1) } +type mockSLARepository struct { + mock.Mock +} + +func (m *mockSLARepository) RegisterSLA(ctx context.Context, projectName tenant.ProjectName, jobName, operatorName, operatorType, runID string, slaTime time.Time, description string, scheduledAt, operatorStartTime time.Time) error { + args := m.Called(ctx, projectName, jobName, operatorName, operatorType, runID, slaTime, description, scheduledAt, operatorStartTime) + return args.Error(0) +} + +func (m *mockSLARepository) UpdateSLA(ctx context.Context, projectName tenant.ProjectName, jobName, operatorName, operatorType, runID string, slaTime, operatorStartTime time.Time) error { + args := m.Called(ctx, projectName, jobName, operatorName, operatorType, runID, slaTime, operatorStartTime) + return args.Error(0) +} + +func (m *mockSLARepository) FinishSLA(ctx context.Context, projectName tenant.ProjectName, jobName, operatorName, operatorType, runID string, operatorEndTime time.Time) error { + args := m.Called(ctx, projectName, jobName, operatorName, operatorType, runID, operatorEndTime) + return args.Error(0) +} + type mockJobRunRepository struct { mock.Mock } diff --git a/ext/scheduler/airflow/__lib.py b/ext/scheduler/airflow/__lib.py index 165601e745..363bf794ac 100644 --- a/ext/scheduler/airflow/__lib.py +++ b/ext/scheduler/airflow/__lib.py @@ -3,6 +3,7 @@ from datetime import datetime, timezone, timedelta from typing import Any, Dict, Optional +from dataclasses import dataclass, asdict import pendulum import requests @@ -40,6 +41,17 @@ STARTUP_TIMEOUT_IN_SECS = int(Variable.get("startup_timeout_in_secs", default_var=2 * 60)) OPTIMUS_REQUEST_TIMEOUT_IN_SECS = int(Variable.get("optimus_request_timeout_in_secs", default_var=5 * 60)) +# --- EVENT TYPES ----------------------------------- +OPERATOR_START_EVENT = "operator_start" +OPERATOR_RETRY_EVENT = "operator_retry" +OPERATOR_SUCCESS_EVENT = "operator_success" +OPERATOR_FAIL_EVENT = "operator_fail" + +JOB_SUCCESS_EVENT = "job_success" +JOB_FAIL_EVENT = "job_fail" +JOB_SLA_MISS_EVENT = "job_sla_miss" +# --------------------------------------------------- + def lookup_non_standard_cron_expression(expr: str) -> str: expr_mapping = { '@yearly': '0 0 1 1 *', @@ -477,6 +489,7 @@ def get_run_type(context): def job_success_event(context): try: meta = { + "event_context": EventContext.from_ctx(context, JOB_SUCCESS_EVENT).to_dict(), "event_type": "TYPE_JOB_SUCCESS", "status": "success" } @@ -491,6 +504,7 @@ def job_success_event(context): def job_failure_event(context): try: meta = { + "event_context": EventContext.from_ctx(context, JOB_FAIL_EVENT).to_dict(), "event_type": "TYPE_FAILURE", "status": "failed" } @@ -502,6 +516,77 @@ def job_failure_event(context): except Exception as e: print(e) +@dataclass +class EventContext: + @dataclass + class TaskInstance: + max_tries: int + task_id: str + task_instance_key_str: str + attempt: int + log_url: str + start_date: str = None + end_date: str = None + + @dataclass + class DagRun: + dag_id: str + scheduled_at: str + execution_date: str + run_id: str + start_date: str = None + end_date: str = None + + @dataclass + class Task: + downstream_task_ids: list + + task_instance: TaskInstance + dag_run: DagRun + task: Task + operator_type: str + event_type: str + event_reason: str + + @staticmethod + def format_dt(dt): + return dt.strftime(TIMESTAMP_FORMAT) if dt else None + + @classmethod + def from_ctx(cls, ctx, event_type: str): + ti = ctx.get("task_instance") + dag_run = ctx.get("dag_run") + current_execution_date = ctx.get("execution_date") + current_schedule_date = get_scheduled_at(ctx) + + return cls( + task_instance=cls.TaskInstance( + max_tries=ti.max_tries, + task_id=ti.task_id, + task_instance_key_str = ctx.get("task_instance_key_str"), + attempt=ti.try_number, + log_url=ti.log_url, + start_date=cls.format_dt(ti.start_date), + end_date=cls.format_dt(ti.end_date), + ), + dag_run=cls.DagRun( + dag_id=ti.dag_id, + scheduled_at=current_schedule_date.strftime(TIMESTAMP_FORMAT), + execution_date=current_execution_date.strftime(TIMESTAMP_FORMAT), + run_id=dag_run.run_id, + start_date=cls.format_dt(dag_run.start_date), + end_date=cls.format_dt(dag_run.end_date), + ), + task=cls.Task( + downstream_task_ids=list(ti.task.downstream_task_ids), + ), + operator_type=get_run_type(ctx), + event_type=event_type, + event_reason = ctx.get("reason") if ctx.get("reason") != None else "", + ) + + def to_dict(self): + return asdict(self) # task level events def operator_start_event(context): @@ -511,6 +596,7 @@ def operator_start_event(context): if not shouldSendSensorStartEvent(context): return meta = { + "event_context": EventContext.from_ctx(context , OPERATOR_START_EVENT).to_dict(), "event_type": "TYPE_{}_START".format(run_type), "status": "running" } @@ -522,6 +608,7 @@ def operator_success_event(context): try: run_type = get_run_type(context) meta = { + "event_context": EventContext.from_ctx(context, OPERATOR_SUCCESS_EVENT).to_dict(), "event_type": "TYPE_{}_SUCCESS".format(run_type), "status": "success" } @@ -534,6 +621,7 @@ def operator_retry_event(context): try: run_type = get_run_type(context) meta = { + "event_context": EventContext.from_ctx(context, OPERATOR_RETRY_EVENT).to_dict(), "event_type": "TYPE_{}_RETRY".format(run_type), "status": "retried" } @@ -546,6 +634,7 @@ def operator_failure_event(context): try: run_type = get_run_type(context) meta = { + "event_context": EventContext.from_ctx(context, OPERATOR_FAIL_EVENT).to_dict(), "event_type": "TYPE_{}_FAIL".format(run_type), "status": "failed" } diff --git a/internal/store/postgres/migrations/000072_create_operator_sla_table.down.sql b/internal/store/postgres/migrations/000072_create_operator_sla_table.down.sql new file mode 100644 index 0000000000..a9efb74c9a --- /dev/null +++ b/internal/store/postgres/migrations/000072_create_operator_sla_table.down.sql @@ -0,0 +1,2 @@ +DROP TABLE IF EXISTS sla; +DROP TYPE IF EXISTS operator_type; \ No newline at end of file diff --git a/internal/store/postgres/migrations/000072_create_operator_sla_table.up.sql b/internal/store/postgres/migrations/000072_create_operator_sla_table.up.sql new file mode 100644 index 0000000000..dd1deae5bb --- /dev/null +++ b/internal/store/postgres/migrations/000072_create_operator_sla_table.up.sql @@ -0,0 +1,25 @@ +DO $$ BEGIN + CREATE TYPE operator_type AS ENUM ('sensor', 'task', 'hook', 'job'); +EXCEPTION + WHEN duplicate_object THEN null; +END $$; + +CREATE TABLE IF NOT EXISTS operator_sla ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + project_name TEXT NOT NULL, + job_name TEXT NOT NULL, + operator_name TEXT NOT NULL, + run_id TEXT NOT NULL, + alert_tag TEXT NOT NULL, + operator_type operator_type NOT NULL, + + sla_time TIMESTAMPTZ NOT NULL, + scheduled_at TIMESTAMPTZ NOT NULL, + operator_start_time TIMESTAMPTZ NOT NULL, + + worker_signature UUID, + worker_lock_until TIMESTAMPTZ, + + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); \ No newline at end of file diff --git a/internal/store/postgres/scheduler/job_operator_repository.go b/internal/store/postgres/scheduler/job_operator_repository.go index 0fa6c2dc4c..f39397e339 100644 --- a/internal/store/postgres/scheduler/job_operator_repository.go +++ b/internal/store/postgres/scheduler/job_operator_repository.go @@ -97,7 +97,10 @@ func (o *OperatorRunRepository) CreateOperatorRun(ctx context.Context, name stri } insertOperatorRun := "INSERT INTO " + operatorTableName + " ( " + jobOperatorColumnsToStore + ", created_at, updated_at) values ( $1, $2, $3, $4, null, NOW(), NOW())" _, err = o.db.Exec(ctx, insertOperatorRun, name, jobRunID, scheduler.StateRunning, startTime) - return errors.WrapIfErr(scheduler.EntityJobRun, "error while inserting the run", err) + if err != nil { + return errors.WrapIfErr(scheduler.EntityJobRun, "error while inserting the run", err) + } + return nil } func (o *OperatorRunRepository) UpdateOperatorRun(ctx context.Context, operatorType scheduler.OperatorType, operatorRunID uuid.UUID, eventTime time.Time, state scheduler.State) error { diff --git a/internal/store/postgres/scheduler/sla_repository.go b/internal/store/postgres/scheduler/sla_repository.go new file mode 100644 index 0000000000..51dfa076b8 --- /dev/null +++ b/internal/store/postgres/scheduler/sla_repository.go @@ -0,0 +1,84 @@ +package scheduler + +import ( + "context" + "fmt" + "time" + + "github.com/google/uuid" + "github.com/jackc/pgx/v5/pgxpool" + + "github.com/goto/optimus/core/scheduler" + "github.com/goto/optimus/core/tenant" + "github.com/goto/optimus/internal/errors" +) + +type OperatorsSLA struct { + ID uuid.UUID + ProjectName string + NamespaceName string + JobName string + OperatorName string + RunID string + OperatorType string + SLATime time.Time + Description string + + WorkerSignature string + WorkerLockUntil time.Time + + CreatedAt time.Time + UpdatedAt time.Time +} + +type SLARepository struct { + db *pgxpool.Pool +} + +func NewSLARepository(pool *pgxpool.Pool) *SLARepository { + return &SLARepository{ + db: pool, + } +} + +func (s *SLARepository) RegisterSLA(ctx context.Context, projectName tenant.ProjectName, jobName, operatorName, operatorType, runID string, slaTime time.Time, alertTag string, scheduledAt, operatorStartTime time.Time) error { + slaQuery := "INSERT INTO operator_sla ( project_name, job_name, operator_name, operator_type, run_id, sla_time, alert_tag, scheduled_at, operator_start_time) values ( $1, $2, $3, $4, $5, $6, $7, $8, $9)" + + tag, err := s.db.Exec(ctx, slaQuery, projectName, jobName, operatorName, operatorType, runID, slaTime, alertTag, scheduledAt, operatorStartTime) + if err != nil { + errMsg := fmt.Sprintf("error executing sla insert, params: %s, %s, %s, %s, %s, %s", jobName, operatorName, operatorType, runID, slaTime, alertTag) + return errors.Wrap(scheduler.EntityEvent, errMsg, err) + } + if tag.RowsAffected() == 0 { + errMsg := fmt.Sprintf("error executing sla insert, params: %s, %s, %s, %s, %s, %s, err: now new rows created", jobName, operatorName, operatorType, runID, slaTime, alertTag) + return errors.NewError(errors.ErrInternalError, scheduler.EntityEvent, errMsg) + } + return nil +} + +func (s *SLARepository) UpdateSLA(ctx context.Context, projectName tenant.ProjectName, jobName, operatorName, operatorType, runID string, slaTime, operatorStartTime time.Time) error { + slaQuery := "update operator_sla set sla_time = $1, operator_start_time = $2 where project_name = $3 and job_name = $4 and operator_name = $5 and operator_type = $6 and run_id = $7 " + + tag, err := s.db.Exec(ctx, slaQuery, slaTime, operatorStartTime, projectName, jobName, operatorName, operatorType, runID) + if err != nil { + errMsg := fmt.Sprintf("error executing sla update, params: %s:%s, %s, %s, %s", jobName, operatorName, operatorType, runID, slaTime) + return errors.Wrap(scheduler.EntityEvent, errMsg, err) + } + if tag.RowsAffected() == 0 { + errMsg := fmt.Sprintf("error executing sla update, params: %s:%s, %s, %s, %s, err: now new rows created", jobName, operatorName, operatorType, runID, slaTime) + return errors.NewError(errors.ErrInternalError, scheduler.EntityEvent, errMsg) + } + return nil +} + +func (s *SLARepository) FinishSLA(ctx context.Context, projectName tenant.ProjectName, jobName, operatorName, operatorType, runID string, operatorEndTime time.Time) error { + slaQuery := "delete from operator_sla where project_name = $1 and job_name = $2 and operator_name = $3 and operator_type = $4 and run_id = $5 and SLA_time > $6 " + + _, err := s.db.Exec(ctx, slaQuery, projectName, jobName, operatorName, operatorType, runID, operatorEndTime) + if err != nil { + errMsg := fmt.Sprintf("error finishing SLA, params: %s:%s operatorType:%s, runID:%s, operatorEndTime:%s", jobName, operatorName, operatorType, runID, operatorEndTime) + return errors.Wrap(scheduler.EntityEvent, errMsg, err) + } + + return nil +} diff --git a/server/optimus.go b/server/optimus.go index 9a99508a89..f4d34e330f 100644 --- a/server/optimus.go +++ b/server/optimus.go @@ -286,6 +286,7 @@ func (s *OptimusServer) setupHandlers() error { // Scheduler bounded context jobRunRepo := schedulerRepo.NewJobRunRepository(s.dbPool) operatorRunRepository := schedulerRepo.NewOperatorRunRepository(s.dbPool) + slaRepository := schedulerRepo.NewSLARepository(s.dbPool) jobProviderRepo := schedulerRepo.NewJobProviderRepository(s.dbPool) notificationContext, cancelNotifiers := context.WithCancel(context.Background()) @@ -357,7 +358,7 @@ func (s *OptimusServer) setupHandlers() error { ) newJobRunService := schedulerService.NewJobRunService( - s.logger, jobProviderRepo, jobRunRepo, replayRepository, operatorRunRepository, + s.logger, jobProviderRepo, jobRunRepo, replayRepository, operatorRunRepository, slaRepository, newScheduler, newPriorityResolver, jobInputCompiler, s.eventHandler, tProjectService, s.conf.Features, )