From ec6b97486ef65fe1bcae05246f792cd9a7ecd978 Mon Sep 17 00:00:00 2001 From: Ruben Fiszel Date: Thu, 26 Sep 2024 14:27:33 +0200 Subject: [PATCH] improve list queue api --- backend/windmill-api/src/jobs.rs | 11 ++++++++++- backend/windmill-common/src/utils.rs | 2 +- cli/queues.ts | 11 +++++++++-- 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/backend/windmill-api/src/jobs.rs b/backend/windmill-api/src/jobs.rs index 82e58c5322d8f..d6ce16dcae68d 100644 --- a/backend/windmill-api/src/jobs.rs +++ b/backend/windmill-api/src/jobs.rs @@ -1254,13 +1254,16 @@ pub fn list_queue_jobs_query( w_id: &str, lq: &ListQueueQuery, fields: &[&str], + pagination: Pagination, join_outstanding_wait_times: bool, tags: Option>, ) -> SqlBuilder { + let (limit, offset) = paginate(pagination); let mut sqlb = SqlBuilder::select_from("queue") .fields(fields) .order_by("created_at", lq.order_desc.unwrap_or(true)) - .limit(1000) + .limit(limit) + .offset(offset) .clone(); if let Some(tags) = tags { @@ -1273,6 +1276,7 @@ pub fn list_queue_jobs_query( #[derive(Serialize, FromRow)] struct ListableQueuedJob { pub id: Uuid, + pub running: bool, pub created_by: String, pub created_at: chrono::DateTime, pub started_at: Option>, @@ -1295,6 +1299,7 @@ async fn list_queue_jobs( authed: ApiAuthed, Extension(user_db): Extension, Path(w_id): Path, + Query(pagination): Query, Query(lq): Query, ) -> error::JsonResult> { let sql = list_queue_jobs_query( @@ -1302,6 +1307,7 @@ async fn list_queue_jobs( &lq, &[ "id", + "running", "created_by", "created_at", "started_at", @@ -1321,6 +1327,7 @@ async fn list_queue_jobs( "priority", "workspace_id", ], + pagination, false, get_scope_tags(&authed), ) @@ -1576,6 +1583,7 @@ async fn list_jobs( ) -> error::JsonResult> { check_scopes(&authed, || format!("jobs:listjobs"))?; + let limit = pagination.per_page.unwrap_or(1000); let (per_page, offset) = paginate(pagination); let lqc = lq.clone(); @@ -1607,6 +1615,7 @@ async fn list_jobs( &w_id, &ListQueueQuery { order_desc: Some(true), ..lq.into() }, UnifiedJob::queued_job_fields(), + Pagination { per_page: Some(limit), page: None }, true, get_scope_tags(&authed), ); diff --git a/backend/windmill-common/src/utils.rs b/backend/windmill-common/src/utils.rs index 8bbe541f07a28..97a68fe40562f 100644 --- a/backend/windmill-common/src/utils.rs +++ b/backend/windmill-common/src/utils.rs @@ -38,7 +38,7 @@ lazy_static::lazy_static! { .build().unwrap(); } -#[derive(Deserialize)] +#[derive(Deserialize, Clone)] pub struct Pagination { pub page: Option, pub per_page: Option, diff --git a/cli/queues.ts b/cli/queues.ts index 1d027bf5038be..f93bbd9da9f04 100644 --- a/cli/queues.ts +++ b/cli/queues.ts @@ -8,6 +8,7 @@ type Data = { later: number; waiting: number; running: number; + runningFlows: number; rps30s: string; rps5min: string; rps30min: string; @@ -29,6 +30,7 @@ function createRow(tag: string, data: Record) { waiting: 0, later: 0, running: 0, + runningFlows: 0, rps30s: "", rps5min: "", rps30min: "", @@ -68,7 +70,11 @@ async function displayQueues(opts: GlobalOptions, workspace?: string) { createRow(job.tag, data); const scheduledFor = new Date(job.scheduled_for ?? ""); if (job.running) { - data[job.tag].running += 1; + if (job.job_kind === "flow" || job.job_kind === "flowpreview") { + data[job.tag].runningFlows += 1; + } else { + data[job.tag].running += 1; + } } else if (scheduledFor <= nowFromDb) { data[job.tag].waiting += 1; } else { @@ -101,7 +107,8 @@ async function displayQueues(opts: GlobalOptions, workspace?: string) { const table = new Table(); table.header([ "", - "Running", + "Jobs", + "Active Flows", "Waiting", "Later", "RPS (30s)",