Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backend: simplify insert in completed_job #4999

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

240 changes: 111 additions & 129 deletions backend/windmill-queue/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,9 @@ pub async fn add_completed_job<T: Serialize + Send + Sync + ValidableJson>(
}

let _job_id = queued_job.id;
let (opt_uuid, _duration, _skip_downstream_error_handlers) = (|| async {
let (canceled_by, canceled_reason) =
canceled_by.map_or((None, None), |c| (c.username, c.reason));
let (opt_uuid, _duration, canceled, _skip_downstream_error_handlers) = (|| async {
let mut tx = db.begin().await?;

let job_id = queued_job.id;
Expand All @@ -531,133 +533,118 @@ pub async fn add_completed_job<T: Serialize + Send + Sync + ValidableJson>(
serde_json::to_string(&result).unwrap_or_else(|_| "".to_string())
);

let (raw_code, raw_lock, raw_flow) = if !*MIN_VERSION_IS_AT_LEAST_1_427.read().await {
sqlx::query!(
"SELECT raw_code, raw_lock, raw_flow AS \"raw_flow: Json<Box<JsonRawValue>>\"
FROM job WHERE id = $1 AND workspace_id = $2 LIMIT 1",
&job_id,
&queued_job.workspace_id
)
.fetch_one(db)
.map_ok(|record| (record.raw_code, record.raw_lock, record.raw_flow))
.or_else(|_| {
sqlx::query!(
"SELECT raw_code, raw_lock, raw_flow AS \"raw_flow: Json<Box<JsonRawValue>>\"
FROM queue WHERE id = $1 AND workspace_id = $2 LIMIT 1",
&job_id,
&queued_job.workspace_id
)
.fetch_one(db)
.map_ok(|record| (record.raw_code, record.raw_lock, record.raw_flow))
})
.await
.unwrap_or_default()
} else {
(None, None, None)
};

let mem_peak = mem_peak.max(queued_job.mem_peak.unwrap_or(0));
// add_time!(bench, "add_completed_job query START");

let _duration = sqlx::query_scalar!(
"INSERT INTO completed_job AS cj
( workspace_id
, id
, parent_job
, created_by
, created_at
, started_at
, duration_ms
, success
, script_hash
, script_path
, args
, result
, raw_code
, raw_lock
, canceled
, canceled_by
, canceled_reason
, job_kind
, schedule_path
, permissioned_as
, flow_status
, raw_flow
, is_flow_step
, is_skipped
, language
, email
, visible_to_owner
, mem_peak
, tag
, priority
)
VALUES ($1, $2, $3, $4, $5, COALESCE($6, now()), COALESCE($30::bigint, (EXTRACT('epoch' FROM (now())) - EXTRACT('epoch' FROM (COALESCE($6, now()))))*1000), $7, $8, $9,\
$10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29)
ON CONFLICT (id) DO UPDATE SET success = $7, result = $11 RETURNING duration_ms",
queued_job.workspace_id,
queued_job.id,
queued_job.parent_job,
queued_job.created_by,
queued_job.created_at,
queued_job.started_at,
success,
queued_job.script_hash.map(|x| x.0),
queued_job.script_path,
&queued_job.args as &Option<Json<HashMap<String, Box<RawValue>>>>,
result as Json<&T>,
raw_code,
raw_lock,
canceled_by.is_some(),
canceled_by.clone().map(|cb| cb.username).flatten(),
canceled_by.clone().map(|cb| cb.reason).flatten(),
queued_job.job_kind.clone() as JobKind,
queued_job.schedule_path,
queued_job.permissioned_as,
&queued_job.flow_status as &Option<Json<Box<RawValue>>>,
&raw_flow as &Option<Json<Box<RawValue>>>,
queued_job.is_flow_step,
skipped,
queued_job.language.clone() as Option<ScriptLang>,
queued_job.email,
queued_job.visible_to_owner,
if mem_peak > 0 { Some(mem_peak) } else { None },
queued_job.tag,
queued_job.priority,
duration,
let (_duration, canceled) = sqlx::query!(
r#"
INSERT INTO completed_job
( workspace_id
, id
, parent_job
, created_by
, created_at
, started_at
, duration_ms
, success
, script_hash
, script_path
, args
, result
, raw_code
, raw_lock
, canceled
, canceled_by
, canceled_reason
, job_kind
, schedule_path
, permissioned_as
, flow_status
, raw_flow
, is_flow_step
, is_skipped
, language
, email
, visible_to_owner
, mem_peak
, tag
, priority
)
SELECT
queue.workspace_id
, queue.id
, queue.parent_job
, queue.created_by
, queue.created_at
, queue.started_at
, COALESCE($2::bigint, (EXTRACT('epoch' FROM (now())) - EXTRACT('epoch' FROM (COALESCE(queue.started_at, now())))) * 1000)
, $3 AS success
, queue.script_hash
, queue.script_path
, queue.args
, $4 AS result
, queue.raw_code
, queue.raw_lock
, queue.canceled OR COALESCE($7, queue.canceled_by) IS NOT NULL as canceled
, COALESCE($7, queue.canceled_by) as canceled_by
, COALESCE($8, queue.canceled_reason) as canceled_reason
, queue.job_kind
, queue.schedule_path
, queue.permissioned_as
, queue.flow_status
, queue.raw_flow
, queue.is_flow_step
, $5 AS is_skipped
, queue.language
, queue.email
, queue.visible_to_owner
, GREATEST($6, queue.mem_peak) AS mem_peak
, queue.tag
, queue.priority
FROM queue
WHERE queue.id = $1
LIMIT 1
ON CONFLICT (id) DO UPDATE SET
success = $3,
result = $4
RETURNING duration_ms, canceled
"#,
/* $1 */ queued_job.id,
/* $2 */ duration,
/* $3 */ success,
/* $4 */ result as Json<&T>,
/* $5 */ skipped,
/* $6 */ if mem_peak > 0 { Some(mem_peak) } else { None },
/* $7 */ canceled_by,
/* $8 */ canceled_reason,
)
.fetch_one(&mut *tx)
.await
.map(|record| (record.duration_ms, record.canceled))
.map_err(|e| Error::InternalErr(format!("Could not add completed job {job_id}: {e:#}")))?;


// Hacky trick used by `workflow_as_code`
if !queued_job.is_flow_step {
if _duration > 500
&& (queued_job.job_kind == JobKind::Script
|| queued_job.job_kind == JobKind::Preview)
{
if let Err(e) = sqlx::query!(
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rubenfiszel Removed this as after this change, completed_job.flow_status should have the same value as q.flow_status

"UPDATE completed_job SET flow_status = q.flow_status FROM queue q WHERE completed_job.id = $1 AND q.id = $1 AND q.workspace_id = $2 AND completed_job.workspace_id = $2 AND q.flow_status IS NOT NULL",
&queued_job.id,
&queued_job.workspace_id
)
.execute(&mut *tx)
.await {
tracing::error!("Could not update job duration: {}", e);
}
}
if let Some(parent_job) = queued_job.parent_job {
if let Err(e) = sqlx::query_scalar!(
"UPDATE queue SET flow_status = jsonb_set(jsonb_set(COALESCE(flow_status, '{}'::jsonb), array[$1], COALESCE(flow_status->$1, '{}'::jsonb)), array[$1, 'duration_ms'], to_jsonb($2::bigint)) WHERE id = $3 AND workspace_id = $4",
&queued_job.id.to_string(),
_duration,
parent_job,
&queued_job.workspace_id
)
.execute(&mut *tx)
.await {
tracing::error!("Could not update parent job flow_status: {}", e);
}
let _ = sqlx::query_scalar!(
r#"
UPDATE queue SET flow_status = jsonb_set(
jsonb_set(
COALESCE(flow_status, '{}'::jsonb),
array[$1],
COALESCE(flow_status->$1, '{}'::jsonb)
),
array[$1, 'duration_ms'],
to_jsonb($2::bigint)
) WHERE id = $3 AND workspace_id = $4
"#,
&queued_job.id.to_string(),
_duration,
parent_job,
&queued_job.workspace_id
)
.execute(&mut *tx)
.await
.inspect_err(|e| tracing::error!("Could not update parent job flow_status: {}", e));
}
}
// tracing::error!("Added completed job {:#?}", queued_job);
Expand Down Expand Up @@ -733,7 +720,7 @@ pub async fn add_completed_job<T: Serialize + Send + Sync + ValidableJson>(
match err {
Error::QuotaExceeded(_) => (),
// scheduling next job failed and could not disable schedule => make zombie job to retry
_ => return Ok((Some(job_id), 0, true)),
_ => return Ok((Some(job_id), 0, canceled, true)),
}
};
}
Expand Down Expand Up @@ -844,7 +831,7 @@ pub async fn add_completed_job<T: Serialize + Send + Sync + ValidableJson>(
"inserted completed job: {} (success: {success})",
queued_job.id
);
Ok((None, _duration, _skip_downstream_error_handlers)) as windmill_common::error::Result<(Option<Uuid>, i64, bool)>
Ok((None, _duration, canceled, _skip_downstream_error_handlers)) as error::Result<(Option<Uuid>, i64, bool, bool)>
})
.retry(
ConstantBuilder::default()
Expand Down Expand Up @@ -976,13 +963,8 @@ pub async fn add_completed_job<T: Serialize + Send + Sync + ValidableJson>(
);
}

if let Err(err) = send_error_to_workspace_handler(
&queued_job,
canceled_by.is_some(),
db,
Json(&result),
)
.await
if let Err(err) =
send_error_to_workspace_handler(&queued_job, canceled, db, Json(&result)).await
{
match err {
Error::QuotaExceeded(_) => {}
Expand All @@ -1001,7 +983,7 @@ pub async fn add_completed_job<T: Serialize + Send + Sync + ValidableJson>(
}
}

if !queued_job.is_flow_step && queued_job.job_kind == JobKind::Script && canceled_by.is_none() {
if !queued_job.is_flow_step && queued_job.job_kind == JobKind::Script && !canceled {
if let Some(hash) = queued_job.script_hash {
let p = sqlx::query_scalar!(
"SELECT restart_unless_cancelled FROM script WHERE hash = $1 AND workspace_id = $2",
Expand Down
Loading