From d6d4756b7a40249dbc622c8a93633e8d8b8333da Mon Sep 17 00:00:00 2001
From: pyranota <92104930+pyranota@users.noreply.github.com>
Date: Tue, 17 Sep 2024 23:16:19 +0000
Subject: [PATCH] feat: Allow setProgress and getProgress from within the script (#4400)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Allow setting progress explicitly from the script body.

This feature exposes:
* `getProgress`
* `setProgress`
* `incProgress`
in the TypeScript client (Python support is coming soon).

NOTE: Progress cannot be outside the 0..100 range and cannot decrease.

Along with the exposed APIs there are also UI changes, so progress can be shown for individual jobs as well.
For optimization reasons, jobs are only polled for progress after N seconds of execution.

* feat: Add `shell.nix`

If you don't have anything but Nix, don't worry: run `nix-shell` in the repo root, or activate it with direnv, to get all the needed dependencies.
NOTE: You will still need Docker.

* feat: Add `dev.nu` to the typescript client

A little helper script that makes working on the ts client easier.

To use: `./dev.nu watch`

Now add an import of windmill to the body of your script and `//nobundle` at the top of the file.
Edit the ts client in your favourite editor and hit save. The script will do the rest.

* Cleanup files

* Fix: Failed to deserialize query string: missing field `get_progress`

* perf: Implement non-naive polling mechanism for getting job progress

* Add independent delay for getProgress

Problem in `TestJobLoader`: there should be two delays:
one until we find the first progress value (polled every 5s);
once the first progress value has been found, we can poll every second.

* nit: Use `query_scalar!` instead of `query_as`

* Fix: Sql error, no rows returned by a query that expected to return at least one row

* refactor: Remove global CSS for JobProgressBar

* Change UI for progress of flow subjobs

* Replace `Step 1` with `Running` in ProgressBar for individual jobs

* Remove `incProgress`

incProgress is not very useful and is error-prone.

* perf: Set metric only for jobs that are actually using it (https://github.com/windmill-labs/windmill/pull/4373#discussion_r1759843773)

* Offload registering progress from clients to server

* Add `jobId?` argument to typescript-client's `setProgress` and `getProgress`

Allows setting the progress of other jobs and flows; if jobId is specified, the flow id will be inferred automatically. Could be used by the SDK.
* Add `Error::MetricNotFound` for better error handling

* Fix: Make `JobProgressBar` display in red when failed

* Add persistent progress bar

Now you can reload the page after the job is done and the progress will still be there.

* Allow a succeeded individual job's progress bar to stick at 100%

* Add python support

* nit: Remove usage of undefined variable in python-client

* Add `async` in ts client (for error handling)

* nit(frontend): Remove unused import

* Don't load JobProgressBar when it is not needed

* nit: npm check fix

* cargo sqlx prepare

* fix sqlx

---------

Co-authored-by: Ruben Fiszel
---
 .envrc                                        |  1 +
 .gitignore                                    |  1 +
 ...af22783c81bc757d735a4b247cc693dfed719.json | 15 +++
 ...6819d7b6a2f5531740a7deea1b51775335977.json | 24 ++++
 ...8f65143894f86517f2b5b2d51bc9a2857695c.json | 23 ++++
 backend/windmill-api/openapi.yaml             | 53 ++++++++-
 backend/windmill-api/src/job_metrics.rs       | 110 +++++++++++++++++-
 backend/windmill-api/src/jobs.rs              | 20 +++-
 backend/windmill-common/src/error.rs          |  2 +
 backend/windmill-common/src/flow_status.rs    |  4 +
 backend/windmill-common/src/job_metrics.rs    |  2 +-
 backend/windmill-queue/src/jobs.rs            |  2 +
 backend/windmill-worker/src/worker.rs         |  5 +
 backend/windmill-worker/src/worker_flow.rs    |  4 +
 .../src/lib/components/ModulePreview.svelte   | 15 +++
 .../src/lib/components/ScriptEditor.svelte    | 14 ++-
 .../src/lib/components/TestJobLoader.svelte   | 52 ++++++++-
 .../components/flows/FlowProgressBar.svelte   | 11 +-
 .../lib/components/jobs/JobProgressBar.svelte | 58 +++++++++
 .../components/progressBar/ProgressBar.svelte | 19 ++-
 .../src/lib/components/runs/JobPreview.svelte |  1 +
 .../components/scriptEditor/LogPanel.svelte   |  1 +
 .../(root)/(logged)/run/[...run]/+page.svelte | 31 ++++-
 openflow.openapi.yaml                         |  2 +
 python-client/dev.nu                          | 54 +++++++++
 python-client/wmill/wmill/client.py           | 44 +++++++
 shell.nix                                     | 59 ++++++++++
 typescript-client/README_DEV.md               |  6 +
 typescript-client/build.sh                    |  2 +-
 typescript-client/client.ts                   | 44 +++++++
 typescript-client/dev.nu                      | 62 ++++++++++
 31 files changed, 727 insertions(+), 14 deletions(-)
 create mode 100644 .envrc
 create mode 100644 backend/.sqlx/query-76ca60e456022cf3d1931245b7daf22783c81bc757d735a4b247cc693dfed719.json
 create mode 100644 backend/.sqlx/query-9422431d79de41518f651ef24e86819d7b6a2f5531740a7deea1b51775335977.json
 create mode 100644 backend/.sqlx/query-d600a0ad953ed131952d9c46deb8f65143894f86517f2b5b2d51bc9a2857695c.json
 create mode 100644 frontend/src/lib/components/jobs/JobProgressBar.svelte
 create mode 100755 python-client/dev.nu
 create mode 100644 shell.nix
 create mode 100755 typescript-client/dev.nu

diff --git a/.envrc b/.envrc
new file mode 100644
index 0000000000000..1d953f4bd7359
--- /dev/null
+++ b/.envrc
@@ -0,0 +1 @@
+use nix
diff --git a/.gitignore b/.gitignore
index 282fa5fb48419..ffda06e6cb8be 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,3 +6,4 @@ frontend/src/routes/test.svelte
 CaddyfileRemoteMalo
 *.swp
 **/.idea/
+.direnv
diff --git a/backend/.sqlx/query-76ca60e456022cf3d1931245b7daf22783c81bc757d735a4b247cc693dfed719.json b/backend/.sqlx/query-76ca60e456022cf3d1931245b7daf22783c81bc757d735a4b247cc693dfed719.json
new file mode 100644
index 0000000000000..530638af1fa36
--- /dev/null
+++ b/backend/.sqlx/query-76ca60e456022cf3d1931245b7daf22783c81bc757d735a4b247cc693dfed719.json
@@ -0,0 +1,15 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "UPDATE queue\n SET flow_status = JSONB_SET(flow_status, ARRAY['modules', flow_status->>'step', 'progress'], $1)\n WHERE id = $2",
+  "describe": {
+    "columns": [],
+    "parameters": {
+      "Left":
[ + "Jsonb", + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "76ca60e456022cf3d1931245b7daf22783c81bc757d735a4b247cc693dfed719" +} diff --git a/backend/.sqlx/query-9422431d79de41518f651ef24e86819d7b6a2f5531740a7deea1b51775335977.json b/backend/.sqlx/query-9422431d79de41518f651ef24e86819d7b6a2f5531740a7deea1b51775335977.json new file mode 100644 index 0000000000000..7f2936f195891 --- /dev/null +++ b/backend/.sqlx/query-9422431d79de41518f651ef24e86819d7b6a2f5531740a7deea1b51775335977.json @@ -0,0 +1,24 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT scalar_int FROM job_stats WHERE workspace_id = $1 AND job_id = $2 AND metric_id = $3", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "scalar_int", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "Text", + "Uuid", + "Text" + ] + }, + "nullable": [ + true + ] + }, + "hash": "9422431d79de41518f651ef24e86819d7b6a2f5531740a7deea1b51775335977" +} diff --git a/backend/.sqlx/query-d600a0ad953ed131952d9c46deb8f65143894f86517f2b5b2d51bc9a2857695c.json b/backend/.sqlx/query-d600a0ad953ed131952d9c46deb8f65143894f86517f2b5b2d51bc9a2857695c.json new file mode 100644 index 0000000000000..34afed8276400 --- /dev/null +++ b/backend/.sqlx/query-d600a0ad953ed131952d9c46deb8f65143894f86517f2b5b2d51bc9a2857695c.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT (scalar_int)::int FROM job_stats WHERE job_id = $1 AND workspace_id = $2 AND metric_id = 'progress_perc'", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "scalar_int", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Text" + ] + }, + "nullable": [ + true + ] + }, + "hash": "d600a0ad953ed131952d9c46deb8f65143894f86517f2b5b2d51bc9a2857695c" +} diff --git a/backend/windmill-api/openapi.yaml b/backend/windmill-api/openapi.yaml index c6551920514b2..6a51c4dd954e8 100644 --- a/backend/windmill-api/openapi.yaml +++ b/backend/windmill-api/openapi.yaml @@ -5993,6 +5993,10 @@ paths: in: query schema: type: integer + - name: get_progress + in: query + schema: + type: boolean responses: "200": @@ -6012,6 +6016,8 @@ paths: type: integer mem_peak: type: integer + progress: + type: integer flow_status: $ref: "#/components/schemas/WorkflowStatusRecord" @@ -8647,6 +8653,52 @@ paths: items: $ref: "#/components/schemas/TimeseriesMetric" + /w/{workspace}/job_metrics/set_progress/{id}: + post: + summary: set job metrics + operationId: setJobProgress + tags: + - metrics + parameters: + - $ref: "#/components/parameters/WorkspaceId" + - $ref: "#/components/parameters/JobId" + requestBody: + description: parameters for statistics retrieval + required: true + content: + application/json: + schema: + type: object + properties: + percent: + type: integer + flow_job_id: + type: string + format: uuid + + responses: + "200": + description: Job progress updated + content: + application/json: + schema: {} + + /w/{workspace}/job_metrics/get_progress/{id}: + get: + summary: get job progress + operationId: getJobProgress + tags: + - metrics + parameters: + - $ref: "#/components/parameters/WorkspaceId" + - $ref: "#/components/parameters/JobId" + responses: + "200": + description: job progress between 0 and 99 + content: + application/json: + schema: + type: integer /service_logs/list_files: get: summary: list log files ordered by timestamp @@ -8711,7 +8763,6 @@ paths: schema: type: string - /concurrency_groups/list: get: summary: List all concurrency groups diff --git a/backend/windmill-api/src/job_metrics.rs b/backend/windmill-api/src/job_metrics.rs 
index 4a57ed28d4f60..2af59383a89af 100644 --- a/backend/windmill-api/src/job_metrics.rs +++ b/backend/windmill-api/src/job_metrics.rs @@ -1,12 +1,18 @@ use crate::db::DB; -use axum::{extract::Path, routing::post, Extension, Json, Router}; +use axum::{ + extract::Path, + routing::{get, post}, + Extension, Json, Router, +}; use serde::{Deserialize, Serialize}; use tower_http::cors::{Any, CorsLayer}; use uuid::Uuid; use windmill_common::{ - error, - job_metrics::{JobStatsRecord, MetricKind}, + error::{self, Error}, + job_metrics::{ + record_metric, register_metric_for_job, JobStatsRecord, MetricKind, MetricNumericValue, + }, }; pub fn workspaced_service() -> Router { @@ -15,7 +21,16 @@ pub fn workspaced_service() -> Router { .allow_headers([http::header::CONTENT_TYPE, http::header::AUTHORIZATION]) .allow_origin(Any); - Router::new().route("/get/:id", post(get_job_metrics).layer(cors.clone())) + Router::new() + .route("/get/:id", post(get_job_metrics).layer(cors.clone())) + .route( + "/set_progress/:id", + post(set_job_progress).layer(cors.clone()), + ) + .route( + "/get_progress/:id", + get(get_job_progress).layer(cors.clone()), + ) } #[derive(Deserialize)] @@ -137,6 +152,93 @@ async fn get_job_metrics( let response = JobStatsResponse { metrics_metadata, scalar_metrics, timeseries_metrics }; Ok(Json(response)) } +#[derive(Deserialize)] +struct JobProgressSetRequest { + percent: i32, + /// Optional parent flow id + /// Used to modify flow status + /// Specifically `progress` field in corresponding FlowStatusModule in `InProgress` state + flow_job_id: Option, +} + +async fn set_job_progress( + Extension(db): Extension, + Path((w_id, job_id)): Path<(String, Uuid)>, + Json(JobProgressSetRequest { percent, flow_job_id }): Json, +) -> error::JsonResult<()> { + // If flow_job_id exists, than we should modify flow_status of corresponding module + // Individual jobs and flows are handled differently + if let Some(flow_job_id) = flow_job_id { + // TODO: Return error if trying to set completed job? + sqlx::query!( + "UPDATE queue + SET flow_status = JSONB_SET(flow_status, ARRAY['modules', flow_status->>'step', 'progress'], $1) + WHERE id = $2", + serde_json::json!(percent.clamp(0, 99)), + flow_job_id + ) + .execute(&db) + .await?; + } + + let record_progress = || { + record_metric( + &db, + w_id.clone(), + job_id, + "progress_perc".to_owned(), + MetricNumericValue::Integer(percent), + ) + }; + + // Try to record + if let Err(err) = record_progress().await { + if matches!(err, Error::MetricNotFound(..)) { + // Register + // TODO: Reset progress after job is finished (in case it reruns same job)? 
+ _ = register_metric_for_job( + &db, + w_id.clone(), + job_id, + "progress_perc".to_string(), + MetricKind::ScalarInt, + Some("Job Execution Progress (%)".to_owned()), + ) + .await?; + // Retry recording progress + record_progress().await.map_err(|err| { + // If for some reason it still returns same error, this error will be converted to BadRequest and returned + if let Error::MetricNotFound(body) = err { + Error::BadRequest(body) + } else { + err + } + })?; + } else { + return Err(err); + } + }; + return Ok(Json(())); +} + +async fn get_job_progress( + Extension(db): Extension, + Path((w_id, job_id)): Path<(String, Uuid)>, +) -> error::JsonResult> { + let progress: Option> = sqlx::query_scalar!( + "SELECT (scalar_int)::int FROM job_stats WHERE job_id = $1 AND workspace_id = $2 AND metric_id = 'progress_perc'", + job_id, w_id) + .fetch_optional(&db) + .await?; + + let respond_value = if let Some(Some(progress)) = progress { + Some(progress.clamp(0, 99)) + } else { + None + }; + + Ok(Json(respond_value)) +} fn timeseries_sample( from: Option>, diff --git a/backend/windmill-api/src/jobs.rs b/backend/windmill-api/src/jobs.rs index a80a225581623..cec301ad05995 100644 --- a/backend/windmill-api/src/jobs.rs +++ b/backend/windmill-api/src/jobs.rs @@ -4507,6 +4507,7 @@ pub async fn run_job_by_hash_inner( pub struct JobUpdateQuery { pub running: bool, pub log_offset: i32, + pub get_progress: Option, } #[derive(Serialize)] @@ -4516,6 +4517,7 @@ pub struct JobUpdate { pub new_logs: Option, pub log_offset: Option, pub mem_peak: Option, + pub progress: Option, pub flow_status: Option>, } @@ -4583,7 +4585,7 @@ async fn get_job_update( OptAuthed(opt_authed): OptAuthed, Extension(db): Extension, Path((w_id, job_id)): Path<(String, Uuid)>, - Query(JobUpdateQuery { running, log_offset }): Query, + Query(JobUpdateQuery { running, log_offset, get_progress }): Query, ) -> error::JsonResult { let record = sqlx::query_as::<_, JobUpdateRow>( "SELECT running, substr(concat(coalesce(queue.logs, ''), job_logs.logs), greatest($1 - job_logs.log_offset, 0)) as logs, mem_peak, @@ -4599,6 +4601,20 @@ async fn get_job_update( .fetch_optional(&db) .await?; + let progress: Option = if get_progress == Some(true){ + sqlx::query_scalar!( + "SELECT scalar_int FROM job_stats WHERE workspace_id = $1 AND job_id = $2 AND metric_id = $3", + &w_id, + job_id, + "progress_perc" + + ) + .fetch_one(&db) + .await? 
+ } else { + None + }; + if let Some(record) = record { if opt_authed.is_none() && record.created_by != "anonymous" { return Err(Error::BadRequest( @@ -4616,6 +4632,7 @@ async fn get_job_update( completed: None, new_logs: record.logs, mem_peak: record.mem_peak, + progress, flow_status: record .flow_status .map(|x: sqlx::types::Json>| x.0), @@ -4648,6 +4665,7 @@ async fn get_job_update( log_offset: record.log_offset, new_logs: record.logs, mem_peak: record.mem_peak, + progress, flow_status: record .flow_status .map(|x: sqlx::types::Json>| x.0), diff --git a/backend/windmill-common/src/error.rs b/backend/windmill-common/src/error.rs index 65316155d267c..8ac8537c11c6c 100644 --- a/backend/windmill-common/src/error.rs +++ b/backend/windmill-common/src/error.rs @@ -30,6 +30,8 @@ pub enum Error { NotFound(String), #[error("Not authorized: {0}")] NotAuthorized(String), + #[error("Metric not found: {0}")] + MetricNotFound(String), #[error("Permission denied: {0}")] PermissionDenied(String), #[error("Require Admin privileges for {0}")] diff --git a/backend/windmill-common/src/flow_status.rs b/backend/windmill-common/src/flow_status.rs index dd11b8e9aff31..946cd45bffa4a 100644 --- a/backend/windmill-common/src/flow_status.rs +++ b/backend/windmill-common/src/flow_status.rs @@ -116,6 +116,7 @@ struct UntaggedFlowStatusModule { type_: String, id: Option, count: Option, + progress: Option, job: Option, iterator: Option, flow_jobs: Option>, @@ -147,6 +148,8 @@ pub enum FlowStatusModule { id: String, job: Uuid, #[serde(skip_serializing_if = "Option::is_none")] + progress: Option, + #[serde(skip_serializing_if = "Option::is_none")] iterator: Option, #[serde(skip_serializing_if = "Option::is_none")] flow_jobs: Option>, @@ -237,6 +240,7 @@ impl<'de> Deserialize<'de> for FlowStatusModule { branchall: untagged.branchall, parallel: untagged.parallel.unwrap_or(false), while_loop: untagged.while_loop.unwrap_or(false), + progress: untagged.progress, }), "Success" => Ok(FlowStatusModule::Success { id: untagged diff --git a/backend/windmill-common/src/job_metrics.rs b/backend/windmill-common/src/job_metrics.rs index f5b16d742cfc0..c0c2300fed22b 100644 --- a/backend/windmill-common/src/job_metrics.rs +++ b/backend/windmill-common/src/job_metrics.rs @@ -112,7 +112,7 @@ pub async fn record_metric( .await?; if metric_kind_opt.is_none() { - return Err(error::Error::BadRequest(format!( + return Err(error::Error::MetricNotFound(format!( "Metric {} not yet registered for job {}.", metric_id, job_id ))); diff --git a/backend/windmill-queue/src/jobs.rs b/backend/windmill-queue/src/jobs.rs index 73c914c661fac..8a6fdcbab7e12 100644 --- a/backend/windmill-queue/src/jobs.rs +++ b/backend/windmill-queue/src/jobs.rs @@ -4096,6 +4096,7 @@ async fn restarted_flows_resolution( }), parallel: parallel, while_loop: false, + progress: None, }); } Ok(FlowModuleValue::ForloopFlow { parallel, .. 
}) => { @@ -4133,6 +4134,7 @@ async fn restarted_flows_resolution( branchall: None, parallel: parallel, while_loop: false, + progress: None, }); } _ => { diff --git a/backend/windmill-worker/src/worker.rs b/backend/windmill-worker/src/worker.rs index ad4e242be3a61..7b795b16076a8 100644 --- a/backend/windmill-worker/src/worker.rs +++ b/backend/windmill-worker/src/worker.rs @@ -1520,14 +1520,19 @@ pub async fn run_worker { + + last_executed_job = None; jobs_executed += 1; tracing::debug!("started handling of job {}", job.id); + if matches!(job.job_kind, JobKind::Script | JobKind::Preview) { + if !dedicated_workers.is_empty() { let key_o = if is_flow_worker { job.flow_step_id.as_ref().map(|x| x.to_string()) diff --git a/backend/windmill-worker/src/worker_flow.rs b/backend/windmill-worker/src/worker_flow.rs index 57c3ffdb606c0..9064cf1579abb 100644 --- a/backend/windmill-worker/src/worker_flow.rs +++ b/backend/windmill-worker/src/worker_flow.rs @@ -2553,6 +2553,7 @@ async fn push_next_flow_job id: status_module.id(), parallel: false, while_loop, + progress: None, } } NextStatus::AllFlowJobs { iterator, branchall, .. } => FlowStatusModule::InProgress { @@ -2565,6 +2566,7 @@ async fn push_next_flow_job id: status_module.id(), parallel: true, while_loop: false, + progress: None, }, NextStatus::NextBranchStep(NextBranch { mut flow_jobs, @@ -2587,6 +2589,7 @@ async fn push_next_flow_job id: status_module.id(), parallel: false, while_loop: false, + progress: None, } } @@ -2600,6 +2603,7 @@ async fn push_next_flow_job id: status_module.id(), parallel: false, while_loop: false, + progress: None, }, NextStatus::NextStep => { FlowStatusModule::WaitingForExecutor { id: status_module.id(), job: one_uuid? } diff --git a/frontend/src/lib/components/ModulePreview.svelte b/frontend/src/lib/components/ModulePreview.svelte index 0f5b076c3a5ab..e1a245a8aecd0 100644 --- a/frontend/src/lib/components/ModulePreview.svelte +++ b/frontend/src/lib/components/ModulePreview.svelte @@ -13,6 +13,7 @@ import LogViewer from './LogViewer.svelte' import TestJobLoader from './TestJobLoader.svelte' import ModulePreviewForm from './ModulePreviewForm.svelte' + import JobProgressBar from '$lib/components/jobs/JobProgressBar.svelte' import { evalValue } from './flows/utils' import type { PickableProperties } from './flows/previousResults' import type DiffEditor from './DiffEditor.svelte' @@ -31,10 +32,13 @@ getContext('FlowEditorContext') // Test + let scriptProgress = undefined; let testJobLoader: TestJobLoader let testIsLoading = false let testJob: Job | undefined = undefined + let jobProgressReset: () => void + let stepArgs: Record | undefined = Object.fromEntries( Object.keys(schema.properties ?? {}).map((k) => [ k, @@ -49,6 +53,9 @@ } export async function runTest(args: any) { + // Not defined if JobProgressBar not loaded + if (jobProgressReset) jobProgressReset(); + const val = mod.value // let jobId: string | undefined = undefined if (val.type == 'rawscript') { @@ -91,9 +98,12 @@ let forceJson = false + + jobDone()} + bind:scriptProgress bind:this={testJobLoader} bind:isLoading={testIsLoading} bind:job={testJob} @@ -143,6 +153,9 @@ /> + {#if scriptProgress} + + {/if} {#if testJob != undefined && 'result' in testJob && testJob.result != undefined}
{#if testIsLoading} + {#if !scriptProgress} + {/if} {:else} Test to see the result here {/if} diff --git a/frontend/src/lib/components/ScriptEditor.svelte b/frontend/src/lib/components/ScriptEditor.svelte index 0d41a31aea4cd..78064d2453f04 100644 --- a/frontend/src/lib/components/ScriptEditor.svelte +++ b/frontend/src/lib/components/ScriptEditor.svelte @@ -12,6 +12,7 @@ import LogPanel from './scriptEditor/LogPanel.svelte' import EditorBar, { EDITOR_BAR_WIDTH_THRESHOLD } from './EditorBar.svelte' import TestJobLoader from './TestJobLoader.svelte' + import JobProgressBar from '$lib/components/jobs/JobProgressBar.svelte' import { createEventDispatcher, onDestroy, onMount } from 'svelte' import { Button } from './common' import SplitPanesWrapper from './splitPanes/SplitPanesWrapper.svelte' @@ -46,6 +47,8 @@ export let watchChanges = false export let customUi: ScriptEditorWhitelabelCustomUi = {} + let jobProgressReset: () => void + let websocketAlive = { pyright: false, deno: false, @@ -68,6 +71,7 @@ let args: Record = initialArgs let isValid: boolean = true + let scriptProgress = undefined; // Test let testIsLoading = false @@ -98,6 +102,8 @@ } function runTest() { + // Not defined if JobProgressBar not loaded + if (jobProgressReset) jobProgressReset(); //@ts-ignore testJobLoader.runPreview(path, code, lang, args, tag) } @@ -214,6 +220,7 @@ + > + {#if scriptProgress} + + + {/if} +
diff --git a/frontend/src/lib/components/TestJobLoader.svelte b/frontend/src/lib/components/TestJobLoader.svelte index 84dc5b835e373..a8545062ed479 100644 --- a/frontend/src/lib/components/TestJobLoader.svelte +++ b/frontend/src/lib/components/TestJobLoader.svelte @@ -13,6 +13,18 @@ export let jobUpdateLastFetch: Date | undefined = undefined export let toastError = false export let lazyLogs = false + // Will be set to number if job is not a flow + // If you want to find out progress of subjobs of a flow, check job.flow_status.progress + export let scriptProgress: number | undefined = undefined; + + /// Last time asked for job progress + let lastTimeCheckedProgress: number | undefined = undefined; + + /// Will try to poll progress every 5s and if once progress returned was not undefined, will be ignored + /// and getProgressRate will be used instead + const getProgressRetryRate: number = 5000; + /// How often loader poll progress + const getProgressRate: number = 1000; const dispatch = createEventDispatcher() @@ -97,7 +109,7 @@ workspace: workspace!, id: job.id, running: `running` in job && job.running, - logOffset: job.logs?.length ?? 0 + logOffset: job.logs?.length ?? 0, }) if ((job.logs ?? '').length == 0) { @@ -115,6 +127,11 @@ tag: string | undefined, lock?: string ): Promise { + + // Reset in case we rerun job without reloading + scriptProgress = undefined; + lastTimeCheckedProgress = undefined; + return abstractRun(() => JobService.runScriptPreview({ workspace: $workspaceStore!, @@ -173,6 +190,27 @@ if (currentId === id) { try { if (job && `running` in job) { + + let getProgress: boolean | undefined = undefined; + // We only pull individual job progress this way + // Flow's progress we are getting from FlowStatusModule of flow job + if (job.job_kind == "script" || job.job_kind == "preview"){ + // First time, before running job, lastTimeCheckedProgress is always undefined + if (lastTimeCheckedProgress){ + const lastTimeCheckedMs = Date.now() - lastTimeCheckedProgress; + // Ask for progress if the last time we asked is >5s OR the progress was once not undefined + if (lastTimeCheckedMs > getProgressRetryRate || (scriptProgress != undefined && lastTimeCheckedMs > getProgressRate)){ + lastTimeCheckedProgress = Date.now(); + getProgress = true; + } + } else { + // Make it think we asked for progress, but in reality we didnt. First 5s we want to wait without putting extra work on db + // 99.99% of the jobs won't have progress be set so we have to do a balance between having low-latency for jobs that use it and job that don't + // we would usually not care to have progress the first 5s and jobs that are less than 5s + lastTimeCheckedProgress = Date.now(); + } + } + const offset = logOffset == 0 ? (job.logs?.length ? job.logs?.length + 1 : 0) : logOffset console.log('getLogs') @@ -180,9 +218,19 @@ workspace: workspace!, id, running: job.running, - logOffset: offset + logOffset: offset, + getProgress: getProgress }) + // Clamp number between two values with the following line: + const clamp = (num, min, max) => Math.min(Math.max(num, min), max); + + if (previewJobUpdates.progress){ + // Progress cannot go back and cannot be set to 100 + scriptProgress = clamp(previewJobUpdates.progress, scriptProgress ?? 0, 99); + } + + if (previewJobUpdates.new_logs) { if (offset == 0) { job.logs = previewJobUpdates.new_logs ?? 
'' diff --git a/frontend/src/lib/components/flows/FlowProgressBar.svelte b/frontend/src/lib/components/flows/FlowProgressBar.svelte index 784bcf1c15308..629c2d2eb3ede 100644 --- a/frontend/src/lib/components/flows/FlowProgressBar.svelte +++ b/frontend/src/lib/components/flows/FlowProgressBar.svelte @@ -10,6 +10,7 @@ let subLength: number | undefined = undefined let length = 1 let nextInProgress = false + let subIndexIsPercent: boolean = false $: if (job) updateJobProgress(job) @@ -38,7 +39,8 @@ newError = maxDone maxDone = maxDone + 1 } - } + } + subIndexIsPercent = false; // Loop is still iterating if (module?.iterator) { @@ -51,6 +53,12 @@ } else if (module?.branchall) { subStepIndex = module.branchall.branch subStepLength = module.branchall.len + } else if (module?.progress) { + const clamp = (num, min, max) => Math.min(Math.max(num, min), max) + subStepIndex = clamp(module?.progress, subIndex ?? 0, 99) + // Jitter protection >^^^^^^^^ + subStepLength = 100 + subIndexIsPercent = true; } error = newError @@ -81,5 +89,6 @@ {subLength} {subIndex} {error} + bind:subIndexIsPercent class={$$props.class} /> diff --git a/frontend/src/lib/components/jobs/JobProgressBar.svelte b/frontend/src/lib/components/jobs/JobProgressBar.svelte new file mode 100644 index 0000000000000..4b4684e32a5e1 --- /dev/null +++ b/frontend/src/lib/components/jobs/JobProgressBar.svelte @@ -0,0 +1,58 @@ + + + diff --git a/frontend/src/lib/components/progressBar/ProgressBar.svelte b/frontend/src/lib/components/progressBar/ProgressBar.svelte index 00c80136c2626..cf1fba97b4e2f 100644 --- a/frontend/src/lib/components/progressBar/ProgressBar.svelte +++ b/frontend/src/lib/components/progressBar/ProgressBar.svelte @@ -1,6 +1,7 @@
+ {#if !compact}
{$percent.toFixed(0)}%
+ {/if} @@ -68,7 +82,10 @@ {getPercent(index)} | {/each} --> -
+
{#each new Array(length) as _, partIndex (partIndex)}
{#if partIndex == index && nextInProgress} diff --git a/frontend/src/lib/components/runs/JobPreview.svelte b/frontend/src/lib/components/runs/JobPreview.svelte index edd642697fad4..e4baf7012c36e 100644 --- a/frontend/src/lib/components/runs/JobPreview.svelte +++ b/frontend/src/lib/components/runs/JobPreview.svelte @@ -67,6 +67,7 @@ bind:watchJob on:done={onDone} /> +
{#if job}
diff --git a/frontend/src/lib/components/scriptEditor/LogPanel.svelte b/frontend/src/lib/components/scriptEditor/LogPanel.svelte index 4a7f2b008ed41..8b7a59c44d59c 100644 --- a/frontend/src/lib/components/scriptEditor/LogPanel.svelte +++ b/frontend/src/lib/components/scriptEditor/LogPanel.svelte @@ -118,6 +118,7 @@ /> + {#if previewJob != undefined && 'result' in previewJob}
diff --git a/frontend/src/routes/(root)/(logged)/run/[...run]/+page.svelte b/frontend/src/routes/(root)/(logged)/run/[...run]/+page.svelte index 2ab2a5eb54cff..4ae0562272ebc 100644 --- a/frontend/src/routes/(root)/(logged)/run/[...run]/+page.svelte +++ b/frontend/src/routes/(root)/(logged)/run/[...run]/+page.svelte @@ -8,7 +8,10 @@ type Script, type WorkflowStatus, type NewScript, - ConcurrencyGroupsService + ConcurrencyGroupsService, + + MetricsService + } from '$lib/gen' import { canWrite, @@ -68,6 +71,7 @@ import FlowMetadata from '$lib/components/FlowMetadata.svelte' import JobArgs from '$lib/components/JobArgs.svelte' import FlowProgressBar from '$lib/components/flows/FlowProgressBar.svelte' + import JobProgressBar from '$lib/components/jobs/JobProgressBar.svelte' import Tabs from '$lib/components/common/tabs/Tabs.svelte' import Badge from '$lib/components/common/badge/Badge.svelte' import Tooltip from '$lib/components/Tooltip.svelte' @@ -91,6 +95,8 @@ let job: Job | undefined let jobUpdateLastFetch: Date | undefined + let scriptProgress: number | undefined = undefined; + let viewTab: 'result' | 'logs' | 'code' | 'stats' = 'result' let selectedJobStep: string | undefined = undefined let branchOrIterationN: number = 0 @@ -190,6 +196,24 @@ let persistentScriptDefinition: Script | undefined = undefined async function onJobLoaded() { + // We want to set up scriptProgress once job is loaded + // We need this to show progress bar if job has progress and is finished + if (job && job.type == "CompletedJob"){ + // If error occured and job is completed + // than we fetch progress from server to display on what progress did it fail + // Could be displayed after run or as a historical page + // If opening page without running job (e.g. reloading page after run) progress will be displayed instantly + MetricsService.getJobProgress({ + workspace: job.workspace_id ?? "NO_WORKSPACE", + id: job.id, + }).then(progress => { + // Returned progress is not always 100%, could be 65%, 33%, anything + // Its ok if its a failure and we want to keep that value + // But we want progress to be 100% if job has been succeeded + scriptProgress = progress; + }); + } + if (job === undefined || job.job_kind !== 'script' || job.script_hash === undefined) { return } @@ -200,6 +224,7 @@ if (script.restart_unless_cancelled ?? false) { persistentScriptDefinition = script } + } $: { @@ -319,6 +344,7 @@ job?.['result'] != undefined && (viewTab = 'result')} bind:this={testJobLoader} bind:getLogs @@ -731,6 +757,9 @@ flowDone={job.type == 'CompletedJob'} /> {/if} + {#if scriptProgress} + + {/if}
diff --git a/openflow.openapi.yaml b/openflow.openapi.yaml index deaaa150c3322..1cc4f018027ab 100644 --- a/openflow.openapi.yaml +++ b/openflow.openapi.yaml @@ -481,6 +481,8 @@ components: format: uuid count: type: integer + progress: + type: integer iterator: type: object properties: diff --git a/python-client/dev.nu b/python-client/dev.nu new file mode 100755 index 0000000000000..52d68a7bec03d --- /dev/null +++ b/python-client/dev.nu @@ -0,0 +1,54 @@ +#! /usr/bin/env nu + +let cache = "/tmp/windmill/cache/pip/" + +# Clean cache +def "main clean" [] { + ^rm -rf ($cache ++ "/wmill*") +} + +# Watch changes in directory and autopatch (watchexec required) +def "main watch" [] { + # watchexec -w ../backend/windmill-api/openapi.yaml './dev.nu -g' & + # TODO: Watch openapi.yaml + ^watchexec ./dev.nu -p + +} + +# Build client and move to windmill's cache +# To build you will need nushell and tsc (typescript compiler) +# If none arguments selected, all will be turned on +# If any argument specified, all others will be disabled +def main [ + --gen(-g) # Generate code (OpenAPI codegen) + --compile(-c) # Compile code (TS >> JS) + --patch(-p) # Patch +] { + + let do_all = not ($gen or $compile or $patch); + + # TODO: Gen windmill-client.js + # TODO: Gen bundle? (README_DEV.md) + + if ($do_all or $gen) { + print "Generating code from openapi.yml..." + ./build.sh + } + + if ($do_all or $patch) { + print "Patching cache..." + + # Clean up in all versions + rm -rf ($cache ++ wmill*/wmill/*) + + # Copy files from local ./dist to every wm-client version in cache + ls /tmp/windmill/cache/pip/wmill* | each { + |i| + + let path = $i | get name; + ^cp -r wmill/wmill/* ($path ++ "/wmill") + } + } + + print Done! +} diff --git a/python-client/wmill/wmill/client.py b/python-client/wmill/wmill/client.py index f68ee13dcc950..42ea61281f797 100644 --- a/python-client/wmill/wmill/client.py +++ b/python-client/wmill/wmill/client.py @@ -346,6 +346,36 @@ def set_resource( def set_state(self, value: Any): self.set_resource(value, path=self.state_path, resource_type="state") + def set_progress(self, value: int, job_id: Optional[str] = None): + workspace = get_workspace() + flow_id = os.environ.get("WM_FLOW_JOB_ID") + job_id = job_id or os.environ.get("WM_JOB_ID") + + if job_id != None: + job = self.get_job(job_id) + flow_id = job.get("parent_job") + + self.post( + f"/w/{workspace}/job_metrics/set_progress/{job_id}", + json={ + "percent": value, + "flow_job_id": flow_id or None, + }, + ) + + def get_progress(self, job_id: Optional[str] = None ) -> Any: + workspace = get_workspace() + job_id = job_id or os.environ.get("WM_JOB_ID") + + r = self.get( + f"/w/{workspace}/job_metrics/get_progress/{job_id}", + ) + if r.status_code == 404: + print(f"Job {job_id} does not exist") + return None + else: + return r.json() + def set_flow_user_state(self, key: str, value: Any) -> None: """Set the user state of a flow at a given key""" flow_id = self.get_root_job_id() @@ -838,6 +868,20 @@ def set_state(value: Any) -> None: """ return _client.set_state(value) +@init_global_client +def set_progress(value: int, job_id: Optional[str] = None) -> None: + """ + Set the progress + """ + return _client.set_progress(value, job_id) + +@init_global_client +def get_progress(job_id: Optional[str] = None) -> Any: + """ + Get the progress + """ + return _client.get_progress(job_id) + def set_shared_state_pickle(value: Any, path="state.pickle") -> None: """ diff --git a/shell.nix b/shell.nix new file mode 100644 index 0000000000000..58085ee67519f --- 
/dev/null +++ b/shell.nix @@ -0,0 +1,59 @@ +{ pkgs ? import { } }: + +/* based on + https://discourse.nixos.org/t/how-can-i-set-up-my-rust-programming-environment/4501/9 +*/ +let + rust_overlay = import (builtins.fetchTarball + "https://github.com/oxalica/rust-overlay/archive/master.tar.gz"); + pkgs = import { overlays = [ rust_overlay ]; }; + # TODO: Pin version? + rustVersion = "latest"; + # rustVersion = "1.83.0"; + rust = pkgs.rust-bin.nightly.${rustVersion}.default.override { + extensions = [ + "rust-src" # for rust-analyzer + "rust-analyzer" + ]; + }; +in pkgs.mkShell { + + packages = with pkgs; [ + rust + # rustup + cargo-watch + typescript # tsc + typescript-language-server + postgresql + watchexec # used in client's dev.nu + poetry # for python client + python312Packages.pip-tools # pip-compile + ]; + + # buildInputs = with pkgs; [ xz lzma ]; + + # Add the following lines to set the LD_LIBRARY_PATH + LD_LIBRARY_PATH = "${pkgs.lib.makeLibraryPath (with pkgs; [ + lzma + libseccomp + bzip2 + openssl_3_3 + # + ])}"; + + REMOTE = "http://127.0.0.1:8000"; + REMOTE_LSP = "http://127.0.0.1:3001"; + + DATABASE_URL = + "postgres://postgres:changeme@127.0.0.1:5432/windmill?sslmode=disable"; + + RUSTC_LINKER = "${pkgs.clang}/bin/clang"; + CARGO_TARGET_X86_64_UNKNOWN_LINUX_GNU_LINKER = "${pkgs.clang}/bin/clang"; + + RUSTFLAGS = + "-C link-arg=-fuse-ld=${pkgs.mold}/bin/mold -Zshare-generics=y -Z threads=4"; + RUSTC_WRAPPER = "${pkgs.sccache}/bin/sccache"; + + # Use mold as a linker (for faster compilation) + +} diff --git a/typescript-client/README_DEV.md b/typescript-client/README_DEV.md index c9915d2242bc6..5e2481d97898f 100644 --- a/typescript-client/README_DEV.md +++ b/typescript-client/README_DEV.md @@ -9,3 +9,9 @@ undle --outfile=windmill.js --format=esm node_modules/dts-bundle-generator/dist/bin/dts-bundle-generator.js -o windmill.d.ts types/in dex.d.ts + +# Develop client locally + +`./dev.nu watch` + +> If something not working, try to put //nobundle inside script body diff --git a/typescript-client/build.sh b/typescript-client/build.sh index d3e2cdcbf8b02..436f2a526bdfa 100755 --- a/typescript-client/build.sh +++ b/typescript-client/build.sh @@ -34,4 +34,4 @@ cp "${script_dirpath}/s3Types.ts" "${script_dirpath}/src/" echo "" >> "${script_dirpath}/src/index.ts" echo 'export type { S3Object, DenoS3LightClientSettings } from "./s3Types";' >> "${script_dirpath}/src/index.ts" echo "" >> "${script_dirpath}/src/index.ts" -echo 'export { type Base64, setClient, getVariable, setVariable, getResource, setResource, getResumeUrls, setState, getState, getIdToken, denoS3LightClientSettings, loadS3FileStream, loadS3File, writeS3File, task, runScript, runScriptAsync, runFlow, runFlowAsync, waitJob, getRootJobId, setFlowUserState, getFlowUserState, usernameToEmail } from "./client";' >> "${script_dirpath}/src/index.ts" +echo 'export { type Base64, setClient, getVariable, setVariable, getResource, setResource, getResumeUrls, setState, setProgress, getProgress, getState, getIdToken, denoS3LightClientSettings, loadS3FileStream, loadS3File, writeS3File, task, runScript, runScriptAsync, runFlow, runFlowAsync, waitJob, getRootJobId, setFlowUserState, getFlowUserState, usernameToEmail } from "./client";' >> "${script_dirpath}/src/index.ts" diff --git a/typescript-client/client.ts b/typescript-client/client.ts index 1ebda795482d0..2943aba5a5c80 100644 --- a/typescript-client/client.ts +++ b/typescript-client/client.ts @@ -3,6 +3,7 @@ import { VariableService, JobService, HelpersService, + MetricsService, 
OidcService, UserService, } from "./index"; @@ -386,6 +387,49 @@ export async function setState(state: any): Promise { await setResource(state, undefined, "state"); } +/** + * Set the progress + * Progress cannot go back and limited to 0% to 99% range + * @param percent Progress to set in % + */ +export async function setProgress(percent: number, jobId?: any): Promise { + const workspace = getWorkspace(); + let flowId = getEnv("WM_FLOW_JOB_ID"); + + // If jobId specified we need to find if there is a parent/flow + if (jobId) { + const job = await JobService.getJob({ + id: jobId ?? "NO_JOB_ID", + workspace, + noLogs: true + }); + + // Could be actual flowId or undefined + flowId = job.parent_job; + } + + await MetricsService.setJobProgress({ + id: jobId ?? getEnv("WM_JOB_ID") ?? "NO_JOB_ID", + workspace, + requestBody: { + percent, + flow_job_id: (flowId == "") ? undefined : flowId, + } + }); +} + +/** + * Get the progress + * @returns Optional clamped between 0 and 100 progress value + */ +export async function getProgress(jobId?: any): Promise { + // TODO: Delete or set to 100 completed job metrics + return await MetricsService.getJobProgress({ + id: jobId ?? getEnv("WM_JOB_ID") ?? "NO_JOB_ID", + workspace: getWorkspace(), + }); +} + /** * Set a flow user state * @param key key of the state diff --git a/typescript-client/dev.nu b/typescript-client/dev.nu new file mode 100755 index 0000000000000..36b384eeb3a2e --- /dev/null +++ b/typescript-client/dev.nu @@ -0,0 +1,62 @@ +#! /usr/bin/env nu + +let cache = "/tmp/windmill/cache_nomount/bun/" + +# Clean cache +def "main clean" [] { + ^rm -rf ($cache ++ "/windmill-client") +} + +# Watch changes in directory and autopatch (watchexec required) +def "main watch" [] { + # watchexec -w ../backend/windmill-api/openapi.yaml './dev.nu -g' & + # TODO: Watch openapi.yaml + ^watchexec ./dev.nu + +} + +# Build client and move to windmill's cache +# To build you will need nushell and tsc (typescript compiler) +# If none arguments selected, all will be turned on +# If any argument specified, all others will be disabled +def main [ + --gen(-g) # Generate code (OpenAPI codegen) + --compile(-c) # Compile code (TS >> JS) + --patch(-p) # Patch +] { + + let do_all = not ($gen or $compile or $patch); + + # TODO: Gen windmill-client.js + # TODO: Gen bundle? (README_DEV.md) + + if ($do_all or $gen) { + print "Generating code from openapi.yml..." + ./build.sh + } + + if ($do_all or $compile) { + print "Compiling Typescript..." + tsc + } + + if ($do_all or $patch) { + print "Patching cache..." + + # Clean up in all versions + rm -rf ($cache ++ windmill-client@*/dist/*) + + # Delete all script bundles + # rm -rf /tmp/windmill/cache/bun/* + + # Copy files from local ./dist to every wm-client version in cache + ls ($cache ++ "windmill-client/") | each { + |i| + + let path = $i | get name; + ^cp -r dist/* ($path ++ "/dist") + } + } + + print Done! +}
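
Usage note (not part of the patch): taken together, the typescript-client changes above export `setProgress(percent, jobId?)` and `getProgress(jobId?)`. A minimal sketch of how a script might use them, assuming the standard `windmill-client` import used in Windmill TypeScript scripts; the workload loop and item count are illustrative only:

import * as wmill from "windmill-client";

export async function main() {
  // Hypothetical workload: ten units of work.
  const items = Array.from({ length: 10 }, (_, i) => i);

  for (let i = 0; i < items.length; i++) {
    // ... process items[i] ...

    // Report progress for the current job. Per the PR notes, progress stays
    // within 0..100 and cannot decrease; the server/UI clamp displayed values to 99
    // until the job completes.
    await wmill.setProgress(Math.floor(((i + 1) / items.length) * 100));
  }

  // Read back the last reported progress (null if no progress was ever recorded).
  return await wmill.getProgress();
}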