Skip to content

Commit

Permalink
fix: improve runFlowAsync and run_flow_async default behavior + time …
Browse files Browse the repository at this point in the history
…formatting of scheduled for
  • Loading branch information
rubenfiszel committed Sep 11, 2024
1 parent 509f4a0 commit 51e6f36
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 13 deletions.
4 changes: 2 additions & 2 deletions backend/windmill-common/src/variables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* LICENSE-AGPL for a copy of the license.
*/

use chrono::Utc;
use chrono::{SecondsFormat, Utc};
use magic_crypt::{MagicCrypt256, MagicCryptError, MagicCryptTrait};
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -263,7 +263,7 @@ pub async fn get_reserved_variables(
ContextualVariable {
name: WM_SCHEDULED_FOR.to_string(),
value: scheduled_for
.map(|ts| ts.to_string())
.map(|ts| ts.to_rfc3339_opts(SecondsFormat::Secs, true))
.unwrap_or_else(|| "".to_string()),
description: "date-time in UTC (e.g: 2014-11-28T12:45:59.324310806Z) of when the job was scheduled".to_string(),
is_custom: false,
Expand Down
18 changes: 14 additions & 4 deletions python-client/wmill/wmill/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,19 @@ def run_flow_async(
path: str,
args: dict = None,
scheduled_in_secs: int = None,
# can only be set to false if this the job will be fully await and not concurrent with any other job
# as otherwise the child flow and its own child will store their state in the parent job which will
# lead to incorrectness and failures
do_not_track_in_parent: bool = True,
) -> str:
"""Create a flow job and return its job id."""
args = args or {}
params = {"scheduled_in_secs": scheduled_in_secs} if scheduled_in_secs else {}
if os.environ.get("WM_JOB_ID"):
params["parent_job"] = os.environ.get("WM_JOB_ID")
if os.environ.get("WM_ROOT_FLOW_JOB_ID"):
params["root_job"] = os.environ.get("WM_ROOT_FLOW_JOB_ID")
if not do_not_track_in_parent:
if os.environ.get("WM_JOB_ID"):
params["parent_job"] = os.environ.get("WM_JOB_ID")
if os.environ.get("WM_ROOT_FLOW_JOB_ID"):
params["root_job"] = os.environ.get("WM_ROOT_FLOW_JOB_ID")
if path:
endpoint = f"/w/{self.workspace}/jobs/run/f/{path}"
else:
Expand Down Expand Up @@ -655,11 +660,16 @@ def run_flow_async(
path: str,
args: Dict[str, Any] = None,
scheduled_in_secs: int = None,
# can only be set to false if this the job will be fully await and not concurrent with any other job
# as otherwise the child flow and its own child will store their state in the parent job which will
# lead to incorrectness and failures
do_not_track_in_parent: bool = True,
) -> str:
return _client.run_flow_async(
path=path,
args=args,
scheduled_in_secs=scheduled_in_secs,
do_not_track_in_parent=do_not_track_in_parent,
)


Expand Down
16 changes: 9 additions & 7 deletions typescript-client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,10 @@ export async function runFlowAsync(
path: string | null,
args: Record<string, any> | null,
scheduledInSeconds: number | null = null,
flowOutlivesParent: boolean = true
// can only be set to false if this the job will be fully await and not concurrent with any other job
// as otherwise the child flow and its own child will store their state in the parent job which will
// lead to incorrectness and failures
doNotTrackInParent: boolean = true
): Promise<string> {
// Create a script job and return its job id.

Expand All @@ -284,16 +287,15 @@ export async function runFlowAsync(
params["scheduled_in_secs"] = scheduledInSeconds;
}

if (!flowOutlivesParent) {
if (!doNotTrackInParent) {
let parentJobId = getEnv("WM_JOB_ID");
if (parentJobId !== undefined) {
params["parent_job"] = parentJobId;
}
}

let rootJobId = getEnv("WM_ROOT_FLOW_JOB_ID");
if (rootJobId != undefined && rootJobId != "") {
params["root_job"] = rootJobId;
let rootJobId = getEnv("WM_ROOT_FLOW_JOB_ID");
if (rootJobId != undefined && rootJobId != "") {
params["root_job"] = rootJobId;
}
}

let endpoint: string;
Expand Down

0 comments on commit 51e6f36

Please sign in to comment.