Skip to content

Commit

Permalink
Pull cherry-picks into latest release branch (#7085)
Browse files Browse the repository at this point in the history
Co-authored-by: jim hollenbach <[email protected]>
Co-authored-by: Brandon Duffany <[email protected]>
Co-authored-by: Tyler Williams <[email protected]>
  • Loading branch information
4 people authored Jul 25, 2024
1 parent 11ed72e commit a10dffc
Showing 1 changed file with 8 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -832,7 +832,7 @@ func (s *ExecutionServer) waitExecution(ctx context.Context, req *repb.WaitExecu
}
}

func loopAfterTimeout(ctx context.Context, timeout time.Duration, f func()) {
func loopAfterTimeout(ctx context.Context, timeout time.Duration, f func() bool) {
ticker := time.NewTicker(timeout)
defer ticker.Stop()
for {
Expand All @@ -843,7 +843,9 @@ func loopAfterTimeout(ctx context.Context, timeout time.Duration, f func()) {
}
case <-ticker.C:
{
f()
if shouldContinue := f(); !shouldContinue {
return
}
}
}
}
Expand Down Expand Up @@ -895,16 +897,18 @@ func (s *ExecutionServer) PublishOperation(stream repb.Execution_PublishOperatio
// if no pubsub listener receives our published updates, we *always*
// write the execution on stage == COMPLETE or after 5 seconds have
// passed with no writes.
go loopAfterTimeout(ctx, time.Second, func() {
go loopAfterTimeout(ctx, time.Second, func() bool {
mu.Lock()
defer mu.Unlock()
if time.Since(lastWrite) > 5*time.Second && taskID != "" {
if err := s.updateExecution(ctx, taskID, stage, lastOp); err != nil {
log.CtxWarningf(ctx, "PublishOperation: FlushWrite: error updating execution: %q: %s", taskID, err.Error())
return
return false
}
lastWrite = time.Now()
return false
}
return true
})

for {
Expand Down

0 comments on commit a10dffc

Please sign in to comment.