Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 117 additions & 0 deletions src/client/cleanup.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
//! Cleanup utilities for maintaining worker queue health
//!
//! Provides functions to clean up stale state that can accumulate when workers
//! crash or are forcibly terminated without graceful shutdown.

use std::time::Duration;

use super::BackfillClient;
use crate::BackfillError;

/// Default timeout for considering a queue lock stale.
///
/// Queue locks are held briefly during job selection (milliseconds), so any
/// lock older than this is almost certainly from a crashed worker.
///
/// Passed to [`BackfillClient::release_stale_queue_locks`] by
/// [`BackfillClient::startup_cleanup`].
pub const DEFAULT_STALE_LOCK_TIMEOUT: Duration = Duration::from_secs(300); // 5 minutes

impl BackfillClient {
/// Release stale queue locks that were left behind by crashed workers.
///
/// When a worker crashes or is killed without graceful shutdown, it may
/// leave queue locks behind. These stale locks prevent other workers
/// from processing jobs in the affected queues.
///
/// This function releases any queue locks older than the specified timeout.
///
/// # Arguments
/// * `timeout` - Locks older than this duration are considered stale
///
/// # Returns
/// Number of queue locks that were released
pub async fn release_stale_queue_locks(&self, timeout: Duration) -> Result<u64, BackfillError> {
let timeout_secs = timeout.as_secs();

let query = format!(
r#"
UPDATE {schema}._private_job_queues
SET locked_at = NULL, locked_by = NULL
WHERE locked_at IS NOT NULL
AND locked_at < NOW() - INTERVAL '{timeout_secs} seconds'
"#,
schema = self.schema,
timeout_secs = timeout_secs
);

let result = sqlx::query(&query).execute(&self.pool).await?;
let released = result.rows_affected();

if released > 0 {
log::info!(
"Released stale queue locks (count: {}, timeout_secs: {})",
released,
timeout_secs
);
}

Ok(released)
}

/// Delete permanently failed jobs from the main queue.
///
/// Jobs that have exhausted all retry attempts (attempts >= max_attempts)
/// remain in the main queue with `is_available = false`. These jobs
/// will never be processed again and should be cleaned up.
///
/// Note: These jobs should already be captured to the DLQ by the task
/// handler or DLQ processor before reaching this state. This function
/// removes the leftover rows from the main queue.
///
/// # Returns
/// Number of permanently failed jobs that were deleted
pub async fn cleanup_permanently_failed_jobs(&self) -> Result<u64, BackfillError> {
let query = format!(
r#"
DELETE FROM {schema}._private_jobs
WHERE attempts >= max_attempts
AND locked_at IS NULL
"#,
schema = self.schema
);

let result = sqlx::query(&query).execute(&self.pool).await?;
let deleted = result.rows_affected();

if deleted > 0 {
log::info!(
"Cleaned up permanently failed jobs from main queue (count: {})",
deleted
);
}

Ok(deleted)
}

/// Run all startup cleanup tasks.
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this approach.

///
/// This should be called when a worker starts to clean up any stale state
/// left behind by previous workers. It performs:
/// 1. Release stale queue locks (using default timeout)
/// 2. Delete permanently failed jobs from main queue
///
/// # Returns
/// Tuple of (stale_locks_released, failed_jobs_deleted)
pub async fn startup_cleanup(&self) -> Result<(u64, u64), BackfillError> {
log::info!("Running startup cleanup tasks");

let locks_released = self.release_stale_queue_locks(DEFAULT_STALE_LOCK_TIMEOUT).await?;
let jobs_deleted = self.cleanup_permanently_failed_jobs().await?;

log::info!(
"Startup cleanup completed (stale_locks_released: {}, failed_jobs_deleted: {})",
locks_released,
jobs_deleted
);

Ok((locks_released, jobs_deleted))
}
}
41 changes: 32 additions & 9 deletions src/client/dlq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,12 @@ impl BackfillClient {
"CREATE INDEX IF NOT EXISTS idx_backfill_dlq_job_key ON {}.backfill_dlq (job_key) WHERE job_key IS NOT NULL",
self.schema
),
// Unique constraint on job_key for UPSERT support - prevents duplicate DLQ entries
// when a requeued job fails again. Only applies to non-NULL job_keys.
format!(
"CREATE UNIQUE INDEX IF NOT EXISTS idx_backfill_dlq_job_key_unique ON {}.backfill_dlq (job_key) WHERE job_key IS NOT NULL",
self.schema
),
];

