diff --git a/scheduler.go b/scheduler.go index fb1f26c..ea38acf 100644 --- a/scheduler.go +++ b/scheduler.go @@ -9,7 +9,7 @@ import ( ) type Scheduler struct { - tasks map[uuid.UUID]*Task + tasks map[uuid.UUID]*task taskLock sync.Mutex } @@ -18,23 +18,22 @@ type Scheduler struct { // To use the scheduler, add a task via `scheduler.RegisterTask` or `scheduler.RunOnce` func NewScheduler() *Scheduler { return &Scheduler{ - tasks: make(map[uuid.UUID]*Task), + tasks: make(map[uuid.UUID]*task), } } -// Register allows for a task to be added within the execution loop of the scheduler -func (s *Scheduler) RegisterTask(t *Task) (uuid.UUID, error) { +// RegisterTask allows for a task to be added within the execution loop of the scheduler +func (s *Scheduler) RegisterTask(t *task) error { return s.registerTask(t, false) } // RunOnce allows a one-time execution of a task directly within the runtime of the scheduler -func (s *Scheduler) RunOnce(t *Task) error { - _, err := s.registerTask(t, true) - return err +func (s *Scheduler) RunOnce(t *task) error { + return s.registerTask(t, true) } // GetTask returns a task registered under provided uuid.UUID. If the task is not registered, the function returns an error. -func (s *Scheduler) GetTask(idx uuid.UUID) (*Task, error) { +func (s *Scheduler) GetTask(idx uuid.UUID) (*task, error) { s.taskLock.Lock() defer s.taskLock.Unlock() @@ -52,6 +51,9 @@ func (s *Scheduler) RemoveTask(idx uuid.UUID) error { return err } + s.taskLock.Lock() + defer s.taskLock.Unlock() + delete(s.tasks, idx) return nil } @@ -73,49 +75,36 @@ func (s *Scheduler) Stop() { // registerTask validates task's correctness before adding it to the group of tasks. // Tasks can run either on a schedule, or be executed only once within the context of the scheduler via the parameter `runOnce`. -func (s *Scheduler) registerTask(t *Task, runOnce bool) (uuid.UUID, error) { +func (s *Scheduler) registerTask(t *task, runOnce bool) error { s.taskLock.Lock() defer s.taskLock.Unlock() - // Go panics with duration <0 - if t.Interval <= time.Duration(0) { - return uuid.UUID{}, fmt.Errorf("invalid interval=%d", t.Interval) - } - - // Tasks are identified with Google's UUID, but long-term it would be nice - // to have internal ID generation to eliminate external dependency - taskid, _ := uuid.NewUUID() - if _, ok := s.tasks[taskid]; ok { - return uuid.UUID{}, fmt.Errorf("taskid=%s is already registered", taskid) - } - - // Only add the task to the hash-map of tasks if it's not a one-time run. - if !runOnce { - s.tasks[taskid] = t + if _, ok := s.tasks[t.ID]; ok { + return fmt.Errorf("taskid=%s is already registered", t.ID) } + s.tasks[t.ID] = t s.execTask(t, runOnce) - return taskid, nil + return nil } -// exec is the entrypoint for the execution of the task. It only accepts a task's identifier. +// execTask is the entrypoint for the execution of the task. It only accepts a task's identifier. // Tasks are ran in goroutines, which belong to each task respectively. // // @TODO: Should exec accept a task pointer to decouple it from the internal array buffer of tasks? -func (s *Scheduler) execTask(task *Task, runOnce bool) { +func (s *Scheduler) execTask(task *task, runOnce bool) { go func() { time.AfterFunc(time.Until(task.StartTime), func() { if err := task.ctx.Err(); err != nil { - // @TODO: Add IDs to the task itself // @TODO: Never print stuff to a console directly // A) Return as a slice of errors to the caller // B) Expose a logging interface for the caller to have a control over // C) Ignore (?) - fmt.Printf("err: task is cancelled but wanted to be ran\n") + fmt.Printf("err: task=%s is cancelled but wanted to be ran\n", task.ID) // Make sure to also stop the tick timer - if task.t != nil { - task.t.Stop() + if task.timer != nil { + task.timer.Stop() } return } @@ -128,23 +117,13 @@ func (s *Scheduler) execTask(task *Task, runOnce bool) { tick = time.Until(task.cron.Next()) } - task.t = time.AfterFunc(tick, func() { + task.timer = time.AfterFunc(tick, func() { go task.run() defer func() { if !runOnce { - // Reset the internal timer. - // @TODO: Is there a cleaner way to set this? - if task.cron != nil { - // CRON reset - task.t.Reset(time.Until(task.cron.Next())) - } else { - // Interval reset - task.t.Reset(task.Interval) - } + task.resetTimer() } else { - // Cancel the context and stop the internal timer for a runOnce task - task.cancel() - task.t.Stop() + s.RemoveTask(task.ID) } }() }) @@ -167,8 +146,8 @@ func (s *Scheduler) stopTask(taskid uuid.UUID) error { s.tasks[taskid].cancel() // Stops the internal timer of the task - if s.tasks[taskid].t != nil { - s.tasks[taskid].t.Stop() + if s.tasks[taskid].timer != nil { + s.tasks[taskid].timer.Stop() } return nil } diff --git a/scheduler_test.go b/scheduler_test.go index e3413bb..a1279ab 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -15,19 +15,18 @@ func TestSchedulerAddsTask(t *testing.T) { fmt.Println("hello, world!") return nil }, - // Large enough interval for the whole test to finish so that we do not print anything to the console Interval: 15 * time.Second, }) scheduler := scheduled.NewScheduler() - taskid, err := scheduler.RegisterTask(task) + err := scheduler.RegisterTask(task) require.NoError(t, err) - registeredTask, err := scheduler.GetTask(taskid) + registeredTask, err := scheduler.GetTask(task.ID) require.NoError(t, err) require.Equal(t, task, registeredTask) - err = scheduler.RemoveTask(taskid) + err = scheduler.RemoveTask(task.ID) require.NoError(t, err) } @@ -41,13 +40,13 @@ func TestSchedulerRemovesTask(t *testing.T) { }) scheduler := scheduled.NewScheduler() - taskid, err := scheduler.RegisterTask(task) + err := scheduler.RegisterTask(task) require.NoError(t, err) - err = scheduler.RemoveTask(taskid) + err = scheduler.RemoveTask(task.ID) require.NoError(t, err) - _, err = scheduler.GetTask(taskid) + _, err = scheduler.GetTask(task.ID) require.Error(t, err) } @@ -63,7 +62,7 @@ func TestSchedulerExecutesTaskAtLeastOnce(t *testing.T) { }) scheduler := scheduled.NewScheduler() - _, err := scheduler.RegisterTask(task) + err := scheduler.RegisterTask(task) require.NoError(t, err) test_loop: @@ -120,7 +119,7 @@ func TestSchedulerExecutesTaskMultipleTimes(t *testing.T) { }) scheduler := scheduled.NewScheduler() - idx, err := scheduler.RegisterTask(task) + err := scheduler.RegisterTask(task) require.NoError(t, err) var count = 0 @@ -138,7 +137,7 @@ test_loop: } } - err = scheduler.RemoveTask(idx) + err = scheduler.RemoveTask(task.ID) require.NoError(t, err) } @@ -155,7 +154,7 @@ func TestSchedulerExecutesTaskAtStartTime(t *testing.T) { }) scheduler := scheduled.NewScheduler() - _, err := scheduler.RegisterTask(task) + err := scheduler.RegisterTask(task) require.NoError(t, err) test_loop: @@ -186,7 +185,7 @@ func TestSchedulerExecutesCRONTaskAtLeastOnce(t *testing.T) { }) scheduler := scheduled.NewScheduler() - _, err := scheduler.RegisterTask(task) + err := scheduler.RegisterTask(task) require.NoError(t, err) test_loop: diff --git a/task.go b/task.go index c6ac502..7e5153c 100644 --- a/task.go +++ b/task.go @@ -6,6 +6,7 @@ import ( "time" "github.com/essentialkaos/ek/v12/cron" + "github.com/google/uuid" ) type TaskOpts struct { @@ -28,7 +29,10 @@ type TaskOpts struct { type TaskFunc func() error type TaskErrFunc func(err error) -type Task struct { +type task struct { + // Task identifier is used to be able to get and stop tasks already registered in the scheduler + ID uuid.UUID + // Underlying function to be ran within the scheduler Fn TaskFunc @@ -47,13 +51,14 @@ type Task struct { ctx context.Context cancel context.CancelFunc - t *time.Timer + timer *time.Timer } -func NewTask(opts TaskOpts) *Task { - var ( - ctx, cancel = context.WithCancel(context.Background()) - ) +func NewTask(opts TaskOpts) *task { + // If the caller has picked neither CRON nor interval, then we have no idea how to schedule the task + if opts.Cron == "" && opts.Interval <= 0 { + panic("neither cron nor interval is set") + } // Only set the cron expression if the value is set var cronExpr *cron.Expr = nil @@ -69,7 +74,18 @@ func NewTask(opts TaskOpts) *Task { cronExpr = c } - return &Task{ + var ( + // uuid.NewUUID() only returns an (uuid, error) for historical reasons. + // It never returns an error anymore, so can be safely ignored. + // [Issue](https://github.com/google/uuid/issues/63) + taskid, _ = uuid.NewUUID() + + // Internal context for the task. Used for cancellation. + ctx, cancel = context.WithCancel(context.Background()) + ) + + return &task{ + ID: taskid, Fn: opts.Fn, ErrFn: opts.ErrFn, Interval: opts.Interval, @@ -80,7 +96,7 @@ func NewTask(opts TaskOpts) *Task { } } -func (t *Task) run() { +func (t *task) run() { if err := t.Fn(); err != nil { if t.ErrFn != nil { // While it may not be pretty, it works. @@ -93,3 +109,13 @@ func (t *Task) run() { return } } + +// resetTimer sets the next tick for the task to run with its internal timer. +func (t *task) resetTimer() { + var next = t.Interval + if t.cron != nil { + // CRON has a precedence over interval + next = time.Until(t.cron.Next()) + } + t.timer.Reset(next) +}