Skip to content

Commit 7a3d575

Browse files
committed
feat: context with timeout
For context with external time context with timeouts can be used.
1 parent 0649f69 commit 7a3d575

File tree

10 files changed

+508
-42
lines changed

10 files changed

+508
-42
lines changed

pkg/clock/clock.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
package clock
33

44
import (
5+
"context"
56
"time"
67

78
"google.golang.org/protobuf/types/known/timestamppb"
@@ -12,6 +13,9 @@ type Clock interface {
1213
// After waits for the duration to elapse and then sends the current time on the returned channel.
1314
After(duration time.Duration) <-chan time.Time
1415

16+
// AfterFunc waits for the duration to elapse and then calls the function f given to it.
17+
AfterFunc(d time.Duration, f func()) Timer
18+
1519
// NewTicker returns a new Ticker.
1620
NewTicker(d time.Duration) Ticker
1721

@@ -37,3 +41,19 @@ type Ticker interface {
3741
// Stop the Ticker.
3842
Stop()
3943
}
44+
45+
type Timer interface {
46+
// C returns the channel on which the timer is going to be triggered.
47+
C() <-chan time.Time
48+
49+
// Stop the Timer.
50+
Stop() bool
51+
}
52+
53+
// ContextCtrl is clocked by an external interface.
54+
type ContextCtrl interface {
55+
WithTimeout(
56+
parent context.Context,
57+
timeout time.Duration,
58+
) (context.Context, context.CancelFunc)
59+
}

pkg/clock/system.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,22 @@ func (c systemClock) Sleep(d time.Duration) {
3939
time.Sleep(d)
4040
}
4141

42+
func (c systemClock) AfterFunc(d time.Duration, f func()) Timer {
43+
return &systemTimer{Timer: time.AfterFunc(d, f)}
44+
}
45+
4246
type systemTicker struct {
4347
time.Ticker
4448
}
4549

4650
func (t systemTicker) C() <-chan time.Time {
4751
return t.Ticker.C
4852
}
53+
54+
type systemTimer struct {
55+
*time.Timer
56+
}
57+
58+
func (t systemTimer) C() <-chan time.Time {
59+
return t.Timer.C
60+
}

pkg/contextctrl/context.go

Lines changed: 265 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
package contextctrl
2+
3+
import (
4+
"context"
5+
"sync"
6+
"sync/atomic"
7+
"time"
8+
9+
"github.com/einride/clock-go/pkg/clock"
10+
)
11+
12+
// newCancelCtx returns an initialized cancelCtx.
13+
func newCancelCtx(parent context.Context) cancelCtx {
14+
return cancelCtx{Context: parent}
15+
}
16+
17+
// goroutines counts the number of goroutines ever created; for testing.
18+
var goroutines int32 //nolint:gochecknoglobals
19+
20+
// propagateCancel arranges for child to be canceled when parent is.
21+
func propagateCancel(parent context.Context, child canceler) {
22+
done := parent.Done() //nolint:ifshort
23+
if done == nil {
24+
return // parent is never canceled
25+
}
26+
27+
select {
28+
case <-done:
29+
// parent is already canceled
30+
child.cancel(false, parent.Err())
31+
return
32+
default:
33+
}
34+
35+
if p, ok := parentCancelCtx(parent); ok {
36+
p.mu.Lock()
37+
if p.err != nil {
38+
// parent has already been canceled
39+
child.cancel(false, p.err)
40+
} else {
41+
if p.children == nil {
42+
p.children = make(map[canceler]struct{})
43+
}
44+
p.children[child] = struct{}{}
45+
}
46+
p.mu.Unlock()
47+
} else {
48+
atomic.AddInt32(&goroutines, +1)
49+
go func() {
50+
select {
51+
case <-parent.Done():
52+
child.cancel(false, parent.Err())
53+
case <-child.Done():
54+
}
55+
}()
56+
}
57+
}
58+
59+
// &cancelCtxKey is the key that a cancelCtx returns itself for.
60+
var cancelCtxKey int //nolint:gochecknoglobals
61+
62+
// parentCancelCtx returns the underlying *cancelCtx for parent.
63+
// It does this by looking up parent.Value(&cancelCtxKey) to find
64+
// the innermost enclosing *cancelCtx and then checking whether
65+
// parent.Done() matches that *cancelCtx. (If not, the *cancelCtx
66+
// has been wrapped in a custom implementation providing a
67+
// different done channel, in which case we should not bypass it.)
68+
func parentCancelCtx(parent context.Context) (*cancelCtx, bool) {
69+
done := parent.Done()
70+
if done == closedchan || done == nil {
71+
return nil, false
72+
}
73+
p, ok := parent.Value(&cancelCtxKey).(*cancelCtx)
74+
if !ok {
75+
return nil, false
76+
}
77+
pdone, _ := p.done.Load().(chan struct{})
78+
if pdone != done {
79+
return nil, false
80+
}
81+
return p, true
82+
}
83+
84+
// removeChild removes a context from its parent.
85+
func removeChild(parent context.Context, child canceler) {
86+
p, ok := parentCancelCtx(parent)
87+
if !ok {
88+
return
89+
}
90+
p.mu.Lock()
91+
if p.children != nil {
92+
delete(p.children, child)
93+
}
94+
p.mu.Unlock()
95+
}
96+
97+
// A canceler is a context type that can be canceled directly. The
98+
// implementations are *cancelCtx and *timerCtx.
99+
type canceler interface {
100+
cancel(removeFromParent bool, err error)
101+
Done() <-chan struct{}
102+
}
103+
104+
// closedchan is a reusable closed channel.
105+
var closedchan = make(chan struct{}) //nolint:gochecknoglobals
106+
107+
func init() { //nolint:gochecknoinits
108+
close(closedchan)
109+
}
110+
111+
// A cancelCtx can be canceled. When canceled, it also cancels any children
112+
// that implement canceler.
113+
type cancelCtx struct {
114+
context.Context
115+
116+
mu sync.Mutex // protects following fields
117+
done atomic.Value // of chan struct{}, created lazily, closed by first cancel call
118+
children map[canceler]struct{} // set to nil by the first cancel call
119+
err error // set to non-nil by the first cancel call
120+
}
121+
122+
func (c *cancelCtx) Value(key interface{}) interface{} {
123+
if key == &cancelCtxKey {
124+
return c
125+
}
126+
return c.Context.Value(key)
127+
}
128+
129+
func (c *cancelCtx) Done() <-chan struct{} {
130+
d := c.done.Load()
131+
if d != nil {
132+
return d.(chan struct{})
133+
}
134+
c.mu.Lock()
135+
defer c.mu.Unlock()
136+
d = c.done.Load()
137+
if d == nil {
138+
d = make(chan struct{})
139+
c.done.Store(d)
140+
}
141+
return d.(chan struct{})
142+
}
143+
144+
func (c *cancelCtx) Err() error {
145+
c.mu.Lock()
146+
err := c.err
147+
c.mu.Unlock()
148+
return err
149+
}
150+
151+
// cancel closes C.done, cancels each of C's children, and, if
152+
// removeFromParent is true, removes C from its parent's children.
153+
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
154+
if err == nil {
155+
panic("context: internal error: missing cancel error")
156+
}
157+
c.mu.Lock()
158+
if c.err != nil {
159+
c.mu.Unlock()
160+
return // already canceled
161+
}
162+
c.err = err
163+
d, _ := c.done.Load().(chan struct{})
164+
if d == nil {
165+
c.done.Store(closedchan)
166+
} else {
167+
close(d)
168+
}
169+
for child := range c.children {
170+
// NOTE: acquiring the child's lock while holding parent's lock.
171+
child.cancel(false, err)
172+
}
173+
c.children = nil
174+
c.mu.Unlock()
175+
if removeFromParent {
176+
removeChild(c.Context, c)
177+
}
178+
}
179+
180+
// WithDeadline returns a copy of the parent context with the deadline adjusted
181+
// to be no later than d. If the parent's deadline is already earlier than d,
182+
// WithDeadline(parent, d) is semantically equivalent to parent. The returned
183+
// context's Done channel is closed when the deadline expires, when the returned
184+
// cancel function is called, or when the parent context's Done channel is
185+
// closed, whichever happens first.
186+
//
187+
// Canceling this context releases resources associated with it, so code should
188+
// call cancel as soon as the operations running in this Context complete.
189+
func (g *ExternalContext) WithDeadline(parent context.Context, d time.Time) (context.Context, context.CancelFunc) {
190+
if parent == nil {
191+
panic("cannot create context from nil parent")
192+
}
193+
if cur, ok := parent.Deadline(); ok && cur.Before(d) {
194+
// The current deadline is already sooner than the new one.
195+
return context.WithCancel(parent)
196+
}
197+
c := &timerCtx{
198+
cancelCtx: newCancelCtx(parent),
199+
deadline: d,
200+
}
201+
propagateCancel(parent, c)
202+
dur := d.Sub(g.C.Now())
203+
if dur <= 0 {
204+
c.cancel(true, context.DeadlineExceeded) // deadline has already passed
205+
return c, func() { c.cancel(false, context.Canceled) }
206+
}
207+
c.mu.Lock()
208+
defer c.mu.Unlock()
209+
if c.err == nil {
210+
c.timer = g.C.AfterFunc(dur, func() {
211+
c.cancel(true, context.DeadlineExceeded)
212+
})
213+
}
214+
return c, func() { c.cancel(true, context.Canceled) }
215+
}
216+
217+
// A timerCtx carries a Timer and a deadline. It embeds a cancelCtx to
218+
// implement Done and Err. It implements cancel by stopping its Timer then
219+
// delegating to cancelCtx.cancel.
220+
type timerCtx struct {
221+
cancelCtx
222+
timer clock.Timer // Under cancelCtx.mu.
223+
224+
deadline time.Time
225+
}
226+
227+
func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
228+
return c.deadline, true
229+
}
230+
231+
func (c *timerCtx) cancel(removeFromParent bool, err error) {
232+
c.cancelCtx.cancel(false, err)
233+
if removeFromParent {
234+
// Remove this timerCtx from its parent cancelCtx's children.
235+
removeChild(c.cancelCtx.Context, c)
236+
}
237+
// C.mu.Lock() // removed from original code in context
238+
if c.timer != nil {
239+
tim := c.timer
240+
c.timer = nil
241+
tim.Stop()
242+
}
243+
// C.mu.Unlock() // removed from original code in context
244+
}
245+
246+
// WithTimeout returns WithDeadline(parent, time.Now().Add(timeout)).
247+
//
248+
// Canceling this context releases resources associated with it, so code should
249+
// call cancel as soon as the operations running in this Context complete:
250+
//
251+
// func slowOperationWithTimeout(ctx context.Context) (Result, error) {
252+
// ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
253+
// defer cancel() // releases resources if slowOperation completes before timeout elapses
254+
// return slowOperation(ctx)
255+
// }
256+
func (g *ExternalContext) WithTimeout(
257+
parent context.Context,
258+
timeout time.Duration,
259+
) (context.Context, context.CancelFunc) {
260+
return g.WithDeadline(parent, g.C.Now().Add(timeout))
261+
}
262+
263+
type ExternalContext struct {
264+
C clock.Clock
265+
}

0 commit comments

Comments
 (0)