for index_query in indexes {
Expand Down Expand Up @@ -477,19 +483,30 @@ impl BackfillClient {
"default".to_string()
};

let insert_query = format!(
// Use UPSERT to handle the case where a requeued job fails again.
// If a DLQ entry with the same job_key already exists, update it
// instead of creating a duplicate. This ensures one DLQ entry per
// logical job and keeps failed_at current for cooldown calculations.
let upsert_query = format!(
r#"
INSERT INTO {}.backfill_dlq (
original_job_id, task_identifier, payload, queue_name, priority,
job_key, max_attempts, failure_reason, failure_count, last_error,
original_created_at, original_run_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
ON CONFLICT (job_key) WHERE job_key IS NOT NULL DO UPDATE SET
failed_at = NOW(),
failure_count = {schema}.backfill_dlq.failure_count + EXCLUDED.failure_count,
failure_reason = EXCLUDED.failure_reason,
last_error = EXCLUDED.last_error,
original_job_id = EXCLUDED.original_job_id
RETURNING *
"#,
self.schema
self.schema,
schema = self.schema
);

let row = sqlx::query(&insert_query)
let row = sqlx::query(&upsert_query)
.bind(original_job.id())
.bind(original_job.task_identifier())
.bind(original_job.payload())
Expand Down Expand Up @@ -588,21 +605,27 @@ impl BackfillClient {
// Convert last_error from TEXT to JSONB for DLQ table
let last_error_json = last_error.map(serde_json::Value::String);

// Move to DLQ
let insert_dlq_query = format!(
// Move to DLQ using UPSERT to handle requeued jobs that fail again
let upsert_dlq_query = format!(
r#"
INSERT INTO {}.backfill_dlq (
INSERT INTO {schema}.backfill_dlq (
original_job_id, task_identifier, payload, queue_name, priority,
job_key, max_attempts, failure_reason, failure_count, last_error,
original_created_at, original_run_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
ON CONFLICT (job_key) WHERE job_key IS NOT NULL DO UPDATE SET
failed_at = NOW(),
failure_count = {schema}.backfill_dlq.failure_count + EXCLUDED.failure_count,
failure_reason = EXCLUDED.failure_reason,
last_error = EXCLUDED.last_error,
original_job_id = EXCLUDED.original_job_id
"#,
self.schema
schema = self.schema
);

let failure_reason = format!("Job exceeded maximum retry attempts ({}/{})", attempts, max_attempts);

let insert_result = sqlx::query(&insert_dlq_query)
let upsert_result = sqlx::query(&upsert_dlq_query)
.bind(job_id)
.bind(&task_identifier)
.bind(&payload)
Expand All @@ -618,7 +641,7 @@ impl BackfillClient {
.execute(&self.pool)
.await;

match insert_result {
match upsert_result {
Ok(_) => {
// Successfully moved to DLQ, now remove from main jobs table
let delete_query = format!("DELETE FROM {}._private_jobs WHERE id = $1", self.schema);
Expand Down
2 changes: 2 additions & 0 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
//! The backfill client, split across a couple of files.

mod cleanup;
mod dlq;
mod enqueue;

pub use cleanup::DEFAULT_STALE_LOCK_TIMEOUT;
pub use dlq::*;

/// High-level client for the backfill job queue system.
Expand Down
5 changes: 5 additions & 0 deletions src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,11 @@ impl WorkerRunner {
self.config.dlq_processor_interval.is_some()
);

// Run startup cleanup to release stale locks and clean up failed jobs
if let Err(e) = self.client.startup_cleanup().await {
log::warn!("Startup cleanup failed (continuing anyway): {}", e);
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The correct thing to do on a failure here, yeah.

}

// Record worker starting (increment active worker count)
crate::metrics::update_worker_active("worker", 1);

Expand Down