Was skimming through this to debug a customer issue. Realized that we explicitly remove all cancellation from the context, which is intentional: it's derived from the work context, and we don't want to halt an in-progress fetch during shutdown because we may have already locked/updated those jobs and would then need to cleanly release them to avoid them needing to be rescued. With the Pro pilot, we have a retry loop with individualized per-attempt timeouts applied (roughly the pattern sketched after the quoted code below). For the standard pilot, however, no additional timeouts are applied to this method at all:
river/producer.go, lines 735 to 764 at ed11967:

```go
func (p *producer) dispatchWork(workCtx context.Context, count int, fetchResultCh chan<- producerFetchResult) {
	// This intentionally removes any deadlines or cancellation from the parent
	// context because we don't want it to get cancelled if the producer is asked
	// to shut down. In that situation, we want to finish fetching any jobs we are
	// in the midst of fetching, work them, and then stop. Otherwise we'd have a
	// risk of shutting down when we had already fetched jobs in the database,
	// leaving those jobs stranded. We'd then potentially have to release them
	// back to the queue.
	ctx := context.WithoutCancel(workCtx)

	// Maximum size of the `attempted_by` array on each job row. This maximum is
	// rarely hit, but exists to protect against degenerate cases.
	const maxAttemptedBy = 100

	jobs, err := p.pilot.JobGetAvailable(ctx, p.exec, p.state, &riverdriver.JobGetAvailableParams{
		ClientID:       p.config.ClientID,
		MaxAttemptedBy: maxAttemptedBy,
		MaxToLock:      count,
		Now:            p.Time.NowUTCOrNil(),
		Queue:          p.config.Queue,
		ProducerID:     p.id.Load(),
		Schema:         p.config.Schema,
	})
	if err != nil {
		fetchResultCh <- producerFetchResult{err: err}
		return
	}

	fetchResultCh <- producerFetchResult{jobs: jobs}
}
```
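For contrast, the per-attempt timeout approach mentioned above follows roughly this shape. This is purely an illustration of the pattern, not the Pro pilot's actual code; `fetchOnce`, `maxAttempts`, and `perAttemptTimeout` are made-up names standing in for a single `JobGetAvailable` call and its retry parameters.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// fetchOnce stands in for a single fetch attempt (e.g. one JobGetAvailable
// call). It's a hypothetical placeholder, not River's actual API.
func fetchOnce(ctx context.Context) ([]string, error) {
	select {
	case <-time.After(50 * time.Millisecond): // simulate query latency
		return []string{"job-1", "job-2"}, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// fetchWithRetry gives each attempt its own timeout so a single hung attempt
// can't block the caller indefinitely.
func fetchWithRetry(ctx context.Context, maxAttempts int, perAttemptTimeout time.Duration) ([]string, error) {
	var lastErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		attemptCtx, cancel := context.WithTimeout(ctx, perAttemptTimeout)
		jobs, err := fetchOnce(attemptCtx)
		cancel()
		if err == nil {
			return jobs, nil
		}
		lastErr = err
		if !errors.Is(err, context.DeadlineExceeded) {
			return nil, err // in this sketch, only timeouts are retried
		}
	}
	return nil, fmt.Errorf("fetch failed after %d attempts: %w", maxAttempts, lastErr)
}

func main() {
	jobs, err := fetchWithRetry(context.Background(), 3, time.Second)
	fmt.Println(jobs, err)
}
```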
What are your thoughts, @brandur? Should we apply a fixed timeout to these fetches in the standard pilot so a fetch can't hang indefinitely (freezing the producer and/or blocking shutdown), or do something else?
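For concreteness, the fixed-timeout option would presumably just layer `context.WithTimeout` on top of the existing `context.WithoutCancel`. Below is a minimal, self-contained sketch of that composition; the 30-second value is an arbitrary placeholder rather than a proposed default, and this isn't a decision about where such a timeout should actually live.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	// Simulate a work context that has already been cancelled, as it would be
	// while the producer is shutting down.
	workCtx, cancelWork := context.WithCancel(context.Background())
	cancelWork()

	// Strip cancellation so shutdown can't interrupt an in-flight fetch
	// (same as dispatchWork does today)...
	ctx := context.WithoutCancel(workCtx)

	// ...then cap how long any single fetch may take so it can't hang forever.
	// 30 seconds is only a placeholder value for illustration.
	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	// The parent's cancellation is gone, but a fresh deadline is in place.
	deadline, ok := ctx.Deadline()
	fmt.Println("err:", ctx.Err(), "has deadline:", ok, "expires at:", deadline)
}
```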