From b2ebb6d689752fdb7313f527adc5db36db321fc7 Mon Sep 17 00:00:00 2001
From: Abel Lucas
Date: Tue, 31 Dec 2024 14:30:29 +0100
Subject: [PATCH] backend: simplify `insert` in `completed_job`

---
 ...9f959830ee9c560214c057bf497567c3858d2.json |  17 ++
 ...8ff78bbfa61d49ba670e15f43b0074c94e863.json |  35 +++
 backend/windmill-queue/src/jobs.rs            | 240 ++++++++----------
 3 files changed, 163 insertions(+), 129 deletions(-)
 create mode 100644 backend/.sqlx/query-6634d2c08dc36fa2cf93558b0ba9f959830ee9c560214c057bf497567c3858d2.json
 create mode 100644 backend/.sqlx/query-e0b51a22f3843ef3297b639c8b58ff78bbfa61d49ba670e15f43b0074c94e863.json

diff --git a/backend/.sqlx/query-6634d2c08dc36fa2cf93558b0ba9f959830ee9c560214c057bf497567c3858d2.json b/backend/.sqlx/query-6634d2c08dc36fa2cf93558b0ba9f959830ee9c560214c057bf497567c3858d2.json
new file mode 100644
index 0000000000000..f6a3e3ab5afaf
--- /dev/null
+++ b/backend/.sqlx/query-6634d2c08dc36fa2cf93558b0ba9f959830ee9c560214c057bf497567c3858d2.json
@@ -0,0 +1,17 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "\n UPDATE queue SET flow_status = jsonb_set(\n jsonb_set(\n COALESCE(flow_status, '{}'::jsonb),\n array[$1],\n COALESCE(flow_status->$1, '{}'::jsonb)\n ),\n array[$1, 'duration_ms'],\n to_jsonb($2::bigint)\n ) WHERE id = $3 AND workspace_id = $4\n ",
+  "describe": {
+    "columns": [],
+    "parameters": {
+      "Left": [
+        "Text",
+        "Int8",
+        "Uuid",
+        "Text"
+      ]
+    },
+    "nullable": []
+  },
+  "hash": "6634d2c08dc36fa2cf93558b0ba9f959830ee9c560214c057bf497567c3858d2"
+}
diff --git a/backend/.sqlx/query-e0b51a22f3843ef3297b639c8b58ff78bbfa61d49ba670e15f43b0074c94e863.json b/backend/.sqlx/query-e0b51a22f3843ef3297b639c8b58ff78bbfa61d49ba670e15f43b0074c94e863.json
new file mode 100644
index 0000000000000..c3b1340ff9789
--- /dev/null
+++ b/backend/.sqlx/query-e0b51a22f3843ef3297b639c8b58ff78bbfa61d49ba670e15f43b0074c94e863.json
@@ -0,0 +1,35 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "\n INSERT INTO completed_job\n ( workspace_id\n , id\n , parent_job\n , created_by\n , created_at\n , started_at\n , duration_ms\n , success\n , script_hash\n , script_path\n , args\n , result\n , raw_code\n , raw_lock\n , canceled\n , canceled_by\n , canceled_reason\n , job_kind\n , schedule_path\n , permissioned_as\n , flow_status\n , raw_flow\n , is_flow_step\n , is_skipped\n , language\n , email\n , visible_to_owner\n , mem_peak\n , tag\n , priority\n )\n SELECT \n queue.workspace_id\n , queue.id\n , queue.parent_job\n , queue.created_by\n , queue.created_at\n , queue.started_at\n , COALESCE($2::bigint, (EXTRACT('epoch' FROM (now())) - EXTRACT('epoch' FROM (COALESCE(queue.started_at, now())))) * 1000)\n , $3 AS success\n , queue.script_hash\n , queue.script_path\n , queue.args\n , $4 AS result\n , queue.raw_code\n , queue.raw_lock\n , queue.canceled OR COALESCE($7, queue.canceled_by) IS NOT NULL as canceled\n , COALESCE($7, queue.canceled_by) as canceled_by\n , COALESCE($8, queue.canceled_reason) as canceled_reason\n , queue.job_kind\n , queue.schedule_path\n , queue.permissioned_as\n , queue.flow_status\n , queue.raw_flow\n , queue.is_flow_step\n , $5 AS is_skipped\n , queue.language\n , queue.email\n , queue.visible_to_owner\n , GREATEST($6, queue.mem_peak) AS mem_peak\n , queue.tag\n , queue.priority\n FROM queue\n WHERE queue.id = $1\n LIMIT 1\n ON CONFLICT (id) DO UPDATE SET \n success = $3, \n result = $4\n RETURNING duration_ms, canceled\n ",
+  "describe": {
+    "columns": [
+      {
+        "ordinal": 0,
+        "name": "duration_ms",
+        "type_info": "Int8"
+      },
+      {
+        "ordinal": 1,
+        "name": "canceled",
"canceled", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Int8", + "Bool", + "Jsonb", + "Bool", + "Int4", + "Varchar", + "Text" + ] + }, + "nullable": [ + false, + false + ] + }, + "hash": "e0b51a22f3843ef3297b639c8b58ff78bbfa61d49ba670e15f43b0074c94e863" +} diff --git a/backend/windmill-queue/src/jobs.rs b/backend/windmill-queue/src/jobs.rs index a99f9a3fcfae5..5d274872fcff6 100644 --- a/backend/windmill-queue/src/jobs.rs +++ b/backend/windmill-queue/src/jobs.rs @@ -519,7 +519,9 @@ pub async fn add_completed_job( } let _job_id = queued_job.id; - let (opt_uuid, _duration, _skip_downstream_error_handlers) = (|| async { + let (canceled_by, canceled_reason) = + canceled_by.map_or((None, None), |c| (c.username, c.reason)); + let (opt_uuid, _duration, canceled, _skip_downstream_error_handlers) = (|| async { let mut tx = db.begin().await?; let job_id = queued_job.id; @@ -531,133 +533,118 @@ pub async fn add_completed_job( serde_json::to_string(&result).unwrap_or_else(|_| "".to_string()) ); - let (raw_code, raw_lock, raw_flow) = if !*MIN_VERSION_IS_AT_LEAST_1_427.read().await { - sqlx::query!( - "SELECT raw_code, raw_lock, raw_flow AS \"raw_flow: Json>\" - FROM job WHERE id = $1 AND workspace_id = $2 LIMIT 1", - &job_id, - &queued_job.workspace_id - ) - .fetch_one(db) - .map_ok(|record| (record.raw_code, record.raw_lock, record.raw_flow)) - .or_else(|_| { - sqlx::query!( - "SELECT raw_code, raw_lock, raw_flow AS \"raw_flow: Json>\" - FROM queue WHERE id = $1 AND workspace_id = $2 LIMIT 1", - &job_id, - &queued_job.workspace_id - ) - .fetch_one(db) - .map_ok(|record| (record.raw_code, record.raw_lock, record.raw_flow)) - }) - .await - .unwrap_or_default() - } else { - (None, None, None) - }; - - let mem_peak = mem_peak.max(queued_job.mem_peak.unwrap_or(0)); // add_time!(bench, "add_completed_job query START"); - let _duration = sqlx::query_scalar!( - "INSERT INTO completed_job AS cj - ( workspace_id - , id - , parent_job - , created_by - , created_at - , started_at - , duration_ms - , success - , script_hash - , script_path - , args - , result - , raw_code - , raw_lock - , canceled - , canceled_by - , canceled_reason - , job_kind - , schedule_path - , permissioned_as - , flow_status - , raw_flow - , is_flow_step - , is_skipped - , language - , email - , visible_to_owner - , mem_peak - , tag - , priority - ) - VALUES ($1, $2, $3, $4, $5, COALESCE($6, now()), COALESCE($30::bigint, (EXTRACT('epoch' FROM (now())) - EXTRACT('epoch' FROM (COALESCE($6, now()))))*1000), $7, $8, $9,\ - $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29) - ON CONFLICT (id) DO UPDATE SET success = $7, result = $11 RETURNING duration_ms", - queued_job.workspace_id, - queued_job.id, - queued_job.parent_job, - queued_job.created_by, - queued_job.created_at, - queued_job.started_at, - success, - queued_job.script_hash.map(|x| x.0), - queued_job.script_path, - &queued_job.args as &Option>>>, - result as Json<&T>, - raw_code, - raw_lock, - canceled_by.is_some(), - canceled_by.clone().map(|cb| cb.username).flatten(), - canceled_by.clone().map(|cb| cb.reason).flatten(), - queued_job.job_kind.clone() as JobKind, - queued_job.schedule_path, - queued_job.permissioned_as, - &queued_job.flow_status as &Option>>, - &raw_flow as &Option>>, - queued_job.is_flow_step, - skipped, - queued_job.language.clone() as Option, - queued_job.email, - queued_job.visible_to_owner, - if mem_peak > 0 { Some(mem_peak) } else { None }, - queued_job.tag, - queued_job.priority, - 
-            duration,
+        let (_duration, canceled) = sqlx::query!(
+            r#"
+            INSERT INTO completed_job
+                ( workspace_id
+                , id
+                , parent_job
+                , created_by
+                , created_at
+                , started_at
+                , duration_ms
+                , success
+                , script_hash
+                , script_path
+                , args
+                , result
+                , raw_code
+                , raw_lock
+                , canceled
+                , canceled_by
+                , canceled_reason
+                , job_kind
+                , schedule_path
+                , permissioned_as
+                , flow_status
+                , raw_flow
+                , is_flow_step
+                , is_skipped
+                , language
+                , email
+                , visible_to_owner
+                , mem_peak
+                , tag
+                , priority
+                )
+            SELECT
+                  queue.workspace_id
+                , queue.id
+                , queue.parent_job
+                , queue.created_by
+                , queue.created_at
+                , queue.started_at
+                , COALESCE($2::bigint, (EXTRACT('epoch' FROM (now())) - EXTRACT('epoch' FROM (COALESCE(queue.started_at, now())))) * 1000)
+                , $3 AS success
+                , queue.script_hash
+                , queue.script_path
+                , queue.args
+                , $4 AS result
+                , queue.raw_code
+                , queue.raw_lock
+                , queue.canceled OR COALESCE($7, queue.canceled_by) IS NOT NULL as canceled
+                , COALESCE($7, queue.canceled_by) as canceled_by
+                , COALESCE($8, queue.canceled_reason) as canceled_reason
+                , queue.job_kind
+                , queue.schedule_path
+                , queue.permissioned_as
+                , queue.flow_status
+                , queue.raw_flow
+                , queue.is_flow_step
+                , $5 AS is_skipped
+                , queue.language
+                , queue.email
+                , queue.visible_to_owner
+                , GREATEST($6, queue.mem_peak) AS mem_peak
+                , queue.tag
+                , queue.priority
+            FROM queue
+            WHERE queue.id = $1
+            LIMIT 1
+            ON CONFLICT (id) DO UPDATE SET
+                success = $3,
+                result = $4
+            RETURNING duration_ms, canceled
+            "#,
+            /* $1 */ queued_job.id,
+            /* $2 */ duration,
+            /* $3 */ success,
+            /* $4 */ result as Json<&T>,
+            /* $5 */ skipped,
+            /* $6 */ if mem_peak > 0 { Some(mem_peak) } else { None },
+            /* $7 */ canceled_by,
+            /* $8 */ canceled_reason,
         )
         .fetch_one(&mut *tx)
         .await
+        .map(|record| (record.duration_ms, record.canceled))
         .map_err(|e| Error::InternalErr(format!("Could not add completed job {job_id}: {e:#}")))?;
-
+        // Hacky trick used by `workflow_as_code`
         if !queued_job.is_flow_step {
-            if _duration > 500
-                && (queued_job.job_kind == JobKind::Script
-                    || queued_job.job_kind == JobKind::Preview)
-            {
-                if let Err(e) = sqlx::query!(
-                    "UPDATE completed_job SET flow_status = q.flow_status FROM queue q WHERE completed_job.id = $1 AND q.id = $1 AND q.workspace_id = $2 AND completed_job.workspace_id = $2 AND q.flow_status IS NOT NULL",
-                    &queued_job.id,
-                    &queued_job.workspace_id
-                )
-                .execute(&mut *tx)
-                .await {
-                    tracing::error!("Could not update job duration: {}", e);
-                }
-            }
             if let Some(parent_job) = queued_job.parent_job {
-                if let Err(e) = sqlx::query_scalar!(
-                    "UPDATE queue SET flow_status = jsonb_set(jsonb_set(COALESCE(flow_status, '{}'::jsonb), array[$1], COALESCE(flow_status->$1, '{}'::jsonb)), array[$1, 'duration_ms'], to_jsonb($2::bigint)) WHERE id = $3 AND workspace_id = $4",
-                    &queued_job.id.to_string(),
-                    _duration,
-                    parent_job,
-                    &queued_job.workspace_id
-                )
-                .execute(&mut *tx)
-                .await {
-                    tracing::error!("Could not update parent job flow_status: {}", e);
-                }
+                let _ = sqlx::query_scalar!(
+                    r#"
+                    UPDATE queue SET flow_status = jsonb_set(
+                        jsonb_set(
+                            COALESCE(flow_status, '{}'::jsonb),
+                            array[$1],
+                            COALESCE(flow_status->$1, '{}'::jsonb)
+                        ),
+                        array[$1, 'duration_ms'],
+                        to_jsonb($2::bigint)
+                    ) WHERE id = $3 AND workspace_id = $4
+                    "#,
+                    &queued_job.id.to_string(),
+                    _duration,
+                    parent_job,
+                    &queued_job.workspace_id
+                )
+                .execute(&mut *tx)
+                .await
+                .inspect_err(|e| tracing::error!("Could not update parent job flow_status: {}", e));
             }
         }
         // tracing::error!("Added completed job {:#?}", queued_job);
@@ -733,7 +720,7 @@ pub async fn add_completed_job(
                     match err {
                         Error::QuotaExceeded(_) => (),
                         // scheduling next job failed and could not disable schedule => make zombie job to retry
-                        _ => return Ok((Some(job_id), 0, true)),
+                        _ => return Ok((Some(job_id), 0, canceled, true)),
                     }
                 };
             }
