Skip to content

Commit

Permalink
origin modules not active,thanks robfig#423 resolve other pull reques…
Browse files Browse the repository at this point in the history
…t conflict
  • Loading branch information
penglj committed May 9, 2022
1 parent 0798dc2 commit 4d1a910
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 22 deletions.
45 changes: 24 additions & 21 deletions cron.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package cron

import (
"container/heap"
"context"
"errors"
"fmt"
"sort"
"sync"
"time"
)
Expand All @@ -13,7 +13,7 @@ import (
// specified by the schedule. It may be started, stopped, and the entries may
// be inspected while running.
type Cron struct {
entries []*Entry
entries EntryHeap
chain Chain
stop chan struct{}
add chan *Entry
Expand Down Expand Up @@ -242,7 +242,7 @@ func (c *Cron) Schedule(schedule Schedule, cmd Job, entryOpts ...EntryOption) En
fn(entry)
}
if !c.running {
c.entries = append(c.entries, entry)
heap.Push(&c.entries, entry)
} else {
c.add <- entry
}
Expand Down Expand Up @@ -358,9 +358,6 @@ func (c *Cron) Run() {
// timeTillEarliestEntry returns the time remaining until the first
// execution of the earliest entry to run.
func (c *Cron) timeTillEarliestEntry(now time.Time) time.Duration {
// Determine the next entry to run.
sort.Sort(byTime(c.entries))

if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
// If there are no entries yet, just sleep - it still handles new entries
// and stop requests.
Expand All @@ -376,11 +373,16 @@ func (c *Cron) run() {

// Figure out the next activation times for each entry.
now := c.now()
for _, entry := range c.entries {
sortedEntries := new(EntryHeap)
for len(c.entries) > 0 {
entry := heap.Pop(&c.entries).(*Entry)
entry.Next = entry.ScheduleFirst(now)
heap.Push(sortedEntries, entry)
c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next)
}

c.entries = *sortedEntries

for {
// timer to timeout when it is time for the next entry to run.
timer := time.NewTimer(c.timeTillEarliestEntry(now))
Expand All @@ -391,34 +393,36 @@ func (c *Cron) run() {
now = now.In(c.location)
c.logger.Info("wake", "now", now)

del := 0

// Run every entry whose next time was less than now
for k, e := range c.entries {
for {
e := c.entries.Peek()
if e.Next.After(now) || e.Next.IsZero() {
break
}

if e.isPaused() {
// Updating Next and Prev so that the schedule continues to be maintained.
// This will help us proceed once the job is continued.
e = heap.Pop(&c.entries).(*Entry)
e.Prev = e.Next
e.Next = e.Schedule.Next(now)
c.logger.Info("paused,skip that schedule", "now", now, "entry", e.ID, "next", e.Next)
heap.Push(&c.entries, e)
continue
}

e = heap.Pop(&c.entries).(*Entry)

c.startJob(e.WrappedJob)
e.Prev = e.Next
e.Next = e.Schedule.Next(now)
c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
// only run once
if e.Schedule.IsOnce() {
c.entries[k] = c.entries[del]
del++
if !e.Schedule.IsOnce() {
heap.Push(&c.entries, e)
} else {
c.logger.Info("job run once,not add again", "now", now, "entry", e.ID, "once", e.Schedule.IsOnce())
}

}
c.entries = c.entries[del:]

case newEntry := <-c.add:
timer.Stop()
Expand Down Expand Up @@ -554,13 +558,12 @@ func (c *Cron) entrySnapshot() []Entry {

// removeEntry removes the entry corresponding to the given ID from the entry list.
func (c *Cron) removeEntry(id EntryID) {
var entries []*Entry
for _, e := range c.entries {
if e.ID != id {
entries = append(entries, e)
for idx, e := range c.entries {
if e.ID == id {
heap.Remove(&c.entries, idx)
return
}
}
c.entries = entries
}

func (c *Cron) hasEntry(id EntryID) bool {
Expand Down
8 changes: 7 additions & 1 deletion cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cron

import (
"bytes"
"container/heap"
"fmt"
"log"
"strings"
Expand Down Expand Up @@ -487,9 +488,14 @@ func TestJob(t *testing.T) {
expecteds := []string{"job2", "job4", "job5", "job1", "job3", "job0"}

var actuals []string
for _, entry := range cron.Entries() {

clone := new(EntryHeap)
for len(cron.entries) > 0 {
entry := heap.Pop(&cron.entries).(*Entry)
actuals = append(actuals, entry.Job.(testJob).name)
heap.Push(clone, entry)
}
cron.entries = *clone

for i, expected := range expecteds {
if actuals[i] != expected {
Expand Down
37 changes: 37 additions & 0 deletions entry_heap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package cron

type EntryHeap []*Entry

func (h *EntryHeap) Less(i, j int) bool {
if (*h)[i].Next.IsZero() {
return false
}
if (*h)[j].Next.IsZero() {
return true
}
return (*h)[i].Next.Before((*h)[j].Next)
}

func (h *EntryHeap) Swap(i, j int) {
(*h)[i], (*h)[j] = (*h)[j], (*h)[i]
}

func (h *EntryHeap) Len() int {
return len(*h)
}

func (h *EntryHeap) Pop() (v interface{}) {
*h, v = (*h)[:h.Len()-1], (*h)[h.Len()-1]
return
}

func (h *EntryHeap) Push(v interface{}) {
*h = append(*h, v.(*Entry))
}

func (h *EntryHeap) Peek() *Entry {
if len(*h) == 0 {
return nil
}
return (*h)[0]
}

0 comments on commit 4d1a910

Please sign in to comment.