Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type Datastore interface {
GetScheduledJobs(ctx context.Context, currentUser string, page, size int) (*Page[*tork.ScheduledJobSummary], error)
GetScheduledJobByID(ctx context.Context, id string) (*tork.ScheduledJob, error)
UpdateScheduledJob(ctx context.Context, id string, modify func(u *tork.ScheduledJob) error) error
DeleteScheduledJob(ctx context.Context, id string) error

CreateUser(ctx context.Context, u *tork.User) error
GetUser(ctx context.Context, username string) (*tork.User, error)
Expand Down
10 changes: 10 additions & 0 deletions datastore/inmemory/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,16 @@ func (ds *InMemoryDatastore) UpdateScheduledJob(ctx context.Context, id string,
return nil
}

func (ds *InMemoryDatastore) DeleteScheduledJob(ctx context.Context, id string) error {
ds.jobs.Iterate(func(_ string, j *tork.Job) {
if j.Schedule != nil && j.Schedule.ID == id {
ds.jobs.Delete(j.ID)
}
})
ds.scheduledJobs.Delete(id)
return nil
}

func (ds *InMemoryDatastore) WithTx(ctx context.Context, f func(tx datastore.Datastore) error) error {
return f(ds)
}
Expand Down
27 changes: 27 additions & 0 deletions datastore/inmemory/inmemory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -870,3 +870,30 @@ func TestInMemoryUpdateScheduledJob(t *testing.T) {
assert.Error(t, err)
assert.Equal(t, datastore.ErrJobNotFound, err)
}

func TestInMemoryDeleteScheduledJob(t *testing.T) {
ctx := context.Background()
ds := inmemory.NewInMemoryDatastore()

// Create a scheduled job
scheduledJob := &tork.ScheduledJob{
ID: uuid.NewUUID(),
Name: "Test Scheduled Job",
}
err := ds.CreateScheduledJob(ctx, scheduledJob)
assert.NoError(t, err)

// Verify the scheduled job exists
retrievedJob, err := ds.GetScheduledJobByID(ctx, scheduledJob.ID)
assert.NoError(t, err)
assert.Equal(t, scheduledJob.ID, retrievedJob.ID)

// Delete the scheduled job
err = ds.DeleteScheduledJob(ctx, scheduledJob.ID)
assert.NoError(t, err)

// Verify the scheduled job no longer exists
_, err = ds.GetScheduledJobByID(ctx, scheduledJob.ID)
assert.Error(t, err)
assert.Equal(t, datastore.ErrJobNotFound, err)
}
71 changes: 52 additions & 19 deletions datastore/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -799,34 +799,44 @@ func (ds *PostgresDatastore) expungeExpiredJobs() (int, error) {
if err := ptx.select_(&ids, "select id from jobs where (delete_at < current_timestamp) OR (created_at < $1 AND (state = 'COMPLETED' or state = 'FAILED' or state = 'CANCELLED')) limit 1000", time.Now().UTC().Add(-*ds.jobsRetentionDuration)); err != nil {
return errors.Wrapf(err, "error getting list of expired job ids from the db")
}
if len(ids) == 0 {
return nil
}
if _, err := ptx.exec(`delete from jobs_perms where job_id = ANY($1);`, pq.StringArray(ids)); err != nil {
return errors.Wrapf(err, "error deleting expired job perms from the db")
}
if _, err := ptx.exec(`delete from tasks_log_parts where task_id in (select id from tasks where job_id = ANY($1));`, pq.StringArray(ids)); err != nil {
return errors.Wrapf(err, "error deleting expired task log parts from the db")
}
if _, err := ptx.exec(`delete from tasks where job_id = ANY($1);`, pq.StringArray(ids)); err != nil {
return errors.Wrapf(err, "error deleting expired tasks from the db")
}
res, err := ptx.exec(`delete from jobs where id = ANY($1);`, pq.StringArray(ids))
res, err := ds.deleteJobs(ptx, ids)
if err != nil {
return errors.Wrapf(err, "error deleting expired jobs from the db")
}
rows, err := res.RowsAffected()
if err != nil {
return errors.Wrapf(err, "error getting the number of deleted jobs from the db")
return err
}
n = int(rows)
n = res
return nil
}); err != nil {
return 0, err
}
return n, nil
}

