diff --git a/src/client/cleanup.rs b/src/client/cleanup.rs new file mode 100644 index 0000000..01179d4 --- /dev/null +++ b/src/client/cleanup.rs @@ -0,0 +1,117 @@ +//! Cleanup utilities for maintaining worker queue health +//! +//! Provides functions to clean up stale state that can accumulate when workers +//! crash or are forcibly terminated without graceful shutdown. + +use std::time::Duration; + +use super::BackfillClient; +use crate::BackfillError; + +/// Default timeout for considering a queue lock stale. +/// +/// Queue locks are held briefly during job selection (milliseconds), so any +/// lock older than this is almost certainly from a crashed worker. +pub const DEFAULT_STALE_LOCK_TIMEOUT: Duration = Duration::from_secs(300); // 5 minutes + +impl BackfillClient { + /// Release stale queue locks that were left behind by crashed workers. + /// + /// When a worker crashes or is killed without graceful shutdown, it may + /// leave queue locks behind. These stale locks prevent other workers + /// from processing jobs in the affected queues. + /// + /// This function releases any queue locks older than the specified timeout. 
+ /// + /// # Arguments + /// * `timeout` - Locks older than this duration are considered stale + /// + /// # Returns + /// Number of queue locks that were released + pub async fn release_stale_queue_locks(&self, timeout: Duration) -> Result<u64, BackfillError> { + let timeout_secs = timeout.as_secs(); + + let query = format!( + r#" + UPDATE {schema}._private_job_queues + SET locked_at = NULL, locked_by = NULL + WHERE locked_at IS NOT NULL + AND locked_at < NOW() - INTERVAL '{timeout_secs} seconds' + "#, + schema = self.schema, + timeout_secs = timeout_secs + ); + + let result = sqlx::query(&query).execute(&self.pool).await?; + let released = result.rows_affected(); + + if released > 0 { + log::info!( + "Released stale queue locks (count: {}, timeout_secs: {})", + released, + timeout_secs + ); + } + + Ok(released) + } + + /// Delete permanently failed jobs from the main queue. + /// + /// Jobs that have exhausted all retry attempts (attempts >= max_attempts) + /// remain in the main queue with `is_available = false`. These jobs + /// will never be processed again and should be cleaned up. + /// + /// Note: These jobs should already be captured to the DLQ by the task + /// handler or DLQ processor before reaching this state. This function + /// removes the leftover rows from the main queue. + /// + /// # Returns + /// Number of permanently failed jobs that were deleted + pub async fn cleanup_permanently_failed_jobs(&self) -> Result<u64, BackfillError> { + let query = format!( + r#" + DELETE FROM {schema}._private_jobs + WHERE attempts >= max_attempts + AND locked_at IS NULL + "#, + schema = self.schema + ); + + let result = sqlx::query(&query).execute(&self.pool).await?; + let deleted = result.rows_affected(); + + if deleted > 0 { + log::info!( + "Cleaned up permanently failed jobs from main queue (count: {})", + deleted + ); + } + + Ok(deleted) + } + + /// Run all startup cleanup tasks. 
+ /// + /// This should be called when a worker starts to clean up any stale state + /// left behind by previous workers. It performs: + /// 1. Release stale queue locks (using default timeout) + /// 2. Delete permanently failed jobs from main queue + /// + /// # Returns + /// Tuple of (stale_locks_released, failed_jobs_deleted) + pub async fn startup_cleanup(&self) -> Result<(u64, u64), BackfillError> { + log::info!("Running startup cleanup tasks"); + + let locks_released = self.release_stale_queue_locks(DEFAULT_STALE_LOCK_TIMEOUT).await?; + let jobs_deleted = self.cleanup_permanently_failed_jobs().await?; + + log::info!( + "Startup cleanup completed (stale_locks_released: {}, failed_jobs_deleted: {})", + locks_released, + jobs_deleted + ); + + Ok((locks_released, jobs_deleted)) + } +} diff --git a/src/client/dlq.rs b/src/client/dlq.rs index ec5855d..8b9b5db 100644 --- a/src/client/dlq.rs +++ b/src/client/dlq.rs @@ -166,6 +166,12 @@ impl BackfillClient { "CREATE INDEX IF NOT EXISTS idx_backfill_dlq_job_key ON {}.backfill_dlq (job_key) WHERE job_key IS NOT NULL", self.schema ), + // Unique constraint on job_key for UPSERT support - prevents duplicate DLQ entries + // when a requeued job fails again. Only applies to non-NULL job_keys. + format!( + "CREATE UNIQUE INDEX IF NOT EXISTS idx_backfill_dlq_job_key_unique ON {}.backfill_dlq (job_key) WHERE job_key IS NOT NULL", + self.schema + ), ]; for index_query in indexes { @@ -477,19 +483,30 @@ impl BackfillClient { "default".to_string() }; - let insert_query = format!( + // Use UPSERT to handle the case where a requeued job fails again. + // If a DLQ entry with the same job_key already exists, update it + // instead of creating a duplicate. This ensures one DLQ entry per + // logical job and keeps failed_at current for cooldown calculations. 
+ let upsert_query = format!( r#" INSERT INTO {}.backfill_dlq ( original_job_id, task_identifier, payload, queue_name, priority, job_key, max_attempts, failure_reason, failure_count, last_error, original_created_at, original_run_at ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) + ON CONFLICT (job_key) WHERE job_key IS NOT NULL DO UPDATE SET + failed_at = NOW(), + failure_count = {schema}.backfill_dlq.failure_count + EXCLUDED.failure_count, + failure_reason = EXCLUDED.failure_reason, + last_error = EXCLUDED.last_error, + original_job_id = EXCLUDED.original_job_id RETURNING * "#, - self.schema + self.schema, + schema = self.schema ); - let row = sqlx::query(&insert_query) + let row = sqlx::query(&upsert_query) .bind(original_job.id()) .bind(original_job.task_identifier()) .bind(original_job.payload()) @@ -588,21 +605,27 @@ impl BackfillClient { // Convert last_error from TEXT to JSONB for DLQ table let last_error_json = last_error.map(serde_json::Value::String); - // Move to DLQ - let insert_dlq_query = format!( + // Move to DLQ using UPSERT to handle requeued jobs that fail again + let upsert_dlq_query = format!( r#" - INSERT INTO {}.backfill_dlq ( + INSERT INTO {schema}.backfill_dlq ( original_job_id, task_identifier, payload, queue_name, priority, job_key, max_attempts, failure_reason, failure_count, last_error, original_created_at, original_run_at ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) + ON CONFLICT (job_key) WHERE job_key IS NOT NULL DO UPDATE SET + failed_at = NOW(), + failure_count = {schema}.backfill_dlq.failure_count + EXCLUDED.failure_count, + failure_reason = EXCLUDED.failure_reason, + last_error = EXCLUDED.last_error, + original_job_id = EXCLUDED.original_job_id "#, - self.schema + schema = self.schema ); let failure_reason = format!("Job exceeded maximum retry attempts ({}/{})", attempts, max_attempts); - let insert_result = sqlx::query(&insert_dlq_query) + let upsert_result = sqlx::query(&upsert_dlq_query) 
.bind(job_id) .bind(&task_identifier) .bind(&payload) @@ -618,7 +641,7 @@ impl BackfillClient { .execute(&self.pool) .await; - match insert_result { + match upsert_result { Ok(_) => { // Successfully moved to DLQ, now remove from main jobs table let delete_query = format!("DELETE FROM {}._private_jobs WHERE id = $1", self.schema); diff --git a/src/client/mod.rs b/src/client/mod.rs index 0f96b92..06955e8 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1,8 +1,10 @@ //! The backfill client, split across a couple of files. +mod cleanup; mod dlq; mod enqueue; +pub use cleanup::DEFAULT_STALE_LOCK_TIMEOUT; pub use dlq::*; /// High-level client for the backfill job queue system. diff --git a/src/worker.rs b/src/worker.rs index 10d905c..7188cfc 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -554,6 +554,11 @@ impl WorkerRunner { self.config.dlq_processor_interval.is_some() ); + // Run startup cleanup to release stale locks and clean up failed jobs + if let Err(e) = self.client.startup_cleanup().await { + log::warn!("Startup cleanup failed (continuing anyway): {}", e); + } + // Record worker starting (increment active worker count) crate::metrics::update_worker_active("worker", 1);