Skip to content

Commit

Permalink
nit: generalize usage of has_failure_module (#4706)
Browse files Browse the repository at this point in the history
  • Loading branch information
uael authored Nov 14, 2024
1 parent ca8020c commit 2d57269
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 35 deletions.
27 changes: 27 additions & 0 deletions backend/windmill-common/src/flows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use rand::Rng;
use serde::{Deserialize, Serialize, Serializer};

use crate::{
error::Error,
more_serde::{default_empty_string, default_id, default_null, default_true, is_default},
scripts::{Schema, ScriptHash, ScriptLang},
};
Expand Down Expand Up @@ -651,3 +652,29 @@ pub fn add_virtual_items_if_necessary(modules: &mut Vec<FlowModule>) {
});
}
}

pub async fn has_failure_module<'c>(flow: sqlx::types::Uuid, db: &sqlx::Pool<sqlx::Postgres>, completed: bool) -> Result<bool, Error> {
if completed {
sqlx::query_scalar!(
"SELECT raw_flow->'failure_module' != 'null'::jsonb
FROM completed_job
WHERE id = $1",
flow
)
} else {
sqlx::query_scalar!(
"SELECT raw_flow->'failure_module' != 'null'::jsonb
FROM queue
WHERE id = $1",
flow
)
}
.fetch_one(db)
.await
.map_err(|e| {
Error::InternalErr(format!(
"error during retrieval of has_failure_module: {e:#}"
))
})
.map(|v| v.unwrap_or(false))
}
20 changes: 4 additions & 16 deletions backend/windmill-queue/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ use windmill_common::BASE_URL;
#[cfg(feature = "cloud")]
use windmill_common::users::SUPERADMIN_SYNC_EMAIL;

#[cfg(feature = "enterprise")]
use windmill_common::flows::has_failure_module;

#[cfg(feature = "enterprise")]
use windmill_common::worker::CLOUD_HOSTED;

Expand Down Expand Up @@ -482,13 +485,6 @@ where
}
}

#[cfg(feature = "enterprise")]
#[derive(Deserialize)]
struct RawFlowFailureModule {
#[cfg(feature = "enterprise")]
failure_module: Option<Box<RawValue>>,
}

#[instrument(level = "trace", skip_all)]
pub async fn add_completed_job_error<R: rsmq_async::RsmqConnection + Clone + Send>(
db: &Pool<Postgres>,
Expand Down Expand Up @@ -952,15 +948,7 @@ pub async fn add_completed_job<
} else if !skip_downstream_error_handlers
&& (matches!(queued_job.job_kind, JobKind::Script)
|| matches!(queued_job.job_kind, JobKind::Flow)
&& queued_job
.raw_flow
.as_ref()
.and_then(|v| {
serde_json::from_str::<RawFlowFailureModule>((**v).get())
.ok()
.and_then(|v| v.failure_module)
})
.is_none())
&& !has_failure_module(job_id, db, true).await.unwrap_or(false))
&& queued_job.parent_job.is_none()
{
let result = serde_json::from_str(
Expand Down
21 changes: 2 additions & 19 deletions backend/windmill-worker/src/worker_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use windmill_common::{
Approval, BranchAllStatus, BranchChosen, FlowStatus, FlowStatusModule, RetryStatus,
MAX_RETRY_ATTEMPTS, MAX_RETRY_INTERVAL,
},
flows::{FlowModule, FlowModuleValue, FlowValue, InputTransform, Retry, Suspend},
flows::{has_failure_module, FlowModule, FlowModuleValue, FlowValue, InputTransform, Retry, Suspend},
};
use windmill_queue::schedule::get_schedule_opt;
use windmill_queue::{
Expand Down Expand Up @@ -963,7 +963,7 @@ pub async fn update_flow_status_after_job_completion_internal<
false
if !is_failure_step
&& !skip_error_handler
&& has_failure_module(flow, db).await? =>
&& has_failure_module(flow, db, false).await? =>
{
true
}
Expand Down Expand Up @@ -1290,23 +1290,6 @@ async fn compute_skip_branchall_failure<'c>(
}))
}

async fn has_failure_module<'c>(flow: Uuid, db: &DB) -> Result<bool, Error> {
sqlx::query_scalar::<_, Option<bool>>(
"SELECT raw_flow->'failure_module' != 'null'::jsonb
FROM queue
WHERE id = $1",
)
.bind(flow)
.fetch_one(db)
.await
.map_err(|e| {
Error::InternalErr(format!(
"error during retrieval of has_failure_module: {e:#}"
))
})
.map(|v| v.unwrap_or(false))
}

// async fn retrieve_cleanup_module<'c>(flow_uuid: Uuid, db: &DB) -> Result<FlowCleanupModule, Error> {
// tracing::warn!("Retrieving cleanup module of flow {}", flow_uuid);
// let raw_value = sqlx::query_scalar!(
Expand Down

0 comments on commit 2d57269

Please sign in to comment.