From 99fee033a69804be00dc534675fc1e08861ec5de Mon Sep 17 00:00:00 2001 From: Arik Cohen Date: Sat, 22 Feb 2025 12:04:26 -0500 Subject: [PATCH] Feature: Delete scheduled job --- datastore/datastore.go | 1 + datastore/inmemory/inmemory.go | 10 ++++ datastore/inmemory/inmemory_test.go | 27 +++++++++ datastore/postgres/postgres.go | 71 +++++++++++++++++------ datastore/postgres/postgres_test.go | 23 ++++++++ db/postgres/schema.go | 30 +++++----- engine/datastore.go | 7 +++ internal/coordinator/api/api.go | 20 +++++++ internal/coordinator/handlers/schedule.go | 5 +- 9 files changed, 159 insertions(+), 35 deletions(-) diff --git a/datastore/datastore.go b/datastore/datastore.go index e503515d..e3f50b21 100644 --- a/datastore/datastore.go +++ b/datastore/datastore.go @@ -49,6 +49,7 @@ type Datastore interface { GetScheduledJobs(ctx context.Context, currentUser string, page, size int) (*Page[*tork.ScheduledJobSummary], error) GetScheduledJobByID(ctx context.Context, id string) (*tork.ScheduledJob, error) UpdateScheduledJob(ctx context.Context, id string, modify func(u *tork.ScheduledJob) error) error + DeleteScheduledJob(ctx context.Context, id string) error CreateUser(ctx context.Context, u *tork.User) error GetUser(ctx context.Context, username string) (*tork.User, error) diff --git a/datastore/inmemory/inmemory.go b/datastore/inmemory/inmemory.go index 8d0ba622..e6b06cd6 100644 --- a/datastore/inmemory/inmemory.go +++ b/datastore/inmemory/inmemory.go @@ -720,6 +720,16 @@ func (ds *InMemoryDatastore) UpdateScheduledJob(ctx context.Context, id string, return nil } +func (ds *InMemoryDatastore) DeleteScheduledJob(ctx context.Context, id string) error { + ds.jobs.Iterate(func(_ string, j *tork.Job) { + if j.Schedule != nil && j.Schedule.ID == id { + ds.jobs.Delete(j.ID) + } + }) + ds.scheduledJobs.Delete(id) + return nil +} + func (ds *InMemoryDatastore) WithTx(ctx context.Context, f func(tx datastore.Datastore) error) error { return f(ds) } diff --git a/datastore/inmemory/inmemory_test.go b/datastore/inmemory/inmemory_test.go index 069be28d..a7b6eebd 100644 --- a/datastore/inmemory/inmemory_test.go +++ b/datastore/inmemory/inmemory_test.go @@ -870,3 +870,30 @@ func TestInMemoryUpdateScheduledJob(t *testing.T) { assert.Error(t, err) assert.Equal(t, datastore.ErrJobNotFound, err) } + +func TestInMemoryDeleteScheduledJob(t *testing.T) { + ctx := context.Background() + ds := inmemory.NewInMemoryDatastore() + + // Create a scheduled job + scheduledJob := &tork.ScheduledJob{ + ID: uuid.NewUUID(), + Name: "Test Scheduled Job", + } + err := ds.CreateScheduledJob(ctx, scheduledJob) + assert.NoError(t, err) + + // Verify the scheduled job exists + retrievedJob, err := ds.GetScheduledJobByID(ctx, scheduledJob.ID) + assert.NoError(t, err) + assert.Equal(t, scheduledJob.ID, retrievedJob.ID) + + // Delete the scheduled job + err = ds.DeleteScheduledJob(ctx, scheduledJob.ID) + assert.NoError(t, err) + + // Verify the scheduled job no longer exists + _, err = ds.GetScheduledJobByID(ctx, scheduledJob.ID) + assert.Error(t, err) + assert.Equal(t, datastore.ErrJobNotFound, err) +} diff --git a/datastore/postgres/postgres.go b/datastore/postgres/postgres.go index b5faf2c5..caa9ce00 100644 --- a/datastore/postgres/postgres.go +++ b/datastore/postgres/postgres.go @@ -799,27 +799,11 @@ func (ds *PostgresDatastore) expungeExpiredJobs() (int, error) { if err := ptx.select_(&ids, "select id from jobs where (delete_at < current_timestamp) OR (created_at < $1 AND (state = 'COMPLETED' or state = 'FAILED' or state = 'CANCELLED')) limit 1000", time.Now().UTC().Add(-*ds.jobsRetentionDuration)); err != nil { return errors.Wrapf(err, "error getting list of expired job ids from the db") } - if len(ids) == 0 { - return nil - } - if _, err := ptx.exec(`delete from jobs_perms where job_id = ANY($1);`, pq.StringArray(ids)); err != nil { - return errors.Wrapf(err, "error deleting expired job perms from the db") - } - if _, err := ptx.exec(`delete from tasks_log_parts where task_id in (select id from tasks where job_id = ANY($1));`, pq.StringArray(ids)); err != nil { - return errors.Wrapf(err, "error deleting expired task log parts from the db") - } - if _, err := ptx.exec(`delete from tasks where job_id = ANY($1);`, pq.StringArray(ids)); err != nil { - return errors.Wrapf(err, "error deleting expired tasks from the db") - } - res, err := ptx.exec(`delete from jobs where id = ANY($1);`, pq.StringArray(ids)) + res, err := ds.deleteJobs(ptx, ids) if err != nil { - return errors.Wrapf(err, "error deleting expired jobs from the db") - } - rows, err := res.RowsAffected() - if err != nil { - return errors.Wrapf(err, "error getting the number of deleted jobs from the db") + return err } - n = int(rows) + n = res return nil }); err != nil { return 0, err @@ -827,6 +811,32 @@ func (ds *PostgresDatastore) expungeExpiredJobs() (int, error) { return n, nil } +func (ds *PostgresDatastore) deleteJobs(ptx *PostgresDatastore, ids []string) (int, error) { + var n int + if len(ids) == 0 { + return 0, nil + } + if _, err := ptx.exec(`delete from jobs_perms where job_id = ANY($1);`, pq.StringArray(ids)); err != nil { + return 0, errors.Wrapf(err, "error deleting expired job perms from the db") + } + if _, err := ptx.exec(`delete from tasks_log_parts where task_id in (select id from tasks where job_id = ANY($1));`, pq.StringArray(ids)); err != nil { + return 0, errors.Wrapf(err, "error deleting expired task log parts from the db") + } + if _, err := ptx.exec(`delete from tasks where job_id = ANY($1);`, pq.StringArray(ids)); err != nil { + return 0, errors.Wrapf(err, "error deleting expired tasks from the db") + } + res, err := ptx.exec(`delete from jobs where id = ANY($1);`, pq.StringArray(ids)) + if err != nil { + return 0, errors.Wrapf(err, "error deleting expired jobs from the db") + } + rows, err := res.RowsAffected() + if err != nil { + return 0, errors.Wrapf(err, "error getting the number of deleted jobs from the db") + } + n = int(rows) + return n, nil +} + func (ds *PostgresDatastore) GetTaskLogParts(ctx context.Context, taskID, q string, page, size int) (*datastore.Page[*tork.TaskLogPart], error) { searchTerm, _ := parseQuery(q) offset := (page - 1) * size @@ -1422,6 +1432,29 @@ func (ds *PostgresDatastore) UpdateScheduledJob(ctx context.Context, id string, }) } +func (ds *PostgresDatastore) DeleteScheduledJob(ctx context.Context, id string) error { + return ds.WithTx(ctx, func(tx datastore.Datastore) error { + ptx, ok := tx.(*PostgresDatastore) + if !ok { + return errors.New("unable to cast to a postgres datastore") + } + ids := []string{} + if err := ptx.select_(&ids, "select id from jobs where scheduled_job_id = $1 ", id); err != nil { + return errors.Wrapf(err, "error getting list of scheduled job instance ids from the db") + } + if _, err := ds.deleteJobs(ptx, ids); err != nil { + return errors.Wrapf(err, "error deleting scheduled job instances from the db") + } + if _, err := ptx.exec(`delete from scheduled_jobs_perms where scheduled_job_id = $1`, id); err != nil { + return errors.Wrapf(err, "error deleting scheduled job perms from the db") + } + if _, err := ptx.exec(`delete from scheduled_jobs where id = $1`, id); err != nil { + return errors.Wrapf(err, "error deleting scheduled job from the db") + } + return nil + }) +} + func (ds *PostgresDatastore) get(dest interface{}, query string, args ...interface{}) error { if ds.tx != nil { return ds.tx.Get(dest, query, args...) diff --git a/datastore/postgres/postgres_test.go b/datastore/postgres/postgres_test.go index 6c9286b1..6a701a17 100644 --- a/datastore/postgres/postgres_test.go +++ b/datastore/postgres/postgres_test.go @@ -1604,3 +1604,26 @@ func TestPostgresExpungeExpiredJobs(t *testing.T) { } } } + +func TestPostgresDeleteScheduledJob(t *testing.T) { + ctx := context.Background() + dsn := "host=localhost user=tork password=tork dbname=tork port=5432 sslmode=disable" + ds, err := NewPostgresDataStore(dsn) + assert.NoError(t, err) + + now := time.Now().UTC() + sj := tork.ScheduledJob{ + ID: uuid.NewUUID(), + Name: "Test Scheduled Job", + CreatedAt: now, + State: tork.ScheduledJobStateActive, + } + err = ds.CreateScheduledJob(ctx, &sj) + assert.NoError(t, err) + + err = ds.DeleteScheduledJob(ctx, sj.ID) + assert.NoError(t, err) + + _, err = ds.GetScheduledJobByID(ctx, sj.ID) + assert.Error(t, err) +} diff --git a/db/postgres/schema.go b/db/postgres/schema.go index 6e7ee307..bdcf5f92 100644 --- a/db/postgres/schema.go +++ b/db/postgres/schema.go @@ -49,21 +49,21 @@ CREATE TABLE users_roles ( CREATE UNIQUE INDEX idx_users_roles_uniq ON users_roles (user_id,role_id); CREATE TABLE scheduled_jobs ( - id varchar(32) not null primary key, - name varchar(64) not null, - description text not null, - tags text[] not null default '{}', - cron_expr varchar(64) not null, - inputs jsonb not null, - output_ text not null, - tasks jsonb not null, - defaults jsonb, - webhooks jsonb, - auto_delete jsonb, - secrets jsonb, - created_at timestamp not null, - created_by varchar(32) not null references users(id), - state varchar(10) not null + id varchar(32) not null primary key, + name varchar(64) not null, + description text not null, + tags text[] not null default '{}', + cron_expr varchar(64) not null, + inputs jsonb not null, + output_ text not null, + tasks jsonb not null, + defaults jsonb, + webhooks jsonb, + auto_delete jsonb, + secrets jsonb, + created_at timestamp not null, + created_by varchar(32) not null references users(id), + state varchar(10) not null ); CREATE TABLE scheduled_jobs_perms ( diff --git a/engine/datastore.go b/engine/datastore.go index fb65b288..66940bf5 100644 --- a/engine/datastore.go +++ b/engine/datastore.go @@ -162,6 +162,13 @@ func (ds *datastoreProxy) UpdateScheduledJob(ctx context.Context, id string, mod return ds.ds.UpdateScheduledJob(ctx, id, modify) } +func (ds *datastoreProxy) DeleteScheduledJob(ctx context.Context, id string) error { + if err := ds.checkInit(); err != nil { + return err + } + return ds.ds.DeleteScheduledJob(ctx, id) +} + func (ds *datastoreProxy) CreateUser(ctx context.Context, u *tork.User) error { if err := ds.checkInit(); err != nil { return err diff --git a/internal/coordinator/api/api.go b/internal/coordinator/api/api.go index b223db39..05912081 100644 --- a/internal/coordinator/api/api.go +++ b/internal/coordinator/api/api.go @@ -137,6 +137,7 @@ func NewAPI(cfg Config) (*API, error) { r.GET("/scheduled-jobs", s.listScheduledJobs) r.PUT("/scheduled-jobs/:id/pause", s.pauseScheduledJob) r.PUT("/scheduled-jobs/:id/resume", s.resumeScheduledJob) + r.DELETE("/scheduled-jobs/:id", s.deleteScheduledJob) } if v, ok := cfg.Enabled["metrics"]; !ok || v { r.GET("/metrics", s.getMetrics) @@ -632,6 +633,25 @@ func (s *API) resumeScheduledJob(c echo.Context) error { return c.JSON(http.StatusOK, map[string]string{"status": "OK"}) } +func (s *API) deleteScheduledJob(c echo.Context) error { + id := c.Param("id") + j, err := s.ds.GetScheduledJobByID(c.Request().Context(), id) + if err != nil { + return echo.NewHTTPError(http.StatusNotFound, err.Error()) + } + if j.State != tork.ScheduledJobStateActive { + return echo.NewHTTPError(http.StatusBadRequest, "scheduled job is not active") + } + j.State = tork.ScheduledJobStatePaused + if err := s.ds.DeleteScheduledJob(c.Request().Context(), id); err != nil { + return err + } + if err := s.broker.PublishEvent(c.Request().Context(), broker.TOPIC_JOB_SCHEDULED, j); err != nil { + return err + } + return c.JSON(http.StatusOK, map[string]string{"status": "OK"}) +} + // getTask // @Summary Get a task by id // @Tags tasks diff --git a/internal/coordinator/handlers/schedule.go b/internal/coordinator/handlers/schedule.go index c8fb70a8..1d16af41 100644 --- a/internal/coordinator/handlers/schedule.go +++ b/internal/coordinator/handlers/schedule.go @@ -151,9 +151,12 @@ func (h *jobSchedulerHandler) handlePaused(_ context.Context, s *tork.ScheduledJ if !ok { return errors.Errorf("unknown scheduled job: %s", s.ID) } - log.Info().Msgf("Pausing scheduled job %s", s.ID) + log.Info().Msgf("Pausing scheduled job %s", gjob.ID()) if err := h.scheduler.RemoveJob(gjob.ID()); err != nil { return err } + h.mu.Lock() + delete(h.m, s.ID) + h.mu.Unlock() return nil }