diff --git a/cron.go b/cron.go index c7e9176..4be8e2a 100644 --- a/cron.go +++ b/cron.go @@ -11,19 +11,20 @@ import ( // specified by the schedule. It may be started, stopped, and the entries may // be inspected while running. type Cron struct { - entries []*Entry - chain Chain - stop chan struct{} - add chan *Entry - remove chan EntryID - snapshot chan chan []Entry - running bool - logger Logger - runningMu sync.Mutex - location *time.Location - parser ScheduleParser - nextID EntryID - jobWaiter sync.WaitGroup + entries []*Entry + chain Chain + stop chan struct{} + add chan *Entry + remove chan EntryID + snapshot chan chan []Entry + scheduleNow chan EntryID + running bool + logger Logger + runningMu sync.Mutex + location *time.Location + parser ScheduleParser + nextID EntryID + jobWaiter sync.WaitGroup } // ScheduleParser is an interface for schedule spec parsers that return a Schedule @@ -97,32 +98,33 @@ func (s byTime) Less(i, j int) bool { // // Available Settings // -// Time Zone -// Description: The time zone in which schedules are interpreted -// Default: time.Local +// Time Zone +// Description: The time zone in which schedules are interpreted +// Default: time.Local // -// Parser -// Description: Parser converts cron spec strings into cron.Schedules. -// Default: Accepts this spec: https://en.wikipedia.org/wiki/Cron +// Parser +// Description: Parser converts cron spec strings into cron.Schedules. +// Default: Accepts this spec: https://en.wikipedia.org/wiki/Cron // -// Chain -// Description: Wrap submitted jobs to customize behavior. -// Default: A chain that recovers panics and logs them to stderr. +// Chain +// Description: Wrap submitted jobs to customize behavior. +// Default: A chain that recovers panics and logs them to stderr. // // See "cron.With*" to modify the default behavior. func New(opts ...Option) *Cron { c := &Cron{ - entries: nil, - chain: NewChain(), - add: make(chan *Entry), - stop: make(chan struct{}), - snapshot: make(chan chan []Entry), - remove: make(chan EntryID), - running: false, - runningMu: sync.Mutex{}, - logger: DefaultLogger, - location: time.Local, - parser: standardParser, + entries: nil, + chain: NewChain(), + add: make(chan *Entry), + stop: make(chan struct{}), + snapshot: make(chan chan []Entry), + remove: make(chan EntryID), + scheduleNow: make(chan EntryID), + running: false, + runningMu: sync.Mutex{}, + logger: DefaultLogger, + location: time.Local, + parser: standardParser, } for _, opt := range opts { opt(c) @@ -283,6 +285,17 @@ func (c *Cron) run() { c.entries = append(c.entries, newEntry) c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next) + case id := <-c.scheduleNow: + timer.Stop() + now = c.now().Add(100 * time.Millisecond) + for _, e := range c.entries { + if e.ID == id { + e.Next = now + c.logger.Info("scheduleNow", "now", now, "entry", e.ID, "next", e.Next) + break + } + } + case replyChan := <-c.snapshot: replyChan <- c.entrySnapshot() continue @@ -353,3 +366,8 @@ func (c *Cron) removeEntry(id EntryID) { } c.entries = entries } + +// ScheduleEntryNow schedules an entry to run 100ms from now. +func (c *Cron) ScheduleEntryNow(id EntryID) { + c.scheduleNow <- id +} diff --git a/cron_test.go b/cron_test.go index 36f06bf..587be3f 100644 --- a/cron_test.go +++ b/cron_test.go @@ -678,6 +678,39 @@ func TestMultiThreadedStartAndStop(t *testing.T) { cron.Stop() } +func TestScheduleEntryNow(t *testing.T) { + wg := &sync.WaitGroup{} + wg.Add(1) + + cron := newWithSeconds() + job0ID, err := cron.AddJob("0 0 0 30 Feb ?", testJob{wg, "job0"}) + if err != nil { + t.Fatal(err) + } + + cron.Start() + defer cron.Stop() + + time.Sleep(10 * time.Millisecond) + + if cron.Entry(job0ID).Next.Year() != 1 { + t.Fatal("expected job0 to be scheduled in year 0001") + } + + cron.ScheduleEntryNow(job0ID) + nextScheduleTimeMs := cron.Entry(job0ID).Next.Sub(time.Now()).Milliseconds() + if nextScheduleTimeMs < 0 || nextScheduleTimeMs > 100 { + t.Fatal("expected job0 to be scheduled in 100ms") + } + + select { + case <-time.After(120 * time.Millisecond): + t.FailNow() + case <-wait(wg): + } + +} + func wait(wg *sync.WaitGroup) chan bool { ch := make(chan bool) go func() {