From bca4099d1bc4ba45f4b637423a70f54b75d08e9c Mon Sep 17 00:00:00 2001 From: Ruben Fiszel Date: Thu, 26 Sep 2024 16:52:09 +0200 Subject: [PATCH] add suspended flows to cli queues --- backend/windmill-api/openapi.yaml | 2 ++ backend/windmill-api/src/jobs.rs | 2 +- backend/windmill-common/src/utils.rs | 6 ++++++ cli/gen/types.gen.ts | 1 + cli/queues.ts | 13 ++++++++++--- 5 files changed, 20 insertions(+), 4 deletions(-) diff --git a/backend/windmill-api/openapi.yaml b/backend/windmill-api/openapi.yaml index b4954fb482358..370e2e0ca6ba1 100644 --- a/backend/windmill-api/openapi.yaml +++ b/backend/windmill-api/openapi.yaml @@ -9987,6 +9987,8 @@ components: type: number aggregate_wait_time_ms: type: number + suspend: + type: number required: - id - running diff --git a/backend/windmill-api/src/jobs.rs b/backend/windmill-api/src/jobs.rs index d6ce16dcae68d..d187914abbe1f 100644 --- a/backend/windmill-api/src/jobs.rs +++ b/backend/windmill-api/src/jobs.rs @@ -1258,7 +1258,7 @@ pub fn list_queue_jobs_query( join_outstanding_wait_times: bool, tags: Option>, ) -> SqlBuilder { - let (limit, offset) = paginate(pagination); + let (limit, offset) = paginate_without_limits(pagination); let mut sqlb = SqlBuilder::select_from("queue") .fields(fields) .order_by("created_at", lq.order_desc.unwrap_or(true)) diff --git a/backend/windmill-common/src/utils.rs b/backend/windmill-common/src/utils.rs index 97a68fe40562f..8676878352a8d 100644 --- a/backend/windmill-common/src/utils.rs +++ b/backend/windmill-common/src/utils.rs @@ -82,6 +82,12 @@ pub fn paginate(pagination: Pagination) -> (usize, usize) { (per_page, offset) } +pub fn paginate_without_limits(pagination: Pagination) -> (usize, usize) { + let per_page = pagination.per_page.unwrap_or(MAX_PER_PAGE); + let offset = (pagination.page.unwrap_or(1).max(1) - 1) * per_page; + (per_page, offset) +} + pub async fn now_from_db<'c, E: sqlx::PgExecutor<'c>>( db: E, ) -> Result> { diff --git a/cli/gen/types.gen.ts b/cli/gen/types.gen.ts index 4e62e030b50a7..92bfe933d9a96 100644 --- a/cli/gen/types.gen.ts +++ b/cli/gen/types.gen.ts @@ -160,6 +160,7 @@ export type QueuedJob = { priority?: number; self_wait_time_ms?: number; aggregate_wait_time_ms?: number; + suspend?: number; }; export type job_kind = 'script' | 'preview' | 'dependencies' | 'flowdependencies' | 'appdependencies' | 'flow' | 'flowpreview' | 'script_hub' | 'identity' | 'deploymentcallback' | 'singlescriptflow'; diff --git a/cli/queues.ts b/cli/queues.ts index 1d027bf5038be..12586f1abb5bf 100644 --- a/cli/queues.ts +++ b/cli/queues.ts @@ -6,6 +6,7 @@ import { pickInstance } from "./instance.ts"; type Data = { count: number; later: number; + suspended: number; waiting: number; running: number; rps30s: string; @@ -29,6 +30,7 @@ function createRow(tag: string, data: Record) { waiting: 0, later: 0, running: 0, + suspended: 0, rps30s: "", rps5min: "", rps30min: "", @@ -40,7 +42,7 @@ async function displayQueues(opts: GlobalOptions, workspace?: string) { const activeInstance = await pickInstance(opts, true); if (activeInstance) { try { - const queuedJobs = await wmill.listQueue({workspace: workspace ?? 'admins', allWorkspaces: workspace === undefined}); + const queuedJobs = await wmill.listQueue({workspace: workspace ?? 'admins', allWorkspaces: workspace === undefined, perPage: 100000}); const jobCounts30s = await wmill.countJobsByTag({ horizonSecs: 30, workspaceId: workspace, @@ -68,7 +70,11 @@ async function displayQueues(opts: GlobalOptions, workspace?: string) { createRow(job.tag, data); const scheduledFor = new Date(job.scheduled_for ?? ""); if (job.running) { - data[job.tag].running += 1; + if (job.suspend) { + data[job.tag].suspended += 1; + } else { + data[job.tag].running += 1; + } } else if (scheduledFor <= nowFromDb) { data[job.tag].waiting += 1; } else { @@ -104,6 +110,7 @@ async function displayQueues(opts: GlobalOptions, workspace?: string) { "Running", "Waiting", "Later", + "Suspended", "RPS (30s)", "RPS (5min)", "RPS (30min)", @@ -111,7 +118,7 @@ async function displayQueues(opts: GlobalOptions, workspace?: string) { ]); let body = [] for (const tag in data) { - body.push([tag, data[tag].running, data[tag].waiting, data[tag].later, data[tag].rps30s, data[tag].rps5min, data[tag].rps30min, data[tag].rps24h]); + body.push([tag, data[tag].running, data[tag].waiting, data[tag].later, data[tag].suspended, data[tag].rps30s, data[tag].rps5min, data[tag].rps30min, data[tag].rps24h]); } table.body(body).render();