Skip to content
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions core/scheduler/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ const (
SensorFailEvent JobEventType = "sensor_fail"
SensorSuccessEvent JobEventType = "sensor_success"

OperatorStartEvent JobEventType = "operator_start"
OperatorRetryEvent JobEventType = "operator_retry"
OperatorFailEvent JobEventType = "operator_fail"
OperatorSuccessEvent JobEventType = "operator_success"

StatusFiring EventStatus = "firing"
StatusResolved EventStatus = "resolved"
)
Expand Down Expand Up @@ -101,6 +106,39 @@ func (s *SLAObject) String() string {
return fmt.Sprintf("(job: %s,scheduledAt: %s)", s.JobName, s.JobScheduledAt.Format(time.RFC3339))
}

type OperatorRunInstance struct {
MaxTries int `json:"max_tries"`
OperatorName string `json:"task_id"`
StartTime time.Time `json:"start_date"`
OperatorKey string `json:"task_instance_key_str"`
TryNumber int `json:"attempt"`
EndTime *time.Time `json:"end_date,omitempty"`
LogURL string `json:"log_url"`
}

type DagRun struct {
RunID string `json:"run_id"`
JobName string `json:"dag_id"`
ScheduledAt time.Time `json:"scheduled_at"`
ExecutionDate time.Time `json:"execution_date"`
StartTime time.Time `json:"start_date"`
EndTime *time.Time `json:"end_date,omitempty"`
}

type OperatorObj struct {
DownstreamTaskIDs []string `json:"downstream_task_ids"`
}

type EventContext struct {
Tenant tenant.Tenant
Type JobEventType `json:"event_type"`
OperatorType OperatorType `json:"operator_type"`
OperatorRunInstance OperatorRunInstance `json:"task_instance"`
DagRun DagRun `json:"dag_run"`
Task OperatorObj `json:"task"`
EventReason *string `json:"event_reason,omitempty"`
}

