Skip to content

Commit

Permalink
feat: Allow setProgress and getProgress from within the script (#4400)
Browse files Browse the repository at this point in the history
* Allow setting progress explicitly from script body.

This feature exposes:
 * `getProgress`
 * `setProgress`
 * `incProgress`

API in TypeScript client (python is coming soon).

NOTE: Progress cannot be out of range 0..100 and cannot decrease.

Along with the exposed APIs there are also UI changes, so progress can be shown for individual jobs as well.

For optimization reasons, jobs start to ask for progress only after N-seconds of execution.

* feat: Add `shell.nix`

If you don't have anything but nix, don't worry: run nix-shell in the root, or activate with direnv, and get all needed dependencies

NOTE: You will still need docker

* feat: Add `dev.nu` to typescript client

Little helper function, allowing developer to work on ts client easier.

To use:
  `./dev.nu watch`

Now add import of windmill in body of your script and `//nobundle` on top of the file

Edit ts client in your favourite editor and hit save. Script will do the rest.

* Cleanup files

* Fix: Failed to deserialize query string: missing field `get_progress`

* perf: Implement non-naive polling mechanism for getting job progress

* Add independent delay for getProgress

Problem in `TestJobLoader`:

There should be 2 delays:

    One until we find our first progress (every 5s)
    Once we found our first progress, we can do it every second

* nit: Use `query_scalar!` instead of `query_as`

* Fix: Sql error, no rows returned by a query that expected to return at least one row

* refactor: Remove global CSS for JobProgressBar

* Change UI for progress of flow subjobs

* Replace `Step 1` with `Running` in ProgressBar for individual jobs

* Remove `incProgress`

incProgress is not very useful and is error-prone

* perf: Set metric only for jobs that are actually using it

(#4373 (comment))

* Offload registering progress from clients to server

* Add `jobId?` argument to typescript-client's `setProgress` and `getProgress`

Allows to set progress of other jobs and flows,
if jobId is specified, then the flow id will be inferred automatically.

Could be used by SDK.

* Add `Error::MetricNotFound` for better error handling

* Fix: Make `JobProgressBar` display in red when failed

* Add persistent progress bar

Now you can reload the page after the job is done and the progress will still be there

* Allow succeeded individual job's progress bar stick to 100%

* Add python support 

* nit: Remove usage of undefined variable in python-client

* Add `async` in ts client (for error handling)

* nit(frontend): Remove unused import

* Don't load JobProgressBar when it is not needed

* nit: npm check fix

* cargo sqlx prepare

* fix sqlx

---------

Co-authored-by: Ruben Fiszel <[email protected]>
  • Loading branch information
pyranota and rubenfiszel committed Sep 17, 2024
1 parent f7454e6 commit d6d4756
Show file tree
Hide file tree
Showing 31 changed files with 727 additions and 14 deletions.
1 change: 1 addition & 0 deletions .envrc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
use nix
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ frontend/src/routes/test.svelte
CaddyfileRemoteMalo
*.swp
**/.idea/
.direnv

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

53 changes: 52 additions & 1 deletion backend/windmill-api/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5993,6 +5993,10 @@ paths:
in: query
schema:
type: integer
- name: get_progress
in: query
schema:
type: boolean

responses:
"200":
Expand All @@ -6012,6 +6016,8 @@ paths:
type: integer
mem_peak:
type: integer
progress:
type: integer
flow_status:
$ref: "#/components/schemas/WorkflowStatusRecord"

Expand Down Expand Up @@ -8647,6 +8653,52 @@ paths:
items:
$ref: "#/components/schemas/TimeseriesMetric"

/w/{workspace}/job_metrics/set_progress/{id}:
post:
summary: set job progress
operationId: setJobProgress
tags:
- metrics
parameters:
- $ref: "#/components/parameters/WorkspaceId"
- $ref: "#/components/parameters/JobId"
requestBody:
description: parameters for setting job progress
required: true
content:
application/json:
schema:
type: object
properties:
percent:
type: integer
flow_job_id:
type: string
format: uuid

responses:
"200":
description: Job progress updated
content:
application/json:
schema: {}

/w/{workspace}/job_metrics/get_progress/{id}:
get:
summary: get job progress
operationId: getJobProgress
tags:
- metrics
parameters:
- $ref: "#/components/parameters/WorkspaceId"
- $ref: "#/components/parameters/JobId"
responses:
"200":
description: job progress between 0 and 99
content:
application/json:
schema:
type: integer
/service_logs/list_files:
get:
summary: list log files ordered by timestamp
Expand Down Expand Up @@ -8711,7 +8763,6 @@ paths:
schema:
type: string


/concurrency_groups/list:
get:
summary: List all concurrency groups
Expand Down
110 changes: 106 additions & 4 deletions backend/windmill-api/src/job_metrics.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
use crate::db::DB;

use axum::{extract::Path, routing::post, Extension, Json, Router};
use axum::{
extract::Path,
routing::{get, post},
Extension, Json, Router,
};
use serde::{Deserialize, Serialize};
use tower_http::cors::{Any, CorsLayer};
use uuid::Uuid;
use windmill_common::{
error,
job_metrics::{JobStatsRecord, MetricKind},
error::{self, Error},
job_metrics::{
record_metric, register_metric_for_job, JobStatsRecord, MetricKind, MetricNumericValue,
},
};

pub fn workspaced_service() -> Router {
Expand All @@ -15,7 +21,16 @@ pub fn workspaced_service() -> Router {
.allow_headers([http::header::CONTENT_TYPE, http::header::AUTHORIZATION])
.allow_origin(Any);

Router::new().route("/get/:id", post(get_job_metrics).layer(cors.clone()))
Router::new()
.route("/get/:id", post(get_job_metrics).layer(cors.clone()))
.route(
"/set_progress/:id",
post(set_job_progress).layer(cors.clone()),
)
.route(
"/get_progress/:id",
get(get_job_progress).layer(cors.clone()),
)
}

#[derive(Deserialize)]
Expand Down Expand Up @@ -137,6 +152,93 @@ async fn get_job_metrics(
let response = JobStatsResponse { metrics_metadata, scalar_metrics, timeseries_metrics };
Ok(Json(response))
}
/// JSON request body accepted by `set_job_progress`.
#[derive(Deserialize)]
struct JobProgressSetRequest {
/// Progress of the job, as a percentage.
/// The handler clamps it to `0..=99` when writing it into a flow's status.
percent: i32,
/// Optional parent flow id.
/// When present, `set_job_progress` also modifies the flow status of that flow:
/// specifically the `progress` field of the module at the flow's current step
/// (the corresponding `FlowStatusModule` in `InProgress` state).
flow_job_id: Option<Uuid>,
}

async fn set_job_progress(
Extension(db): Extension<DB>,
Path((w_id, job_id)): Path<(String, Uuid)>,
Json(JobProgressSetRequest { percent, flow_job_id }): Json<JobProgressSetRequest>,
) -> error::JsonResult<()> {
// If flow_job_id exists, than we should modify flow_status of corresponding module
// Individual jobs and flows are handled differently
if let Some(flow_job_id) = flow_job_id {
// TODO: Return error if trying to set completed job?
sqlx::query!(
"UPDATE queue
SET flow_status = JSONB_SET(flow_status, ARRAY['modules', flow_status->>'step', 'progress'], $1)
WHERE id = $2",
serde_json::json!(percent.clamp(0, 99)),
flow_job_id
)
.execute(&db)
.await?;
}

let record_progress = || {
record_metric(
&db,
w_id.clone(),
job_id,
"progress_perc".to_owned(),
MetricNumericValue::Integer(percent),
)
};

// Try to record
if let Err(err) = record_progress().await {
if matches!(err, Error::MetricNotFound(..)) {
// Register
// TODO: Reset progress after job is finished (in case it reruns same job)?
_ = register_metric_for_job(
&db,
w_id.clone(),
job_id,
"progress_perc".to_string(),
MetricKind::ScalarInt,
Some("Job Execution Progress (%)".to_owned()),
)
.await?;
// Retry recording progress
record_progress().await.map_err(|err| {
// If for some reason it still returns same error, this error will be converted to BadRequest and returned
if let Error::MetricNotFound(body) = err {
Error::BadRequest(body)
} else {
err
}
})?;
} else {
return Err(err);
}
};
return Ok(Json(()));
}

/// Returns the stored `progress_perc` value of a job, clamped to `0..=99`.
///
/// Responds with `None` (JSON `null`) when no `progress_perc` row exists for
/// the job in the given workspace, or when the stored value is NULL.
///
/// # Errors
///
/// Propagates any database error raised by the lookup.
async fn get_job_progress(
    Extension(db): Extension<DB>,
    Path((w_id, job_id)): Path<(String, Uuid)>,
) -> error::JsonResult<Option<i32>> {
    // Outer Option: was a row found? Inner Option: is the column non-NULL?
    let stored: Option<Option<i32>> = sqlx::query_scalar!(
        "SELECT (scalar_int)::int FROM job_stats WHERE job_id = $1 AND workspace_id = $2 AND metric_id = 'progress_perc'",
        job_id,
        w_id
    )
    .fetch_optional(&db)
    .await?;

    let clamped = stored.flatten().map(|value| value.clamp(0, 99));
    Ok(Json(clamped))
}

fn timeseries_sample<T: Copy>(
from: Option<chrono::DateTime<chrono::Utc>>,
Expand Down
20 changes: 19 additions & 1 deletion backend/windmill-api/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4507,6 +4507,7 @@ pub async fn run_job_by_hash_inner(
pub struct JobUpdateQuery {
pub running: bool,
pub log_offset: i32,
/// When `Some(true)`, `get_job_update` also looks up the job's `progress_perc`
/// metric and returns it in the `progress` field of the response.
/// Optional so that requests omitting the parameter still deserialize.
pub get_progress: Option<bool>,
}

#[derive(Serialize)]
Expand All @@ -4516,6 +4517,7 @@ pub struct JobUpdate {
pub new_logs: Option<String>,
pub log_offset: Option<i32>,
pub mem_peak: Option<i32>,
pub progress: Option<i32>,
pub flow_status: Option<Box<serde_json::value::RawValue>>,
}

Expand Down Expand Up @@ -4583,7 +4585,7 @@ async fn get_job_update(
OptAuthed(opt_authed): OptAuthed,
Extension(db): Extension<DB>,
Path((w_id, job_id)): Path<(String, Uuid)>,
Query(JobUpdateQuery { running, log_offset }): Query<JobUpdateQuery>,
Query(JobUpdateQuery { running, log_offset, get_progress }): Query<JobUpdateQuery>,
) -> error::JsonResult<JobUpdate> {
let record = sqlx::query_as::<_, JobUpdateRow>(
"SELECT running, substr(concat(coalesce(queue.logs, ''), job_logs.logs), greatest($1 - job_logs.log_offset, 0)) as logs, mem_peak,
Expand All @@ -4599,6 +4601,20 @@ async fn get_job_update(
.fetch_optional(&db)
.await?;

// Progress is only looked up when the caller asks for it. A job that never
// reported progress has no `progress_perc` row, so the lookup must tolerate
// zero rows instead of failing the whole update request.
let progress: Option<i32> = if get_progress == Some(true) {
    sqlx::query_scalar!(
        "SELECT scalar_int FROM job_stats WHERE workspace_id = $1 AND job_id = $2 AND metric_id = $3",
        &w_id,
        job_id,
        "progress_perc"
    )
    .fetch_optional(&db)
    .await?
    // Collapse "no row" and "NULL value" into a single `None`.
    .flatten()
} else {
    None
};

if let Some(record) = record {
if opt_authed.is_none() && record.created_by != "anonymous" {
return Err(Error::BadRequest(
Expand All @@ -4616,6 +4632,7 @@ async fn get_job_update(
completed: None,
new_logs: record.logs,
mem_peak: record.mem_peak,
progress,
flow_status: record
.flow_status
.map(|x: sqlx::types::Json<Box<RawValue>>| x.0),
Expand Down Expand Up @@ -4648,6 +4665,7 @@ async fn get_job_update(
log_offset: record.log_offset,
new_logs: record.logs,
mem_peak: record.mem_peak,
progress,
flow_status: record
.flow_status
.map(|x: sqlx::types::Json<Box<RawValue>>| x.0),
Expand Down
2 changes: 2 additions & 0 deletions backend/windmill-common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ pub enum Error {
NotFound(String),
#[error("Not authorized: {0}")]
NotAuthorized(String),
#[error("Metric not found: {0}")]
MetricNotFound(String),
#[error("Permission denied: {0}")]
PermissionDenied(String),
#[error("Require Admin privileges for {0}")]
Expand Down
4 changes: 4 additions & 0 deletions backend/windmill-common/src/flow_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ struct UntaggedFlowStatusModule {
type_: String,
id: Option<String>,
count: Option<u16>,
progress: Option<u8>,
job: Option<Uuid>,
iterator: Option<Iterator>,
flow_jobs: Option<Vec<Uuid>>,
Expand Down Expand Up @@ -147,6 +148,8 @@ pub enum FlowStatusModule {
id: String,
job: Uuid,
#[serde(skip_serializing_if = "Option::is_none")]
progress: Option<u8>,
#[serde(skip_serializing_if = "Option::is_none")]
iterator: Option<Iterator>,
#[serde(skip_serializing_if = "Option::is_none")]
flow_jobs: Option<Vec<Uuid>>,
Expand Down Expand Up @@ -237,6 +240,7 @@ impl<'de> Deserialize<'de> for FlowStatusModule {
branchall: untagged.branchall,
parallel: untagged.parallel.unwrap_or(false),
while_loop: untagged.while_loop.unwrap_or(false),
progress: untagged.progress,
}),
"Success" => Ok(FlowStatusModule::Success {
id: untagged
Expand Down
2 changes: 1 addition & 1 deletion backend/windmill-common/src/job_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ pub async fn record_metric(
.await?;

if metric_kind_opt.is_none() {
return Err(error::Error::BadRequest(format!(
return Err(error::Error::MetricNotFound(format!(
"Metric {} not yet registered for job {}.",
metric_id, job_id
)));
Expand Down
Loading

0 comments on commit d6d4756

Please sign in to comment.