func (ds *PostgresDatastore) deleteJobs(ptx *PostgresDatastore, ids []string) (int, error) {
var n int
if len(ids) == 0 {
return 0, nil
}
if _, err := ptx.exec(`delete from jobs_perms where job_id = ANY($1);`, pq.StringArray(ids)); err != nil {
return 0, errors.Wrapf(err, "error deleting expired job perms from the db")
}
if _, err := ptx.exec(`delete from tasks_log_parts where task_id in (select id from tasks where job_id = ANY($1));`, pq.StringArray(ids)); err != nil {
return 0, errors.Wrapf(err, "error deleting expired task log parts from the db")
}
if _, err := ptx.exec(`delete from tasks where job_id = ANY($1);`, pq.StringArray(ids)); err != nil {
return 0, errors.Wrapf(err, "error deleting expired tasks from the db")
}
res, err := ptx.exec(`delete from jobs where id = ANY($1);`, pq.StringArray(ids))
if err != nil {
return 0, errors.Wrapf(err, "error deleting expired jobs from the db")
}
rows, err := res.RowsAffected()
if err != nil {
return 0, errors.Wrapf(err, "error getting the number of deleted jobs from the db")
}
n = int(rows)
return n, nil
}

func (ds *PostgresDatastore) GetTaskLogParts(ctx context.Context, taskID, q string, page, size int) (*datastore.Page[*tork.TaskLogPart], error) {
searchTerm, _ := parseQuery(q)
offset := (page - 1) * size
Expand Down Expand Up @@ -1422,6 +1432,29 @@ func (ds *PostgresDatastore) UpdateScheduledJob(ctx context.Context, id string,
})
}

func (ds *PostgresDatastore) DeleteScheduledJob(ctx context.Context, id string) error {
return ds.WithTx(ctx, func(tx datastore.Datastore) error {
ptx, ok := tx.(*PostgresDatastore)
if !ok {
return errors.New("unable to cast to a postgres datastore")
}
ids := []string{}
if err := ptx.select_(&ids, "select id from jobs where scheduled_job_id = $1 ", id); err != nil {
return errors.Wrapf(err, "error getting list of scheduled job instance ids from the db")
}
if _, err := ds.deleteJobs(ptx, ids); err != nil {
return errors.Wrapf(err, "error deleting scheduled job instances from the db")
}
if _, err := ptx.exec(`delete from scheduled_jobs_perms where scheduled_job_id = $1`, id); err != nil {
return errors.Wrapf(err, "error deleting scheduled job perms from the db")
}
if _, err := ptx.exec(`delete from scheduled_jobs where id = $1`, id); err != nil {
return errors.Wrapf(err, "error deleting scheduled job from the db")
}
return nil
})
}

func (ds *PostgresDatastore) get(dest interface{}, query string, args ...interface{}) error {
if ds.tx != nil {
return ds.tx.Get(dest, query, args...)
Expand Down
23 changes: 23 additions & 0 deletions datastore/postgres/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1604,3 +1604,26 @@ func TestPostgresExpungeExpiredJobs(t *testing.T) {
}
}
}

