Skip to content

Commit

Permalink
backend: check queries to queue and completed_job where possible (v2 …
Browse files Browse the repository at this point in the history
…phase 2)
  • Loading branch information
uael committed Feb 4, 2025
1 parent 013d112 commit 59dc597
Show file tree
Hide file tree
Showing 9 changed files with 408 additions and 353 deletions.
73 changes: 39 additions & 34 deletions backend/src/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use tokio::{
join,
sync::{mpsc, RwLock},
};
use uuid::Uuid;

#[cfg(feature = "embedding")]
use windmill_api::embeddings::update_embeddings_db;
Expand All @@ -36,7 +37,7 @@ use windmill_common::{
auth::JWT_SECRET,
ee::CriticalErrorChannel,
error,
flow_status::FlowStatusModule,
flow_status::{FlowStatus, FlowStatusModule},
global_settings::{
BASE_URL_SETTING, BUNFIG_INSTALL_SCOPES_SETTING, CRITICAL_ALERT_MUTE_UI_SETTING,
CRITICAL_ERROR_CHANNELS_SETTING, DEFAULT_TAGS_PER_WORKSPACE_SETTING,
Expand Down Expand Up @@ -1637,20 +1638,28 @@ async fn handle_zombie_jobs(db: &Pool<Postgres>, base_internal_url: &str, worker
}

async fn handle_zombie_flows(db: &DB) -> error::Result<()> {
let flows = sqlx::query_as::<_, QueuedJob>(
let flows = sqlx::query!(
r#"
SELECT *
SELECT
id AS "id!", workspace_id AS "workspace_id!", parent_job, is_flow_step,
flow_status AS "flow_status: Box<str>", last_ping, same_worker
FROM queue
WHERE running = true AND suspend = 0 AND suspend_until IS null AND scheduled_for <= now() AND (job_kind = 'flow' OR job_kind = 'flowpreview' OR job_kind = 'flownode')
AND last_ping IS NOT NULL AND last_ping < NOW() - ($1 || ' seconds')::interval AND canceled = false
WHERE running = true AND suspend = 0 AND suspend_until IS null AND scheduled_for <= now()
AND (job_kind = 'flow' OR job_kind = 'flowpreview' OR job_kind = 'flownode')
AND last_ping IS NOT NULL AND last_ping < NOW() - ($1 || ' seconds')::interval
AND canceled = false
"#,
).bind(FLOW_ZOMBIE_TRANSITION_TIMEOUT.as_str())
FLOW_ZOMBIE_TRANSITION_TIMEOUT.as_str()
)
.fetch_all(db)
.await?;

for flow in flows {
let status = flow.parse_flow_status();
if !flow.same_worker
let status = flow
.flow_status
.as_deref()
.and_then(|x| serde_json::from_str::<FlowStatus>(x).ok());
if !flow.same_worker.unwrap_or(false)
&& status.is_some_and(|s| {
s.modules
.get(0)
Expand Down Expand Up @@ -1695,44 +1704,39 @@ async fn handle_zombie_flows(db: &DB) -> error::Result<()> {
let now = now_from_db(db).await?;
let reason = format!(
"{} was hanging in between 2 steps. Last ping: {last_ping:?} (now: {now})",
if flow.is_flow_step && flow.parent_job.is_some() {
if flow.is_flow_step.unwrap_or(false) && flow.parent_job.is_some() {
format!("Flow was cancelled because subflow {id}")
} else {
format!("Flow {id} was cancelled because it")
}
);
report_critical_error(reason.clone(), db.clone(), Some(&flow.workspace_id), None).await;
cancel_zombie_flow_job(db, flow, reason).await?;
cancel_zombie_flow_job(db, flow.id, &flow.workspace_id, reason).await?;
}
}

let flows2 = sqlx::query!(
"
DELETE
FROM parallel_monitor_lock
WHERE last_ping IS NOT NULL AND last_ping < NOW() - ($1 || ' seconds')::interval
RETURNING parent_flow_id, job_id, last_ping
",
r#"
DELETE
FROM parallel_monitor_lock
WHERE last_ping IS NOT NULL AND last_ping < NOW() - ($1 || ' seconds')::interval
RETURNING parent_flow_id, job_id, last_ping, (SELECT workspace_id FROM queue q
WHERE q.id = parent_flow_id AND q.running = true AND q.canceled = false) AS workspace_id
"#,
FLOW_ZOMBIE_TRANSITION_TIMEOUT.as_str()
)
.fetch_all(db)
.await?;

for flow in flows2 {
let in_queue = sqlx::query_as::<_, QueuedJob>(
"SELECT * FROM queue WHERE id = $1 AND running = true AND canceled = false",
)
.bind(flow.parent_flow_id)
.fetch_optional(db)
.await?;
if let Some(job) = in_queue {
if let Some(parent_flow_workspace_id) = flow.workspace_id {
tracing::error!(
"parallel Zombie flow detected: {} in workspace {}. Last ping was: {:?}.",
job.id,
job.workspace_id,
flow.parent_flow_id,
parent_flow_workspace_id,
flow.last_ping
);
cancel_zombie_flow_job(db, job,
cancel_zombie_flow_job(db, flow.parent_flow_id, &parent_flow_workspace_id,
format!("Flow {} cancelled as one of the parallel branch {} was unable to make the last transition ", flow.parent_flow_id, flow.job_id))
.await?;
} else {
Expand All @@ -1744,27 +1748,28 @@ async fn handle_zombie_flows(db: &DB) -> error::Result<()> {

async fn cancel_zombie_flow_job(
db: &Pool<Postgres>,
flow: QueuedJob,
id: Uuid,
workspace_id: &str,
message: String,
) -> Result<(), error::Error> {
let tx = db.begin().await.unwrap();
let mut tx = db.begin().await?;
tracing::error!(
"zombie flow detected: {} in workspace {}. Cancelling it.",
flow.id,
flow.workspace_id
id,
workspace_id
);
let (ntx, _) = cancel_job(
(tx, _) = cancel_job(
"monitor",
Some(message),
flow.id,
flow.workspace_id.as_str(),
id,
workspace_id,
tx,
db,
true,
false,
)
.await?;
ntx.commit().await?;
tx.commit().await?;
Ok(())
}

Expand Down
12 changes: 5 additions & 7 deletions backend/tests/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ use tokio::time::{timeout, Duration};

use windmill_api_client::types::{CreateFlowBody, RawScript};

use sqlx::query;

#[cfg(feature = "enterprise")]
use windmill_api_client::types::{EditSchedule, NewSchedule, ScriptArgs};

Expand Down Expand Up @@ -3206,7 +3204,7 @@ async fn test_script_schedule_handlers(db: Pool<Postgres>) {
let uuid = uuid.unwrap().unwrap();

let completed_job =
query!("SELECT script_path FROM completed_job WHERE id = $1", uuid)
sqlx::query!("SELECT script_path FROM completed_job WHERE id = $1", uuid)
.fetch_one(&db2)
.await
.unwrap();
Expand Down Expand Up @@ -3274,7 +3272,7 @@ async fn test_script_schedule_handlers(db: Pool<Postgres>) {
let uuid = uuid.unwrap().unwrap();

let completed_job =
query!("SELECT script_path FROM completed_job WHERE id = $1", uuid)
sqlx::query!("SELECT script_path FROM completed_job WHERE id = $1", uuid)
.fetch_one(&db2)
.await
.unwrap();
Expand Down Expand Up @@ -3358,7 +3356,7 @@ async fn test_flow_schedule_handlers(db: Pool<Postgres>) {
let uuid = uuid.unwrap().unwrap();

let completed_job =
query!("SELECT script_path FROM completed_job WHERE id = $1", uuid)
sqlx::query!("SELECT script_path FROM completed_job WHERE id = $1", uuid)
.fetch_one(&db2)
.await
.unwrap();
Expand Down Expand Up @@ -3427,7 +3425,7 @@ async fn test_flow_schedule_handlers(db: Pool<Postgres>) {
let uuid = uuid.unwrap().unwrap();

let completed_job =
query!("SELECT script_path FROM completed_job WHERE id = $1", uuid)
sqlx::query!("SELECT script_path FROM completed_job WHERE id = $1", uuid)
.fetch_one(&db2)
.await
.unwrap();
Expand Down Expand Up @@ -3502,7 +3500,7 @@ async fn run_deployed_relative_imports(
async move {
completed.next().await; // deployed script

let script = query!(
let script = sqlx::query!(
"SELECT hash FROM script WHERE path = $1",
"f/system/test_import".to_string()
)
Expand Down
25 changes: 15 additions & 10 deletions backend/windmill-api/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,16 @@ impl AuthCache {
}
}
_ => {
let user_o = sqlx::query_as::<_, (Option<String>, Option<String>, bool, Option<Vec<String>>, Option<String>)>(
"UPDATE token SET last_used_at = now() WHERE token = $1 AND (expiration > NOW() \
OR expiration IS NULL) AND (workspace_id IS NULL OR workspace_id = $2) RETURNING owner, email, super_admin, scopes, label",
let user_o = sqlx::query!(
"UPDATE token SET last_used_at = now() WHERE
token = $1
AND (expiration > NOW() OR expiration IS NULL)
AND (workspace_id IS NULL OR workspace_id = $2)
RETURNING owner, email, super_admin, scopes, label",
token,
w_id.as_ref(),
)
.bind(token)
.bind(w_id.as_ref())
.map(|x| (x.owner, x.email, x.super_admin, x.scopes, x.label))
.fetch_optional(&self.db)
.await
.ok()
Expand Down Expand Up @@ -251,12 +255,13 @@ impl AuthCache {
(_, Some(email), super_admin, scopes, label) => {
let username_override = username_override_from_label(label);
if w_id.is_some() {
let row_o = sqlx::query_as::<_, (String, bool, bool)>(
"SELECT username, is_admin, operator FROM usr where email = $1 AND \
workspace_id = $2 AND disabled = false",
let row_o = sqlx::query!(
"SELECT username, is_admin, operator FROM usr WHERE
email = $1 AND workspace_id = $2 AND disabled = false",
&email,
w_id.as_ref().unwrap()
)
.bind(&email)
.bind(&w_id.as_ref().unwrap())
.map(|x| (x.username, x.is_admin, x.operator))
.fetch_optional(&self.db)
.await
.unwrap_or(Some(("error".to_string(), false, false)));
Expand Down
Loading

0 comments on commit 59dc597

Please sign in to comment.