fix: use UPSERT to prevent duplicate DLQ entries #6
Merged
Conversation
When a job is requeued from the DLQ and fails again, it was creating a new DLQ entry instead of updating the existing one. This caused:

1. Duplicate DLQ entries for the same logical job (same `job_key`)
2. The original entry's `failed_at` timestamp was never updated
3. Auto-requeue cron jobs would pick up old entries repeatedly
4. Exponential growth of duplicate jobs in the queue

The fix:

- Add a unique partial index on `job_key` (`WHERE job_key IS NOT NULL`)
- Change `INSERT` to `UPSERT` in `add_to_dlq()` and `process_failed_jobs()`
- On conflict, update `failed_at = NOW()` and increment `failure_count`

This ensures:

- One DLQ entry per logical job (identified by `job_key`)
- `failed_at` always reflects the most recent failure
- Cooldown periods work correctly for auto-requeue
- `failure_count` accumulates across all failures

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
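The upsert semantics described in this commit can be sketched with an in-memory model. This is a hedged illustration, not the crate's actual code: the `DlqEntry` fields and the SQL quoted in the comment are assumptions reconstructed from the commit message, and the real work happens in a database `INSERT ... ON CONFLICT` statement.

```rust
use std::collections::HashMap;

// Hypothetical shape of a DLQ row; field names are assumptions
// based on the commit message, not the real schema.
struct DlqEntry {
    failed_at: u64, // stand-in for a NOW() timestamp
    failure_count: u32,
}

// Models roughly what an UPSERT like
//   INSERT ... ON CONFLICT (job_key)
//   DO UPDATE SET failed_at = NOW(),
//                 failure_count = dlq.failure_count + 1
// achieves: one entry per job_key, refreshed on every failure.
fn upsert_dlq(dlq: &mut HashMap<String, DlqEntry>, job_key: &str, now: u64) {
    dlq.entry(job_key.to_string())
        .and_modify(|e| {
            e.failed_at = now;    // most recent failure wins
            e.failure_count += 1; // accumulates across failure cycles
        })
        .or_insert(DlqEntry { failed_at: now, failure_count: 1 });
}

fn main() {
    let mut dlq = HashMap::new();
    upsert_dlq(&mut dlq, "job-42", 100);
    upsert_dlq(&mut dlq, "job-42", 200); // requeued, failed again
    assert_eq!(dlq.len(), 1); // no duplicate entry
    assert_eq!(dlq["job-42"].failed_at, 200); // timestamp refreshed
    assert_eq!(dlq["job-42"].failure_count, 2); // count accumulated
    println!("one entry, failed_at refreshed, failure_count accumulated");
}
```

The unique partial index (`WHERE job_key IS NOT NULL`) is what makes the conflict target valid while leaving key-less entries out of the dedup rule, which is mirrored here by keying the map on `job_key` alone.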
Force-pushed from f01b3a0 to a8e7e02
Adds automatic cleanup on worker startup to handle stale state left by crashed workers:

1. **Stale queue lock cleanup** - Releases queue locks older than 5 minutes (`DEFAULT_STALE_LOCK_TIMEOUT`). Queue locks are normally held briefly during job selection, so any lock older than this is from a dead worker.
2. **Permanently failed job cleanup** - Deletes jobs with `attempts >= max_attempts` that are still in the main queue. These jobs are "dead" (`is_available = false`) and will never be processed again. They should already be in the DLQ.

The cleanup runs automatically when `run_until_cancelled()` is called, before the worker starts processing jobs. Failures are logged but don't prevent the worker from starting.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
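The two cleanup conditions above reduce to small predicates. A minimal sketch, assuming the 5-minute constant and semantics from this commit message (the function names here are illustrative, not the crate's API):

```rust
use std::time::Duration;

// 5 minutes, matching DEFAULT_STALE_LOCK_TIMEOUT as described above.
const DEFAULT_STALE_LOCK_TIMEOUT: Duration = Duration::from_secs(300);

// Queue locks are held only briefly during job selection, so a lock
// held longer than the timeout is presumed to belong to a dead worker.
fn lock_is_stale(held_for: Duration) -> bool {
    held_for > DEFAULT_STALE_LOCK_TIMEOUT
}

// A job that has exhausted its retries will never run again; it should
// already be in the DLQ, so it can be deleted from the main queue.
fn job_is_permanently_failed(attempts: u32, max_attempts: u32) -> bool {
    attempts >= max_attempts
}

fn main() {
    assert!(!lock_is_stale(Duration::from_secs(10)));
    assert!(lock_is_stale(Duration::from_secs(600)));
    assert!(job_is_permanently_failed(3, 3));
    assert!(!job_is_permanently_failed(2, 3));
    println!("cleanup predicates behave as described");
}
```

In the real implementation these checks would be `WHERE` clauses in the cleanup queries rather than per-row Rust code; the point is only that both are simple threshold comparisons, which is why running them unconditionally at startup is cheap.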
Force-pushed from a8e7e02 to 01402ad
ceejbot approved these changes on Dec 17, 2025
```rust
    Ok(deleted)
}

/// Run all startup cleanup tasks.
// ...

// Run startup cleanup to release stale locks and clean up failed jobs
if let Err(e) = self.client.startup_cleanup().await {
    log::warn!("Startup cleanup failed (continuing anyway): {}", e);
```
ceejbot (Owner): The correct thing to do on a failure here, yeah.
Summary
Two fixes for worker queue health and DLQ reliability:
1. DLQ UPSERT - Prevent duplicate entries
When a job is requeued from the DLQ and fails again, it was creating a new DLQ entry instead of updating the existing one. This caused:
- Duplicate DLQ entries for the same logical job (same `job_key`)
- Stale `failed_at` timestamps - the original entry's timestamp was never updated

Fix:

- Add a unique partial index on `job_key` (`WHERE job_key IS NOT NULL`)
- Change `INSERT` to `UPSERT` in `add_to_dlq()` and `process_failed_jobs()`
- On conflict, update `failed_at = NOW()` and increment `failure_count`

2. Startup cleanup - Release stale locks and clean up dead jobs
When workers crash or are killed without graceful shutdown, they can leave behind:
- Stale queue locks held by dead workers
- Jobs with `attempts >= max_attempts` that sit in the main queue forever

Fix:

- Add `startup_cleanup()` that runs automatically when `run_until_cancelled()` is called
- `release_stale_queue_locks()` - releases queue locks older than 5 minutes (`DEFAULT_STALE_LOCK_TIMEOUT`)
- `cleanup_permanently_failed_jobs()` - deletes jobs that have exhausted retries

Behavior After Fix

- One DLQ entry per logical job (identified by `job_key`)
- `failed_at` always reflects the most recent failure - cooldown periods work correctly
- `failure_count` accumulates across all failure cycles

Testing
All 55 tests pass including the admin API DLQ tests.
Breaking Changes
None - backwards compatible. Existing DLQ entries without `job_key` are unaffected.

🤖 Generated with Claude Code