type Event struct {
JobName JobName
URN string
Expand All @@ -112,6 +150,8 @@ type Event struct {
JobScheduledAt time.Time
Values map[string]any
SLAObjectList []*SLAObject

EventContext *EventContext
}

func (e Event) String() string {
Expand Down Expand Up @@ -152,6 +192,23 @@ func (event JobEventType) String() string {
return string(event)
}

func EventContextFrom(rawEventContext any) (*EventContext, error) {
sc := EventContext{}
bytesArr, err := json.Marshal(rawEventContext)
if err != nil {
return nil, errors.InvalidArgument(EntityEvent, "unable to marshal event")
}
err = json.Unmarshal(bytesArr, &sc)
if err != nil {
return nil, errors.InvalidArgument(EntityEvent, "unable to unmarshal event, err:"+err.Error())
}
sc.OperatorType, err = NewOperatorType(strings.ToLower(sc.OperatorType.String()))
if err != nil {
return nil, err
}
return &sc, nil
}

func EventFrom(eventTypeName string, eventValues map[string]any, jobName JobName, tenent tenant.Tenant) (*Event, error) {
eventType, err := FromStringToEventType(eventTypeName)
if err != nil {
Expand Down Expand Up @@ -224,6 +281,13 @@ func EventFrom(eventTypeName string, eventValues map[string]any, jobName JobName
return nil, errors.InvalidArgument(EntityEvent, "property 'scheduled_at' is not in appropriate format")
}
eventObj.JobScheduledAt = scheduledAtTimeStamp

eventObj.EventContext, err = EventContextFrom(eventValues["event_context"])
if err != nil {
return nil, err
}
eventObj.EventContext.Tenant = eventObj.Tenant
}

return &eventObj, nil
}
51 changes: 51 additions & 0 deletions core/scheduler/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,12 +190,37 @@ func TestFromStringToEventType(t *testing.T) {
"task_id": "some_txbq",
"status": "running",
"scheduled_at": "2022-01-02T15:04:05Z",
"event_context": map[string]any{
"task_instance": map[string]any{
"max_tries": 1,
"task_id": "mc2mc",
"task_instance_key_str": "campaign__mc2mc__20250926",
"attempt": 2,
"log_url": "http://localhost:8080/dags/campaign/grid?dag_run_id=scheduled__2025-09-26T03%3A40%3A00%2B00%3A00&task_id=mc2mc&map_index=-1&tab=logs",
"start_date": "2025-09-26T03:46:39Z",
},
"dag_run": map[string]any{
"dag_id": "campaign",
"scheduled_at": "2025-09-26T03:50:00Z",
"execution_date": "2025-09-26T03:40:00Z",
"run_id": "scheduled__2025-09-26T03:40:00+00:00",
"start_date": "2025-09-26T03:45:00Z",
},
"task": map[string]any{"downstream_task_ids": []string{"hook_predator"}},
"operator_type": "TASK",
"event_type": "operator_retry",
},
"event_type": "TYPE_TASK_RETRY",
}
jobName := scheduler.JobName("some_job")
tnnt, err := tenant.NewTenant("someProject", "someNamespace")
eventTypeName := "TYPE_TASK_RETRY"
assert.Nil(t, err)

startTime, _ := time.Parse(time.RFC3339, "2025-09-26T03:46:39Z")
jobScheduledAt, _ := time.Parse(time.RFC3339, "2025-09-26T03:50:00Z")
executionDate, _ := time.Parse(time.RFC3339, "2025-09-26T03:40:00Z")
jobstartDate, _ := time.Parse(time.RFC3339, "2025-09-26T03:45:00Z")
outputObj := scheduler.Event{
JobName: jobName,
Tenant: tnnt,
Expand All @@ -205,6 +230,32 @@ func TestFromStringToEventType(t *testing.T) {
Status: scheduler.StateRunning,
JobScheduledAt: time.Date(2022, time.January, 2, 15, 0o4, 0o5, 0, time.UTC),
Values: eventValues,
EventContext: &scheduler.EventContext{
Type: scheduler.OperatorRetryEvent,
OperatorType: scheduler.OperatorTask,
OperatorRunInstance: scheduler.OperatorRunInstance{
MaxTries: 1,
OperatorName: "mc2mc",
StartTime: startTime,
OperatorKey: "campaign__mc2mc__20250926",
TryNumber: 2,
EndTime: nil,
LogURL: "http://localhost:8080/dags/campaign/grid?dag_run_id=scheduled__2025-09-26T03%3A40%3A00%2B00%3A00&task_id=mc2mc&map_index=-1&tab=logs",
},
DagRun: scheduler.DagRun{
RunID: "scheduled__2025-09-26T03:40:00+00:00",
JobName: "campaign",
ScheduledAt: jobScheduledAt,
ExecutionDate: executionDate,
StartTime: jobstartDate,
EndTime: nil,
},
Task: scheduler.OperatorObj{
DownstreamTaskIDs: []string{"hook_predator"},
},
EventReason: nil,
Tenant: tnnt,
},
}
output, err := scheduler.EventFrom(eventTypeName, eventValues, jobName, tnnt)
assert.Nil(t, err)
Expand Down
8 changes: 8 additions & 0 deletions core/scheduler/handler/v1beta1/job_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,10 @@ func TestJobRunHandler(t *testing.T) {
"task_id": "wait_sample_select",
"status": "success",
"scheduled_at": "2022-01-02T15:04:05Z",
"event_context": map[string]any{
"operator_type": "SENSOR",
"event_type": "operator_success",
},
},
)
req := &pb.RegisterJobEventRequest{
Expand Down Expand Up @@ -630,6 +634,10 @@ func TestJobRunHandler(t *testing.T) {
"status": "success",
"scheduled_at": "2022-01-02T15:04:05Z",
"task_id": "wait_sample_select",
"event_context": map[string]any{
"operator_type": "SENSOR",
"event_type": "operator_success",
},
},
)
req := &pb.RegisterJobEventRequest{
Expand Down
40 changes: 40 additions & 0 deletions core/scheduler/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,19 @@ func (o OperatorType) String() string {
return string(o)
}

func NewOperatorType(op string) (OperatorType, error) {
switch strings.ToLower(op) {
case OperatorTask.String():
return OperatorTask, nil
case OperatorSensor.String():
return OperatorSensor, nil
case OperatorHook.String():
return OperatorHook, nil
default:
return OperatorType(op), errors.InvalidArgument(EntityEvent, "invalid operator type, supported : task, sensor, hook")
}
}

const (
EntityJobRun = "jobRun"

Expand Down Expand Up @@ -76,6 +89,33 @@ func (j *Job) IsDryRun() bool {
return false
}

func (j *Job) GetTaskAlertConfig() *OperatorAlertConfig {
if j.Task == nil {
return nil
}
return j.Task.AlertConfig
}

func (j *Job) GetHookAlertConfigByName(hookName string) *OperatorAlertConfig {
for _, hook := range j.Hooks {
if hook.Name == hookName {
return hook.AlertConfig
}
}
return nil
}

func (j *Job) GetOperatorAlertConfigByName(operatorType OperatorType, operatorName string) *OperatorAlertConfig {
switch operatorType {
case OperatorTask:
return j.GetTaskAlertConfig()
case OperatorHook:
return j.GetHookAlertConfigByName(operatorName)
default:
return nil
}
}

func (j *Job) GetHook(hookName string) (*Hook, error) {
for _, hook := range j.Hooks {
if hook.Name == hookName {
Expand Down
26 changes: 13 additions & 13 deletions core/scheduler/service/deployment_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestDeploymentService(t *testing.T) {
defer jobRepo.AssertExpectations(t)

runService := service.NewJobRunService(logger,
jobRepo, nil, nil, nil, nil, nil, nil, nil, nil, feats)
jobRepo, nil, nil, nil, nil, nil, nil, nil, nil, nil, feats)

err := runService.UploadToScheduler(ctx, proj1Name)
assert.NotNil(t, err)
Expand All @@ -100,7 +100,7 @@ func TestDeploymentService(t *testing.T) {
defer priorityResolver.AssertExpectations(t)

runService := service.NewJobRunService(logger,
jobRepo, nil, nil, nil, nil, priorityResolver, nil, nil, nil, feats)
jobRepo, nil, nil, nil, nil, nil, priorityResolver, nil, nil, nil, feats)

err := runService.UploadToScheduler(ctx, proj1Name)
assert.NotNil(t, err)
Expand All @@ -120,7 +120,7 @@ func TestDeploymentService(t *testing.T) {
Return(fmt.Errorf("DeployJobs tnnt1 error"))
defer mScheduler.AssertExpectations(t)

runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil,
runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil, nil,
mScheduler, priorityResolver, nil, nil, nil, feats)

err := runService.UploadToScheduler(ctx, proj1Name)
Expand Down Expand Up @@ -148,7 +148,7 @@ func TestDeploymentService(t *testing.T) {
mScheduler.On("DeleteJobs", mock.Anything, tnnt2, []string{"job4-to-delete"}).Return(nil)
defer mScheduler.AssertExpectations(t)

runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil,
runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil, nil,
mScheduler, priorityResolver, nil, nil, nil, feats)

err := runService.UploadToScheduler(ctx, proj1Name)
Expand All @@ -173,7 +173,7 @@ func TestDeploymentService(t *testing.T) {
mScheduler.On("DeleteJobs", mock.Anything, tnnt2, []string{"job4-to-delete"}).Return(nil)
defer mScheduler.AssertExpectations(t)

runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil,
runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil, nil,
mScheduler, priorityResolver, nil, nil, nil, feats)

err := runService.UploadToScheduler(ctx, proj1Name)
Expand All @@ -197,7 +197,7 @@ func TestDeploymentService(t *testing.T) {
mScheduler := new(mockScheduler)
defer mScheduler.AssertExpectations(t)

runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil,
runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil, nil,
mScheduler, priorityResolver, nil, nil, nil, feats)

err := runService.UploadJobs(ctx, tnnt1, jobNamesToUpload, jobNamesToDelete)
Expand All @@ -219,7 +219,7 @@ func TestDeploymentService(t *testing.T) {
mScheduler := new(mockScheduler)
defer mScheduler.AssertExpectations(t)

runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil,
runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil, nil,
mScheduler, priorityResolver, nil, nil, nil, feats)

err := runService.UploadJobs(ctx, tnnt1, jobNamesToUpload, jobNamesToDelete)
Expand All @@ -242,7 +242,7 @@ func TestDeploymentService(t *testing.T) {
mScheduler.On("DeployJobs", mock.Anything, tnnt1, jobsToUpload).Return(errors.New("internal error"))
defer mScheduler.AssertExpectations(t)

runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil,
runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil, nil,
mScheduler, priorityResolver, nil, nil, nil, feats)

err := runService.UploadJobs(ctx, tnnt1, jobNamesToUpload, jobNamesToDelete)
Expand All @@ -256,7 +256,7 @@ func TestDeploymentService(t *testing.T) {
mScheduler.On("DeleteJobs", mock.Anything, tnnt1, jobNamesToDelete).Return(errors.New("internal error"))
defer mScheduler.AssertExpectations(t)

runService := service.NewJobRunService(logger, nil, nil, nil, nil,
runService := service.NewJobRunService(logger, nil, nil, nil, nil, nil,
mScheduler, nil, nil, nil, nil, feats)

err := runService.UploadJobs(ctx, tnnt1, jobNamesToUpload, jobNamesToDelete)
Expand All @@ -280,7 +280,7 @@ func TestDeploymentService(t *testing.T) {
mScheduler.On("DeleteJobs", mock.Anything, tnnt1, jobNamesToDelete).Return(nil)
defer mScheduler.AssertExpectations(t)

runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil,
runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil, nil,
mScheduler, priorityResolver, nil, nil, nil, feats)

err := runService.UploadJobs(ctx, tnnt1, jobNamesToUpload, jobNamesToDelete)
Expand All @@ -303,7 +303,7 @@ func TestDeploymentService(t *testing.T) {
mScheduler.On("DeployJobs", mock.Anything, tnnt1, jobsToUpload).Return(nil)
defer mScheduler.AssertExpectations(t)

runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil,
runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil, nil,
mScheduler, priorityResolver, nil, nil, nil, feats)

err := runService.UploadJobs(ctx, tnnt1, jobNamesToUpload, jobNamesToDelete)
Expand All @@ -317,7 +317,7 @@ func TestDeploymentService(t *testing.T) {
mScheduler.On("DeleteJobs", mock.Anything, tnnt1, jobNamesToDelete).Return(nil)
defer mScheduler.AssertExpectations(t)

runService := service.NewJobRunService(logger, nil, nil, nil, nil,
runService := service.NewJobRunService(logger, nil, nil, nil, nil, nil,
mScheduler, nil, nil, nil, nil, feats)

err := runService.UploadJobs(ctx, tnnt1, jobNamesToUpload, jobNamesToDelete)
Expand All @@ -341,7 +341,7 @@ func TestDeploymentService(t *testing.T) {
mScheduler.On("DeployJobs", mock.Anything, tnnt1, jobsToUpload).Return(nil)
defer mScheduler.AssertExpectations(t)

runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil,
runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil, nil,
mScheduler, priorityResolver, nil, nil, nil, feats)

err := runService.UploadJobs(ctx, tnnt1, jobNamesToUpload, jobNamesToDelete)
Expand Down
Loading
Loading