-
Notifications
You must be signed in to change notification settings - Fork 0
/
task.go
146 lines (115 loc) · 3.88 KB
/
task.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package scheduled
import (
"context"
"fmt"
"time"
"github.com/essentialkaos/ek/v12/cron"
"github.com/google/uuid"
)
// TaskOpts collects the caller-supplied settings from which NewTask builds a task.
type TaskOpts struct {
	// Underlying function to be run within the scheduler.
	Fn TaskFunc
	// In case of an error in Fn, ErrFn will be executed if provided.
	ErrFn TaskErrFunc
	// Offsets the initial startup to a given start time. By default it will start immediately on schedule.
	StartTime time.Time
	// Restricts the time boundary of the runtime to an end date. By default (zero value) it is unbound.
	// NewTask panics if a non-zero EndTime is already in the past.
	EndTime time.Time
	// Interval for Fn's execution within the scheduler. This is the function's tick.
	// It must be positive unless Cron is set; NewTask panics when neither is provided.
	Interval time.Duration
	// Limits the amount of times a single task can be executed in the runtime. After `MaxIter` steps, it will end.
	// By default, it is 0 (infinite). Negative values are not allowed.
	MaxIter int
	// Allows the scheduling based on a CRON string. Overrides `Interval`.
	// NewTask parses the string and panics if it is not a valid expression.
	Cron string
}
// TaskFunc is the unit of work a task executes when it runs. A non-nil error
// returned from it is handed to the task's ErrFn, when one is set.
type TaskFunc func() error
// TaskErrFunc is the optional callback that receives the error returned by a
// task's TaskFunc.
type TaskErrFunc func(err error)
// task is the package-internal representation of a scheduled unit of work.
// Instances are built by NewTask from a TaskOpts value.
type task struct {
	// Task identifier is used to be able to get and stop tasks already registered in the scheduler.
	// NewTask generates it with uuid.NewUUID.
	ID uuid.UUID
	// Underlying function to be run within the scheduler.
	Fn TaskFunc
	// In case of an error in Fn, ErrFn will be executed if provided.
	ErrFn TaskErrFunc
	// Offsets the initial startup to a given start time. By default it will start immediately on schedule.
	StartTime time.Time
	// Restricts the time boundary of the runtime to an end date. By default it is unbound,
	// which NewTask represents as a nil pointer.
	EndTime *time.Time
	// Interval for Fn's execution within the scheduler. This is the function's tick.
	Interval time.Duration
	// Limits the amount of times a single task can be executed in the runtime. After `MaxIter` steps, it will end.
	// By default, it is 0 (infinite). Negative values are not allowed.
	MaxIter int
	// Parsed CRON expression used for scheduling; nil when no CRON string was given.
	// When set, it overrides `Interval` in resetTimer.
	cron *cron.Expr
	// Internal context for the task and its cancel function. Used for cancellation.
	// NewTask derives both from context.Background via context.WithCancel.
	ctx    context.Context
	cancel context.CancelFunc
	// Internal timer to keep track of ticks.
	// NOTE(review): NewTask does not set this field; presumably the scheduler assigns it
	// before resetTimer is called — confirm against the caller.
	timer *time.Timer
	// Counts the current iteration. Only used if `MaxIter` is set. A task will stop once `itercnt == MaxIter`
	itercnt int
}
// NewTask validates opts and builds the task described by them.
//
// Scheduling is driven either by opts.Cron, which is parsed once here and takes
// precedence, or by a positive opts.Interval. A zero opts.EndTime leaves the
// task unbounded (the resulting task.EndTime is nil).
//
// NewTask panics when:
//   - neither Cron nor a positive Interval is provided;
//   - MaxIter is negative;
//   - Cron cannot be parsed;
//   - EndTime is set but already in the past.
//
// Every check runs before the task ID and the cancellable context are created,
// so a rejected configuration never allocates state that would then be dropped.
func NewTask(opts TaskOpts) *task {
	// If the caller has picked neither CRON nor interval, then we have no idea how to schedule the task
	if opts.Cron == "" && opts.Interval <= 0 {
		panic("neither cron nor interval is set")
	}

	// MaxIter is documented as non-negative; enforce it here with the other
	// option checks instead of letting a bad value reach the scheduler.
	if opts.MaxIter < 0 {
		panic(fmt.Sprintf("max iterations must not be negative: %d", opts.MaxIter))
	}

	// Only set the cron expression if the value is set
	var cronExpr *cron.Expr
	if opts.Cron != "" {
		// Parse the cron expression to validate the task
		expr, err := cron.Parse(opts.Cron)
		if err != nil {
			// @TODO: return an error
			panic(err)
		}
		cronExpr = expr
	}

	// If EndTime is not set, it defaults to 0001-01-01 00:00:00 +0000 UTC
	var endTime *time.Time
	if !opts.EndTime.IsZero() {
		// Only allow end time if it is in the future
		if opts.EndTime.Before(time.Now()) {
			panic(fmt.Sprintf("task was scheduled to end in the past: %s", opts.EndTime.String()))
		}
		// Point at a dedicated copy rather than into the parameter struct.
		end := opts.EndTime
		endTime = &end
	}

	// uuid.NewUUID() only returns an (uuid, error) for historical reasons.
	// It never returns an error anymore, so can be safely ignored.
	// [Issue](https://github.com/google/uuid/issues/63)
	taskid, _ := uuid.NewUUID()

	// Internal context for the task. Used for cancellation.
	ctx, cancel := context.WithCancel(context.Background())

	return &task{
		ID:        taskid,
		Fn:        opts.Fn,
		ErrFn:     opts.ErrFn,
		Interval:  opts.Interval,
		MaxIter:   opts.MaxIter,
		StartTime: opts.StartTime,
		EndTime:   endTime,
		cron:      cronExpr,
		ctx:       ctx,
		cancel:    cancel,
	}
}
// run invokes the task's Fn once. When Fn reports an error and an ErrFn was
// provided, the error is forwarded to ErrFn; otherwise it is dropped.
func (t *task) run() {
	err := t.Fn()
	if err == nil || t.ErrFn == nil {
		// Nothing went wrong, or nobody asked to be told about it.
		return
	}
	t.ErrFn(err)
}
// resetTimer re-arms the task's internal timer for its next tick.
//
// Without a CRON expression the delay is simply Interval. With one, CRON has
// precedence and the delay is the time remaining until the expression's next
// occurrence. The timer itself must already exist: Reset is called on t.timer
// without a nil check.
func (t *task) resetTimer() {
	if t.cron == nil {
		t.timer.Reset(t.Interval)
		return
	}
	t.timer.Reset(time.Until(t.cron.Next()))
}