Skip to content

Commit

Permalink
feat(cli): add queues, workers and worker-groups commands (#4439)
Browse files Browse the repository at this point in the history
* all

* all

* all

* all

* all

* all

* all

* all

* all

* all

* all

* all

* all

* all
  • Loading branch information
rubenfiszel authored Sep 26, 2024
1 parent c2ae4fc commit 610bb7b
Show file tree
Hide file tree
Showing 48 changed files with 2,229 additions and 1,058 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Add down migration script here
-- Reverts the matching up migration: drops the rolling occupancy-rate
-- columns (15s / 5m / 30m windows, per the column names) from worker_ping.
-- NOTE(review): plain DROP COLUMN (no IF EXISTS) will fail if the up
-- migration was never applied — standard for paired migrations.
ALTER TABLE worker_ping
DROP COLUMN occupancy_rate_15s,
DROP COLUMN occupancy_rate_5m,
DROP COLUMN occupancy_rate_30m;
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Add up migration script here
-- Adds rolling occupancy-rate columns (15s / 5m / 30m windows, per the
-- column names) to worker_ping. REAL and nullable so existing rows and
-- older workers that do not report these values remain valid (NULL).
ALTER TABLE worker_ping
ADD COLUMN occupancy_rate_15s REAL,
ADD COLUMN occupancy_rate_5m REAL,
ADD COLUMN occupancy_rate_30m REAL;
2 changes: 2 additions & 0 deletions backend/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ async fn cache_hub_scripts(file_path: Option<String>) -> anyhow::Result<()> {
"global",
"global",
"",
&mut None,
)
.await?;
tokio::fs::remove_dir_all(job_dir).await?;
Expand All @@ -178,6 +179,7 @@ async fn cache_hub_scripts(file_path: Option<String>) -> anyhow::Result<()> {
"cache_init",
windmill_worker::get_common_bun_proc_envs(None).await,
false,
&mut None,
)
.await?;
} else {
Expand Down
87 changes: 41 additions & 46 deletions backend/src/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1071,73 +1071,68 @@ pub async fn monitor_db(
}

pub async fn expose_queue_metrics(db: &Pool<Postgres>) {
let tx = db.begin().await;
if let Ok(mut tx) = tx {
let last_check = sqlx::query_scalar!(
let last_check = sqlx::query_scalar!(
"SELECT created_at FROM metrics WHERE id LIKE 'queue_count_%' ORDER BY created_at DESC LIMIT 1"
)
.fetch_optional(db)
.await
.unwrap_or(Some(chrono::Utc::now()));

let metrics_enabled = METRICS_ENABLED.load(std::sync::atomic::Ordering::Relaxed);
let save_metrics = last_check
.map(|last_check| chrono::Utc::now() - last_check > chrono::Duration::seconds(25))
.unwrap_or(true);
let metrics_enabled = METRICS_ENABLED.load(std::sync::atomic::Ordering::Relaxed);
let save_metrics = last_check
.map(|last_check| chrono::Utc::now() - last_check > chrono::Duration::seconds(25))
.unwrap_or(true);

if metrics_enabled || save_metrics {
let queue_counts = sqlx::query!(
"SELECT tag, count(*) as count FROM queue WHERE
if metrics_enabled || save_metrics {
let queue_counts = sqlx::query!(
"SELECT tag, count(*) as count FROM queue WHERE
scheduled_for <= now() - ('3 seconds')::interval AND running = false
GROUP BY tag"
)
.fetch_all(&mut *tx)
.await
.ok()
.unwrap_or_else(|| vec![]);
)
.fetch_all(db)
.await
.ok()
.unwrap_or_else(|| vec![]);

for q in queue_counts {
let count = q.count.unwrap_or(0);
let tag = q.tag;
if metrics_enabled {
let metric = (*QUEUE_COUNT).with_label_values(&[&tag]);
metric.set(count as i64);
}
for q in queue_counts {
let count = q.count.unwrap_or(0);
let tag = q.tag;
if metrics_enabled {
let metric = (*QUEUE_COUNT).with_label_values(&[&tag]);
metric.set(count as i64);
}

// save queue_count and delay metrics per tag
if save_metrics {
// save queue_count and delay metrics per tag
if save_metrics {
sqlx::query!(
"INSERT INTO metrics (id, value) VALUES ($1, $2)",
format!("queue_count_{}", tag),
serde_json::json!(count)
)
.execute(db)
.await
.ok();
if count > 0 {
sqlx::query!(
"INSERT INTO metrics (id, value) VALUES ($1, $2)",
format!("queue_count_{}", tag),
serde_json::json!(count)
)
.execute(&mut *tx)
.await
.ok();
if count > 0 {
sqlx::query!(
"INSERT INTO metrics (id, value)
VALUES ($1, to_jsonb((SELECT EXTRACT(EPOCH FROM now() - scheduled_for)
FROM queue WHERE tag = $2 AND running = false AND scheduled_for <= now() - ('3 seconds')::interval
ORDER BY priority DESC NULLS LAST, scheduled_for, created_at LIMIT 1)))",
ORDER BY priority DESC NULLS LAST, scheduled_for LIMIT 1)))",
format!("queue_delay_{}", tag),
tag
).execute(&mut *tx).await.ok();
}
).execute(db).await.ok();
}
}
}

// clean queue metrics older than 14 days
sqlx::query!(
"DELETE FROM metrics WHERE id LIKE 'queue_%' AND created_at < NOW() - INTERVAL '14 day'"
)
.execute(&mut *tx)
.await
.ok();

tx.commit().await.ok();
}

// clean queue metrics older than 14 days
sqlx::query!(
"DELETE FROM metrics WHERE id LIKE 'queue_%' AND created_at < NOW() - INTERVAL '14 day'"
)
.execute(db)
.await
.ok();
}

pub async fn reload_smtp_config(db: &Pool<Postgres>) {
Expand Down
44 changes: 44 additions & 0 deletions backend/windmill-api/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5964,6 +5964,43 @@ paths:
schema:
type: integer

/jobs/completed/count_by_tag:
get:
summary: Count jobs by tag
operationId: countJobsByTag
tags:
- job
parameters:
- name: horizon_secs
in: query
          description: Time horizon in seconds, looking back from now (the count covers jobs from now - horizon onward; default is 3600)
required: false
schema:
type: integer
- name: workspace_id
in: query
description: Specific workspace ID to filter results (optional)
required: false
schema:
type: string
responses:
"200":
description: Job counts by tag
content:
application/json:
schema:
type: array
items:
type: object
properties:
tag:
type: string
count:
type: integer
required:
- tag
- count

/w/{workspace}/jobs_u/get/{id}:
get:
summary: get job
Expand Down Expand Up @@ -6498,6 +6535,7 @@ paths:
schema:
type: string


/w/{workspace}/jobs_u/cancel/{id}/{resume_id}/{signature}:
get:
summary: cancel a job for a suspended flow
Expand Down Expand Up @@ -11158,6 +11196,12 @@ components:
type: string
occupancy_rate:
type: number
occupancy_rate_15s:
type: number
occupancy_rate_5m:
type: number
occupancy_rate_30m:
type: number
memory:
type: number
vcpus:
Expand Down
17 changes: 13 additions & 4 deletions backend/windmill-api/src/configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,26 @@ pub fn global_service() -> Router {

#[derive(Serialize, Deserialize, FromRow)]
struct Config {
name: String,
name: Option<String>,
config: serde_json::Value,
}

async fn list_worker_groups(
authed: ApiAuthed,
Extension(db): Extension<DB>,
) -> error::JsonResult<Vec<Config>> {
let configs_raw = sqlx::query_as!(Config, "SELECT * FROM config WHERE name LIKE 'worker__%'")
.fetch_all(&db)
.await?;
let mut configs_raw =
sqlx::query_as!(Config, "SELECT * FROM config WHERE name LIKE 'worker__%'")
.fetch_all(&db)
.await?;
// Remove the 'worker__' prefix from all config names
for config in configs_raw.iter_mut() {
if let Some(name) = &config.name {
if name.starts_with("worker__") {
config.name = Some(name.strip_prefix("worker__").unwrap().to_string());
}
}
}
let configs = if !authed.is_admin {
let mut obfuscated_configs: Vec<Config> = vec![];
for config in configs_raw {
Expand Down
Loading

0 comments on commit 610bb7b

Please sign in to comment.