func TestPostgresDeleteScheduledJob(t *testing.T) {
ctx := context.Background()
dsn := "host=localhost user=tork password=tork dbname=tork port=5432 sslmode=disable"
ds, err := NewPostgresDataStore(dsn)
assert.NoError(t, err)

now := time.Now().UTC()
sj := tork.ScheduledJob{
ID: uuid.NewUUID(),
Name: "Test Scheduled Job",
CreatedAt: now,
State: tork.ScheduledJobStateActive,
}
err = ds.CreateScheduledJob(ctx, &sj)
assert.NoError(t, err)

err = ds.DeleteScheduledJob(ctx, sj.ID)
assert.NoError(t, err)

_, err = ds.GetScheduledJobByID(ctx, sj.ID)
assert.Error(t, err)
}
30 changes: 15 additions & 15 deletions db/postgres/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,21 @@ CREATE TABLE users_roles (
CREATE UNIQUE INDEX idx_users_roles_uniq ON users_roles (user_id,role_id);

CREATE TABLE scheduled_jobs (
id varchar(32) not null primary key,
name varchar(64) not null,
description text not null,
tags text[] not null default '{}',
cron_expr varchar(64) not null,
inputs jsonb not null,
output_ text not null,
tasks jsonb not null,
defaults jsonb,
webhooks jsonb,
auto_delete jsonb,
secrets jsonb,
created_at timestamp not null,
created_by varchar(32) not null references users(id),
state varchar(10) not null
id varchar(32) not null primary key,
name varchar(64) not null,
description text not null,
tags text[] not null default '{}',
cron_expr varchar(64) not null,
inputs jsonb not null,
output_ text not null,
tasks jsonb not null,
defaults jsonb,
webhooks jsonb,
auto_delete jsonb,
secrets jsonb,
created_at timestamp not null,
created_by varchar(32) not null references users(id),
state varchar(10) not null
);

CREATE TABLE scheduled_jobs_perms (
Expand Down
7 changes: 7 additions & 0 deletions engine/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,13 @@ func (ds *datastoreProxy) UpdateScheduledJob(ctx context.Context, id string, mod
return ds.ds.UpdateScheduledJob(ctx, id, modify)
}

func (ds *datastoreProxy) DeleteScheduledJob(ctx context.Context, id string) error {
if err := ds.checkInit(); err != nil {
return err
}
return ds.ds.DeleteScheduledJob(ctx, id)
}

func (ds *datastoreProxy) CreateUser(ctx context.Context, u *tork.User) error {
if err := ds.checkInit(); err != nil {
return err
Expand Down
20 changes: 20 additions & 0 deletions internal/coordinator/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func NewAPI(cfg Config) (*API, error) {
r.GET("/scheduled-jobs", s.listScheduledJobs)
r.PUT("/scheduled-jobs/:id/pause", s.pauseScheduledJob)
r.PUT("/scheduled-jobs/:id/resume", s.resumeScheduledJob)
r.DELETE("/scheduled-jobs/:id", s.deleteScheduledJob)
}
if v, ok := cfg.Enabled["metrics"]; !ok || v {
r.GET("/metrics", s.getMetrics)
Expand Down Expand Up @@ -632,6 +633,25 @@ func (s *API) resumeScheduledJob(c echo.Context) error {
return c.JSON(http.StatusOK, map[string]string{"status": "OK"})
}

func (s *API) deleteScheduledJob(c echo.Context) error {
id := c.Param("id")
j, err := s.ds.GetScheduledJobByID(c.Request().Context(), id)
if err != nil {
return echo.NewHTTPError(http.StatusNotFound, err.Error())
}
if j.State != tork.ScheduledJobStateActive {
return echo.NewHTTPError(http.StatusBadRequest, "scheduled job is not active")
}
j.State = tork.ScheduledJobStatePaused
if err := s.ds.DeleteScheduledJob(c.Request().Context(), id); err != nil {
return err
}
if err := s.broker.PublishEvent(c.Request().Context(), broker.TOPIC_JOB_SCHEDULED, j); err != nil {
return err
}
return c.JSON(http.StatusOK, map[string]string{"status": "OK"})
}

// getTask
// @Summary Get a task by id
// @Tags tasks
Expand Down
5 changes: 4 additions & 1 deletion internal/coordinator/handlers/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,12 @@ func (h *jobSchedulerHandler) handlePaused(_ context.Context, s *tork.ScheduledJ
if !ok {
return errors.Errorf("unknown scheduled job: %s", s.ID)
}
log.Info().Msgf("Pausing scheduled job %s", s.ID)
log.Info().Msgf("Pausing scheduled job %s", gjob.ID())
if err := h.scheduler.RemoveJob(gjob.ID()); err != nil {
return err
}
h.mu.Lock()
delete(h.m, s.ID)
h.mu.Unlock()
return nil
}