Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

227 changes: 9 additions & 218 deletions engine/artifacts/openapi.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions engine/packages/api-types/src/runner_configs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ pub struct RunnerConfigResponse {
#[serde(flatten)]
pub config: rivet_types::runner_configs::RunnerConfig,
#[serde(default, skip_serializing_if = "Option::is_none")]
#[schema(value_type = Option<Object>, additional_properties = true)]
pub runner_pool_error: Option<rivet_types::actor::RunnerPoolError>,
}
6 changes: 2 additions & 4 deletions engine/packages/gasoline/src/builder/workflow/signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
builder::BuilderError,
ctx::WorkflowCtx,
error::WorkflowError,
history::{cursor::HistoryResult, event::EventType, removed::Signal as RemovedSignal},
history::{cursor::HistoryResult, event::EventType},
metrics,
signal::Signal,
workflow::Workflow,
Expand Down Expand Up @@ -113,9 +113,7 @@ impl<'a, T: Signal + Serialize> SignalBuilder<'a, T> {
}

// Check if this signal is being replayed and previously had no target (will have a removed event)
if self.graceful_not_found && self.ctx.cursor().is_removed() {
self.ctx.cursor().compare_removed::<RemovedSignal<T>>()?;

if self.graceful_not_found && self.ctx.cursor().current_event_is_removed() {
tracing::debug!("replaying gracefully not found signal dispatch");

// Move to next event
Expand Down
48 changes: 28 additions & 20 deletions engine/packages/gasoline/src/ctx/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
executable::{AsyncResult, Executable},
history::{
History,
cursor::{Cursor, HistoryResult},
cursor::{Cursor, HistoryResult, RemovedHistoryResult},
event::SleepState,
location::Location,
removed::Removed,
Expand Down Expand Up @@ -1108,27 +1108,35 @@
pub async fn removed<T: Removed>(&mut self) -> Result<()> {
self.check_stop()?;

// Existing event
if self.cursor.compare_removed::<T>()? {
tracing::debug!("skipping removed step");
}
// New "removed" event
else {
tracing::debug!("inserting removed step");
match self.cursor.compare_removed::<T>() {
RemovedHistoryResult::New => {
tracing::debug!("inserting removed step");

self.db
.commit_workflow_removed_event(
self.workflow_id,
&self.cursor.current_location(),
T::event_type(),
T::name(),
self.loop_location(),
)
.await?;
};
self.db
.commit_workflow_removed_event(
self.workflow_id,
&self.cursor.current_location(),
T::event_type(),
T::name(),
self.loop_location(),
)
.await?;

// Move to next event
self.cursor.inc();
// Move to next event
self.cursor.inc();
}
RemovedHistoryResult::Skip => {
tracing::debug!("skipping removed step");

// Move to next event
self.cursor.inc();
}
RemovedHistoryResult::Ignore(msg) => {
tracing::debug!(
"removed event filter doesn't match existing event, ignoring: {msg}"
);
}
}

Ok(())
}
Expand All @@ -1147,12 +1155,12 @@
}

let history_res = self.cursor.compare_version_check()?;
let check_version_location = self.cursor.current_location_for(&history_res);

Check failure on line 1158 in engine/packages/gasoline/src/ctx/workflow.rs

View workflow job for this annotation

GitHub Actions / Check

mismatched types

let (version, insert) = match history_res {
CheckVersionHistoryResult::New => (latest_version, true),

Check failure on line 1161 in engine/packages/gasoline/src/ctx/workflow.rs

View workflow job for this annotation

GitHub Actions / Check

failed to resolve: use of undeclared type `CheckVersionHistoryResult`
CheckVersionHistoryResult::Event(version) => (version, false),

Check failure on line 1162 in engine/packages/gasoline/src/ctx/workflow.rs

View workflow job for this annotation

GitHub Actions / Check

failed to resolve: use of undeclared type `CheckVersionHistoryResult`
CheckVersionHistoryResult::Insertion(next_event_version) => (next_event_version, true),

Check failure on line 1163 in engine/packages/gasoline/src/ctx/workflow.rs

View workflow job for this annotation

GitHub Actions / Check

failed to resolve: use of undeclared type `CheckVersionHistoryResult`
};

if insert {
Expand Down
6 changes: 5 additions & 1 deletion engine/packages/gasoline/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,13 @@ impl WorkflowData {
.map_err(WorkflowError::DeserializeWorkflowOutput)
}

pub fn has_output(&self) -> bool {
pub fn is_complete(&self) -> bool {
self.output.is_some()
}

pub fn is_dead(&self) -> bool {
!self.is_complete() && !self.has_wake_condition
}
}

#[derive(Debug)]
Expand Down
Loading
Loading