Skip to content

Commit

Permalink
fix(externalclock): fix setTimestamp racing condition
Browse files Browse the repository at this point in the history
  • Loading branch information
tvkn committed Jan 11, 2023
1 parent 211f0e1 commit 75b7648
Showing 1 changed file with 16 additions and 15 deletions.
31 changes: 16 additions & 15 deletions externalclock/clock.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,29 @@ import (
)

type Clock struct {
Logger logr.Logger
timestampChan chan time.Time
timeMutex sync.Mutex
currentTime time.Time
tickerMutex sync.RWMutex
tickers map[string]*ticker
Logger logr.Logger
timestampChan chan time.Time // synchronization SetTimestamp -> Run
timestampUpdated chan struct{} // synchronization Run -> SetTimestamp
timeMutex sync.Mutex
currentTime time.Time
tickerMutex sync.RWMutex
tickers map[string]*ticker
}

func New(logger logr.Logger, initialTime time.Time) *Clock {
c := &Clock{
Logger: logger,
timestampChan: make(chan time.Time),
tickers: map[string]*ticker{},
Logger: logger,
timestampChan: make(chan time.Time),
tickers: map[string]*ticker{},
timestampUpdated: make(chan struct{}),
}
c.currentTime = initialTime
return c
}

func (g *Clock) SetTimestamp(t time.Time) {
g.timestampChan <- t
<-g.timestampUpdated // wait for time to be set before returning
}

func (g *Clock) NumberOfTriggers() int {
Expand All @@ -53,6 +56,10 @@ func (g *Clock) Run(ctx context.Context) error {
case recvTime := <-g.timestampChan:
g.timeMutex.Lock()
g.tickerMutex.RLock()

g.currentTime = recvTime // ok to set early since reading is locked while in here.
g.timestampUpdated <- struct{}{} // notify SetTimestamp.

for _, tickerInstance := range g.tickers {
if !tickerInstance.IsDurationReached(recvTime) {
continue
Expand All @@ -69,7 +76,6 @@ func (g *Clock) Run(ctx context.Context) error {
g.Logger.V(1).Info("ticker dropped message", "caller", tickerInstance.caller)
}
}
g.setTime(recvTime)
g.timeMutex.Unlock()
g.tickerMutex.RUnlock()
}
Expand All @@ -82,11 +88,6 @@ func (g *Clock) getTime() time.Time {
return g.currentTime
}

func (g *Clock) setTime(newTime time.Time) {
// note: mutex should be locked when arriving here.
g.currentTime = newTime
}

func (g *Clock) After(duration time.Duration) <-chan time.Time {
_, file, no, ok := runtime.Caller(1)
var calledFrom string
Expand Down

0 comments on commit 75b7648

Please sign in to comment.