From 75b76488d450253a8190f95022e9b19fbc76e8e3 Mon Sep 17 00:00:00 2001 From: Tristan Nicholls Date: Thu, 5 Jan 2023 20:37:49 +0100 Subject: [PATCH] fix(externalclock): fix setTimestamp racing condition --- externalclock/clock.go | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/externalclock/clock.go b/externalclock/clock.go index bb92f98..a484259 100644 --- a/externalclock/clock.go +++ b/externalclock/clock.go @@ -15,19 +15,21 @@ import ( ) type Clock struct { - Logger logr.Logger - timestampChan chan time.Time - timeMutex sync.Mutex - currentTime time.Time - tickerMutex sync.RWMutex - tickers map[string]*ticker + Logger logr.Logger + timestampChan chan time.Time // synchronization SetTimestamp -> Run + timestampUpdated chan struct{} // synchronization Run -> SetTimestamp + timeMutex sync.Mutex + currentTime time.Time + tickerMutex sync.RWMutex + tickers map[string]*ticker } func New(logger logr.Logger, initialTime time.Time) *Clock { c := &Clock{ - Logger: logger, - timestampChan: make(chan time.Time), - tickers: map[string]*ticker{}, + Logger: logger, + timestampChan: make(chan time.Time), + tickers: map[string]*ticker{}, + timestampUpdated: make(chan struct{}), } c.currentTime = initialTime return c @@ -35,6 +37,7 @@ func New(logger logr.Logger, initialTime time.Time) *Clock { func (g *Clock) SetTimestamp(t time.Time) { g.timestampChan <- t + <-g.timestampUpdated // wait for time to be set before returning } func (g *Clock) NumberOfTriggers() int { @@ -53,6 +56,10 @@ func (g *Clock) Run(ctx context.Context) error { case recvTime := <-g.timestampChan: g.timeMutex.Lock() g.tickerMutex.RLock() + + g.currentTime = recvTime // ok to set early since reading is locked while in here. + g.timestampUpdated <- struct{}{} // notify SetTimestamp. + for _, tickerInstance := range g.tickers { if !tickerInstance.IsDurationReached(recvTime) { continue @@ -69,7 +76,6 @@ func (g *Clock) Run(ctx context.Context) error { g.Logger.V(1).Info("ticker dropped message", "caller", tickerInstance.caller) } } - g.setTime(recvTime) g.timeMutex.Unlock() g.tickerMutex.RUnlock() } @@ -82,11 +88,6 @@ func (g *Clock) getTime() time.Time { return g.currentTime } -func (g *Clock) setTime(newTime time.Time) { - // note: mutex should be locked when arriving here. - g.currentTime = newTime -} - func (g *Clock) After(duration time.Duration) <-chan time.Time { _, file, no, ok := runtime.Caller(1) var calledFrom string