Skip to content

Commit

Permalink
improve list queue api
Browse files Browse the repository at this point in the history
  • Loading branch information
rubenfiszel committed Sep 26, 2024
1 parent 9f91b19 commit 1656c1c
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 4 deletions.
11 changes: 10 additions & 1 deletion backend/windmill-api/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1254,13 +1254,16 @@ pub fn list_queue_jobs_query(
w_id: &str,
lq: &ListQueueQuery,
fields: &[&str],
pagination: Pagination,
join_outstanding_wait_times: bool,
tags: Option<Vec<&str>>,
) -> SqlBuilder {
let (limit, offset) = paginate(pagination);
let mut sqlb = SqlBuilder::select_from("queue")
.fields(fields)
.order_by("created_at", lq.order_desc.unwrap_or(true))
.limit(1000)
.limit(limit)
.offset(offset)
.clone();

if let Some(tags) = tags {
Expand All @@ -1273,6 +1276,7 @@ pub fn list_queue_jobs_query(
#[derive(Serialize, FromRow)]
struct ListableQueuedJob {
pub id: Uuid,
pub running: bool,
pub created_by: String,
pub created_at: chrono::DateTime<chrono::Utc>,
pub started_at: Option<chrono::DateTime<chrono::Utc>>,
Expand All @@ -1295,13 +1299,15 @@ async fn list_queue_jobs(
authed: ApiAuthed,
Extension(user_db): Extension<UserDB>,
Path(w_id): Path<String>,
Query(pagination): Query<Pagination>,
Query(lq): Query<ListQueueQuery>,
) -> error::JsonResult<Vec<ListableQueuedJob>> {
let sql = list_queue_jobs_query(
&w_id,
&lq,
&[
"id",
"running",
"created_by",
"created_at",
"started_at",
Expand All @@ -1321,6 +1327,7 @@ async fn list_queue_jobs(
"priority",
"workspace_id",
],
pagination,
false,
get_scope_tags(&authed),
)
Expand Down Expand Up @@ -1576,6 +1583,7 @@ async fn list_jobs(
) -> error::JsonResult<Vec<Job>> {
check_scopes(&authed, || format!("jobs:listjobs"))?;

let limit = pagination.per_page.unwrap_or(1000);
let (per_page, offset) = paginate(pagination);
let lqc = lq.clone();

Expand Down Expand Up @@ -1607,6 +1615,7 @@ async fn list_jobs(
&w_id,
&ListQueueQuery { order_desc: Some(true), ..lq.into() },
UnifiedJob::queued_job_fields(),
Pagination { per_page: Some(limit), page: None },
true,
get_scope_tags(&authed),
);
Expand Down
2 changes: 1 addition & 1 deletion backend/windmill-common/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ lazy_static::lazy_static! {
.build().unwrap();
}

#[derive(Deserialize)]
#[derive(Deserialize, Clone)]
pub struct Pagination {
pub page: Option<usize>,
pub per_page: Option<usize>,
Expand Down
12 changes: 10 additions & 2 deletions cli/queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ type Data = {
later: number;
waiting: number;
running: number;
runningFlows: number;
rps30s: string;
rps5min: string;
rps30min: string;
Expand All @@ -29,6 +30,7 @@ function createRow(tag: string, data: Record<string, Data>) {
waiting: 0,
later: 0,
running: 0,
runningFlows: 0,
rps30s: "",
rps5min: "",
rps30min: "",
Expand Down Expand Up @@ -68,7 +70,12 @@ async function displayQueues(opts: GlobalOptions, workspace?: string) {
createRow(job.tag, data);
const scheduledFor = new Date(job.scheduled_for ?? "");
if (job.running) {
data[job.tag].running += 1;
if (job.job_kind === "flow" || job.job_kind === "flowpreview") {
data[job.tag].runningFlows += 1;
} else {
log.info(job)
data[job.tag].running += 1;
}
} else if (scheduledFor <= nowFromDb) {
data[job.tag].waiting += 1;
} else {
Expand Down Expand Up @@ -101,7 +108,8 @@ async function displayQueues(opts: GlobalOptions, workspace?: string) {
const table = new Table();
table.header([
"",
"Running",
"Jobs",
"Active Flows",
"Waiting",
"Later",
"RPS (30s)",
Expand Down

0 comments on commit 1656c1c

Please sign in to comment.