From b6b55025b4b5c2918e7f305b7d864d5846301174 Mon Sep 17 00:00:00 2001 From: "claude[bot]" <209825114+claude[bot]@users.noreply.github.com> Date: Tue, 19 Aug 2025 02:36:49 +0000 Subject: [PATCH] feat: add resilience to pool timeout in flow update MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add retry logic using backon for critical sqlx queries in update_flow_status_after_job_completion_internal - Create retry_sqlx! macro for consistent retry behavior (3 attempts, 200ms delay) - Apply retry logic to: - Initial flow status fetch query (prevents function failure on pool timeout) - Flow jobs results retrieval query (ensures proper job results collection) - Improves workflow reliability by handling temporary database connectivity issues 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: windmill-internal-app[bot] --- backend/windmill-worker/src/worker_flow.rs | 101 +++++++++++++-------- 1 file changed, 63 insertions(+), 38 deletions(-) diff --git a/backend/windmill-worker/src/worker_flow.rs b/backend/windmill-worker/src/worker_flow.rs index 6abe8e57adcdc..420e1ec5e19a2 100644 --- a/backend/windmill-worker/src/worker_flow.rs +++ b/backend/windmill-worker/src/worker_flow.rs @@ -20,6 +20,7 @@ use crate::{ }; use anyhow::Context; +use backon::{BackoffBuilder, ConstantBuilder, Retryable}; use futures::TryFutureExt; use mappable_rc::Marc; use serde::{Deserialize, Serialize}; @@ -68,6 +69,26 @@ use windmill_queue::{ type DB = sqlx::Pool; +/// Retry sqlx operations with exponential backoff to handle pool timeouts +macro_rules! retry_sqlx { + ($operation:expr) => { + (|| async { $operation }) + .retry( + ConstantBuilder::default() + .with_delay(std::time::Duration::from_millis(200)) + .with_max_times(3) + .build(), + ) + .notify(|err, dur| { + tracing::warn!( + "Database operation failed, retrying in {dur:#?}, err: {err:#?}" + ); + }) + .sleep(tokio::time::sleep) + .await + }; +} + use windmill_audit::audit_oss::{audit_log, AuditAuthor}; use windmill_audit::ActionKind; use windmill_queue::{canceled_job_to_result, push}; @@ -257,35 +278,37 @@ pub async fn update_flow_status_after_job_completion_internal( ) = { // tracing::debug!("UPDATE FLOW STATUS: {flow:?} {success} {result:?} {w_id} {depth}"); - let (job_kind, script_hash, old_status, raw_flow) = sqlx::query!( - "SELECT - kind AS \"job_kind!: JobKind\", - runnable_id AS \"script_hash: ScriptHash\", - flow_status AS \"flow_status!: Json>\", - raw_flow AS \"raw_flow: Json>\" - FROM v2_job INNER JOIN v2_job_status ON v2_job.id = v2_job_status.id WHERE v2_job.id = $1 AND v2_job.workspace_id = $2 LIMIT 1", - flow, - w_id - ) - .fetch_one(db) - .await - .map_err(|e| { - Error::internal_err(format!( - "fetching flow status {flow} while reporting {success} {result:?}: {e:#}" - )) - }) - .and_then(|record| { - Ok(( - record.job_kind, - record.script_hash, - serde_json::from_str::(record.flow_status.0.get()).map_err(|e| { - Error::internal_err(format!( - "requiring current module to be parsable as FlowStatus: {e:?}" - )) - })?, - record.raw_flow, - )) - })?; + let (job_kind, script_hash, old_status, raw_flow) = retry_sqlx!({ + sqlx::query!( + "SELECT + kind AS \"job_kind!: JobKind\", + runnable_id AS \"script_hash: ScriptHash\", + flow_status AS \"flow_status!: Json>\", + raw_flow AS \"raw_flow: Json>\" + FROM v2_job INNER JOIN v2_job_status ON v2_job.id = v2_job_status.id WHERE v2_job.id = $1 AND v2_job.workspace_id = $2 LIMIT 1", + flow, + w_id + ) + .fetch_one(db) + .await + .map_err(|e| { + Error::internal_err(format!( + "fetching flow status {flow} while reporting {success} {result:?}: {e:#}" + )) + }) + .and_then(|record| { + Ok(( + record.job_kind, + record.script_hash, + serde_json::from_str::(record.flow_status.0.get()).map_err(|e| { + Error::internal_err(format!( + "requiring current module to be parsable as FlowStatus: {e:?}" + )) + })?, + record.raw_flow, + )) + }) + })?; let flow_data = cache::job::fetch_flow(db, &job_kind, script_hash) .or_else(|_| cache::job::fetch_preview_flow(db, &flow, raw_flow)) @@ -1442,15 +1465,17 @@ async fn retrieve_flow_jobs_results( w_id: &str, job_uuids: &Vec, ) -> error::Result> { - let results = sqlx::query!( - "SELECT result, id - FROM v2_job_completed - WHERE id = ANY($1) AND workspace_id = $2", - job_uuids.as_slice(), - w_id - ) - .fetch_all(db) - .await? + let results = retry_sqlx!({ + sqlx::query!( + "SELECT result, id + FROM v2_job_completed + WHERE id = ANY($1) AND workspace_id = $2", + job_uuids.as_slice(), + w_id + ) + .fetch_all(db) + .await + })? .into_iter() .map(|br| (br.id, br.result)) .collect::>();