@@ -844,7 +831,7 @@ pub async fn add_completed_job(
             "inserted completed job: {} (success: {success})",
             queued_job.id
         );
-        Ok((None, _duration, _skip_downstream_error_handlers)) as windmill_common::error::Result<(Option<Uuid>, i64, bool)>
+        Ok((None, _duration, canceled, _skip_downstream_error_handlers)) as error::Result<(Option<Uuid>, i64, bool, bool)>
     })
     .retry(
         ConstantBuilder::default()
@@ -976,13 +963,8 @@ pub async fn add_completed_job(
                 );
             }
 
-            if let Err(err) = send_error_to_workspace_handler(
-                &queued_job,
-                canceled_by.is_some(),
-                db,
-                Json(&result),
-            )
-            .await
+            if let Err(err) =
+                send_error_to_workspace_handler(&queued_job, canceled, db, Json(&result)).await
             {
                 match err {
                     Error::QuotaExceeded(_) => {}
@@ -1001,7 +983,7 @@ pub async fn add_completed_job(
         }
     }
 
-    if !queued_job.is_flow_step && queued_job.job_kind == JobKind::Script && canceled_by.is_none() {
+    if !queued_job.is_flow_step && queued_job.job_kind == JobKind::Script && !canceled {
         if let Some(hash) = queued_job.script_hash {
             let p = sqlx::query_scalar!(
                 "SELECT restart_unless_cancelled FROM script WHERE hash = $1 AND workspace_id = $2",