Add timeout to standard pilot fetch#1255
Conversation
brandur
left a comment
There was a problem hiding this comment.
Thanks!
@bgentry Any strong opinions on how you want to handle this one? Another option is to just put the timeout in producer.go in dispatchWork:
func (p *producer) dispatchWork(workCtx context.Context, count int, fetchResultCh chan<- producerFetchResult) {
// This intentionally removes any deadlines or cancellation from the parent
// context because we don't want it to get cancelled if the producer is asked
// to shut down. In that situation, we want to finish fetching any jobs we are
// in the midst of fetching, work them, and then stop. Otherwise we'd have a
// risk of shutting down when we had already fetched jobs in the database,
// leaving those jobs stranded. We'd then potentially have to release them
// back to the queue.
ctx := context.WithoutCancel(workCtx)
// Maximum size of the `attempted_by` array on each job row. This maximum is
// rarely hit, but exists to protect against degenerate cases.
const maxAttemptedBy = 100
jobs, err := p.pilot.JobGetAvailable(ctx, p.exec, p.state, &riverdriver.JobGetAvailableParams{
ClientID: p.config.ClientID,
MaxAttemptedBy: maxAttemptedBy,
MaxToLock: count,
Now: p.Time.NowOrNil(),
Queue: p.config.Queue,
ProducerID: p.id.Load(),
Schema: p.config.Schema,
})
if err != nil {
fetchResultCh <- producerFetchResult{err: err}
return
}
fetchResultCh <- producerFetchResult{jobs: jobs}
}That might be better in the way that not every pilot needs to remember to bring its own context cancellations. That said, maybe in this case we might want a longer cancellation for the pro pilot so it'd make sense to break up the two.
|
Another more robust option is to do fetches within a transaction that sets a |
|
@bgentry WFM. It'd probably be worth re-running the benchmark on the branch to verify no major degradation in performance, but given the fetch queries are relatively few compared to everything else, hopefully there wouldn't be. You were previously against use of |
Summary
Fix
StandardPilot.JobGetAvailableso a stalled fetch does not hang a producer indefinitely.Problem
producer.dispatchWorkintentionally strips cancellation from the work context before fetching jobs so an in-flight fetch is allowed to complete during shutdown:producer.go:744-766That is reasonable, but
StandardPilot.JobGetAvailableforwarded directly toexec.JobGetAvailablewith no timeout at all:rivershared/riverpilot/standard_pilot.go:18-22This meant a stalled driver call could block a standard-pilot producer forever. The pro pilot already applies per-attempt fetch timeouts, so the standard pilot was the outlier.
Change
Add a 10-second timeout inside
StandardPilot.JobGetAvailablebefore calling the driver.This keeps the existing shutdown semantics intact:
dispatchWorkThe timeout is local to the standard pilot so there is no driver SQL change and no producer state-machine change.
Testing
rivershared/riverpilot/standard_pilot_test.goMaxToLock <= 0no-op behaviorJobGetAvailablecall timing out withcontext.DeadlineExceededVerification
Locally verified with:
GOPROXY=https://goproxy.cn,direct GOSUMDB=off go test ./rivershared/riverpilot -count=1Closes #1026.