From d0ed82ff2c6e64ae7c8d25c3401fd8f1b45a8813 Mon Sep 17 00:00:00 2001 From: James Barford-Evans Date: Thu, 10 Jul 2025 14:56:36 +0100 Subject: [PATCH 1/3] Feat; job_queue table definition & mark a request as complete if all jobs are finished --- database/schema.md | 16 +- database/src/lib.rs | 68 +++++++ database/src/pool.rs | 324 +++++++++++++++++++++++++++++++++- database/src/pool/postgres.rs | 162 ++++++++++++++++- database/src/pool/sqlite.rs | 28 ++- site/src/job_queue/mod.rs | 18 +- 6 files changed, 604 insertions(+), 12 deletions(-) diff --git a/database/schema.md b/database/schema.md index 6c8194d7d..468ea0ac9 100644 --- a/database/schema.md +++ b/database/schema.md @@ -259,7 +259,6 @@ aid benchmark error 1 syn-1.0.89 Failed to compile... ``` - ## New benchmarking design We are currently implementing a new design for dispatching benchmarks to collector(s) and storing them in the database. It will support new use-cases, like backfilling of new benchmarks into a parent @@ -296,3 +295,18 @@ Columns: * `completed`: Completed request. * **backends** (`text NOT NULL`): Comma-separated list of codegen backends to benchmark. If empty, the default set of codegen backends will be benchmarked. * **profiles** (`text NOT NULL`): Comma-separated list of profiles to benchmark. If empty, the default set of profiles will be benchmarked. + +### job_queue + +This table stores ephemeral benchmark jobs, which specifically tell the +collector which benchmarks it should execute. The jobs will be kept in the +table for ~30 days after being completed, so that we can quickly figure out +what master parent jobs we need to backfill when handling try builds. + +``` +psql# SELECT * FROM job_queue limit 1; + +id request_id target backend benchmark_set collector_id created_at started_at completed_at status retry error +--- ----------- -------- -------- ------------- -------------- --------------------------- --------------------------- --------------------------- --------- ------ -------- +23 7 AArch64 llvm 5 collector-1 2025-07-10 09:00:00.123+00 2025-07-10 09:05:02.456+00 2025-07-10 09:15:17.890+00 complete 0 +``` diff --git a/database/src/lib.rs b/database/src/lib.rs index b171f47ee..8c5b4bc85 100644 --- a/database/src/lib.rs +++ b/database/src/lib.rs @@ -988,6 +988,10 @@ impl BenchmarkRequest { pub fn is_release(&self) -> bool { matches!(self.commit_type, BenchmarkRequestType::Release { .. }) } + + pub fn is_completed(&self) -> bool { + matches!(self.status, BenchmarkRequestStatus::Completed { .. }) + } } /// Cached information about benchmark requests in the DB @@ -1009,4 +1013,68 @@ impl BenchmarkRequestIndex { pub fn completed_requests(&self) -> &HashSet { &self.completed } + + pub fn add_tag(&mut self, tag: &str) { + self.completed.insert(tag.to_string()); + } +} + +#[derive(Debug, Clone, PartialEq)] +pub enum BenchmarkJobStatus { + Queued, + InProgress, + Success, + Failure, +} + +impl BenchmarkJobStatus { + pub fn as_str(&self) -> &str { + match self { + BenchmarkJobStatus::Queued => "queued", + BenchmarkJobStatus::InProgress => "in_progress", + BenchmarkJobStatus::Success => "success", + BenchmarkJobStatus::Failure => "failure", + } + } +} + +impl fmt::Display for BenchmarkJobStatus { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.as_str()) + } +} + +#[derive(Debug, Clone, PartialEq)] +pub struct BenchmarkJob { + pub target: Target, + pub backend: CodegenBackend, + pub benchmark_set: u32, + pub collector_id: String, + pub created_at: Option>, + pub started_at: Option>, + pub completed_at: Option>, + pub status: BenchmarkJobStatus, + pub retry: u32, +} + +impl BenchmarkJob { + pub fn new( + target: Target, + backend: CodegenBackend, + benchmark_set: u32, + collector_id: &str, + status: BenchmarkJobStatus, + ) -> Self { + BenchmarkJob { + target, + backend, + benchmark_set, + collector_id: collector_id.to_string(), + created_at: None, + started_at: None, + completed_at: None, + status, + retry: 0, + } + } } diff --git a/database/src/pool.rs b/database/src/pool.rs index ac2fe31ac..a28d5e803 100644 --- a/database/src/pool.rs +++ b/database/src/pool.rs @@ -1,6 +1,6 @@ use crate::{ - ArtifactCollection, ArtifactId, ArtifactIdNumber, BenchmarkRequest, BenchmarkRequestIndex, - BenchmarkRequestStatus, CodegenBackend, CompileBenchmark, Target, + ArtifactCollection, ArtifactId, ArtifactIdNumber, BenchmarkJob, BenchmarkRequest, + BenchmarkRequestIndex, BenchmarkRequestStatus, CodegenBackend, CompileBenchmark, Target, }; use crate::{CollectionId, Index, Profile, QueuedCommit, Scenario, Step}; use chrono::{DateTime, Utc}; @@ -210,6 +210,26 @@ pub trait Connection: Send + Sync { sha: &str, parent_sha: &str, ) -> anyhow::Result<()>; + + /// Try and mark the benchmark_request as completed. Will return `true` if + /// it has been marked as completed else `false` meaning there was no change + async fn try_mark_benchmark_request_as_completed( + &self, + benchmark_request: &mut BenchmarkRequest, + ) -> anyhow::Result; + + /// Given a benchmark request get the id + async fn get_benchmark_request_id( + &self, + benchmark_request: &BenchmarkRequest, + ) -> anyhow::Result; + + /// Insert a benchmark job into the `job_queue` + async fn insert_benchmark_job( + &self, + benchmark_request_id: u32, + job: &BenchmarkJob, + ) -> anyhow::Result<()>; } #[async_trait::async_trait] @@ -335,7 +355,7 @@ mod tests { use super::*; use crate::{ tests::{run_db_test, run_postgres_test}, - BenchmarkRequestStatus, BenchmarkRequestType, Commit, CommitType, Date, + BenchmarkJobStatus, BenchmarkRequestStatus, BenchmarkRequestType, Commit, CommitType, Date, }; /// Create a Commit @@ -347,6 +367,43 @@ mod tests { } } + async fn db_insert_jobs(conn: &dyn Connection, request_id: u32, jobs: &[BenchmarkJob]) { + for job in jobs { + conn.insert_benchmark_job(request_id, job).await.unwrap(); + } + } + + /// Create a try + fn create_try( + sha: Option<&str>, + parent_sha: Option<&str>, + pr: u32, + created_at: DateTime, + status: BenchmarkRequestStatus, + backends: &str, + profiles: &str, + ) -> BenchmarkRequest { + BenchmarkRequest { + commit_type: BenchmarkRequestType::Try { + sha: sha.map(|it| it.to_string()), + parent_sha: parent_sha.map(|it| it.to_string()), + pr, + }, + created_at, + status, + backends: backends.to_string(), + profiles: profiles.to_string(), + } + } + + async fn request_is_complete(conn: &dyn Connection, tag: &str) -> bool { + conn.load_benchmark_request_index() + .await + .unwrap() + .completed_requests() + .contains(tag) + } + #[tokio::test] async fn pstat_returns_empty_vector_when_empty() { run_db_test(|ctx| async { @@ -584,4 +641,265 @@ mod tests { }) .await; } + + // We can't insert jobs unless there is a corresponding benchmark request + #[tokio::test] + async fn insert_benchmark_job_fk_violation() { + run_postgres_test(|ctx| async { + let db = ctx.db_client(); + let db = db.connection().await; + let job = BenchmarkJob::new( + Target::X86_64UnknownLinuxGnu, + CodegenBackend::Llvm, + 3, + "collector 1", + BenchmarkJobStatus::Queued, + ); + + assert!(db.insert_benchmark_job(1, &job).await.is_err()); + + Ok(ctx) + }) + .await; + } + + #[tokio::test] + async fn insert_benchmark_job() { + run_postgres_test(|ctx| async { + let db = ctx.db_client(); + let db = db.connection().await; + let job = BenchmarkJob::new( + Target::X86_64UnknownLinuxGnu, + CodegenBackend::Llvm, + 3, + "collector 1", + BenchmarkJobStatus::Queued, + ); + let request = create_try( + Some("s1"), + Some("p1"), + 3, + Utc::now(), + BenchmarkRequestStatus::InProgress, + "", + "", + ); + db.insert_benchmark_request(&request).await.unwrap(); + + assert!(db.insert_benchmark_job(1, &job).await.is_ok()); + + Ok(ctx) + }) + .await; + } + + #[tokio::test] + async fn mark_request_completed_no_tag() { + run_postgres_test(|ctx| async { + let db = ctx.db_client(); + let db = db.connection().await; + let request_id = 1; + + let mut request = create_try( + None, + None, + 3, + Utc::now(), + BenchmarkRequestStatus::InProgress, + "", + "", + ); + db.insert_benchmark_request(&request).await.unwrap(); + + let job_1 = BenchmarkJob::new( + Target::X86_64UnknownLinuxGnu, + CodegenBackend::Llvm, + 1, + "collector 1", + BenchmarkJobStatus::Success, + ); + let job_2 = BenchmarkJob::new( + Target::X86_64UnknownLinuxGnu, + CodegenBackend::Llvm, + 2, + "collector 2", + BenchmarkJobStatus::Success, + ); + + db_insert_jobs(&*db, request_id, &[job_1, job_2]).await; + + assert!(db + .try_mark_benchmark_request_as_completed(&mut request) + .await + .is_err()); + + Ok(ctx) + }) + .await; + } + + #[tokio::test] + async fn mark_request_completed_not_inprogress() { + run_postgres_test(|ctx| async { + let db = ctx.db_client(); + let db = db.connection().await; + let request_id = 1; + + let mut request = create_try( + Some("s1"), + Some("p1"), + 3, + Utc::now(), + BenchmarkRequestStatus::ArtifactsReady, + "", + "", + ); + db.insert_benchmark_request(&request).await.unwrap(); + + let job_1 = BenchmarkJob::new( + Target::X86_64UnknownLinuxGnu, + CodegenBackend::Llvm, + 1, + "collector 1", + BenchmarkJobStatus::Success, + ); + let job_2 = BenchmarkJob::new( + Target::X86_64UnknownLinuxGnu, + CodegenBackend::Llvm, + 2, + "collector 2", + BenchmarkJobStatus::Success, + ); + + db_insert_jobs(&*db, request_id, &[job_1, job_2]).await; + + assert!(db + .try_mark_benchmark_request_as_completed(&mut request) + .await + .is_err()); + + Ok(ctx) + }) + .await; + } + + // The case where the job is not complete + #[tokio::test] + async fn mark_request_completed_nop() { + run_postgres_test(|ctx| async { + let db = ctx.db_client(); + let db = db.connection().await; + let request_id = 1; + + let mut request = create_try( + Some("s1"), + Some("p1"), + 3, + Utc::now(), + BenchmarkRequestStatus::InProgress, + "", + "", + ); + db.insert_benchmark_request(&request).await.unwrap(); + + let job_1 = BenchmarkJob::new( + Target::X86_64UnknownLinuxGnu, + CodegenBackend::Llvm, + 1, + "collector 1", + BenchmarkJobStatus::InProgress, + ); + let job_2 = BenchmarkJob::new( + Target::X86_64UnknownLinuxGnu, + CodegenBackend::Llvm, + 2, + "collector 2", + BenchmarkJobStatus::Success, + ); + + db_insert_jobs(&*db, request_id, &[job_1, job_2]).await; + + assert!(db + .try_mark_benchmark_request_as_completed(&mut request) + .await + .is_ok()); + assert_eq!(request.status(), BenchmarkRequestStatus::InProgress); + + Ok(ctx) + }) + .await; + } + + #[tokio::test] + async fn mark_request_completed() { + run_postgres_test(|ctx| async { + let db = ctx.db_client(); + let db = db.connection().await; + let request_id = 1; + + let mut request = create_try( + Some("s1"), + Some("p1"), + 3, + Utc::now(), + BenchmarkRequestStatus::InProgress, + "", + "", + ); + db.insert_benchmark_request(&request).await.unwrap(); + + let job_1 = BenchmarkJob::new( + Target::X86_64UnknownLinuxGnu, + CodegenBackend::Llvm, + 1, + "collector 1", + BenchmarkJobStatus::Success, + ); + let job_2 = BenchmarkJob::new( + Target::X86_64UnknownLinuxGnu, + CodegenBackend::Llvm, + 2, + "collector 2", + BenchmarkJobStatus::Success, + ); + + db_insert_jobs(&*db, request_id, &[job_1, job_2]).await; + + assert!(db + .try_mark_benchmark_request_as_completed(&mut request) + .await + .is_ok()); + // The struct should have been mutated + assert!(request.is_completed()); + // The tag should exist in the completed set + assert!(request_is_complete(&*db, request.tag().unwrap()).await); + + Ok(ctx) + }) + .await; + } + + #[tokio::test] + async fn get_benchmark_request_id() { + run_postgres_test(|ctx| async { + let db = ctx.db_client(); + let db = db.connection().await; + let request = create_try( + Some("s1"), + Some("p1"), + 3, + Utc::now(), + BenchmarkRequestStatus::InProgress, + "", + "", + ); + db.insert_benchmark_request(&request).await.unwrap(); + + let retrieved = db.get_benchmark_request_id(&request).await.unwrap(); + assert_eq!(1, retrieved); + + Ok(ctx) + }) + .await; + } } diff --git a/database/src/pool/postgres.rs b/database/src/pool/postgres.rs index b8079f5a7..ed0435089 100644 --- a/database/src/pool/postgres.rs +++ b/database/src/pool/postgres.rs @@ -1,9 +1,9 @@ use crate::pool::{Connection, ConnectionManager, ManagedConnection, Transaction}; use crate::{ - ArtifactCollection, ArtifactId, ArtifactIdNumber, Benchmark, BenchmarkRequest, - BenchmarkRequestIndex, BenchmarkRequestStatus, BenchmarkRequestType, CodegenBackend, - CollectionId, Commit, CommitType, CompileBenchmark, Date, Index, Profile, QueuedCommit, - Scenario, Target, BENCHMARK_REQUEST_MASTER_STR, BENCHMARK_REQUEST_RELEASE_STR, + ArtifactCollection, ArtifactId, ArtifactIdNumber, Benchmark, BenchmarkJob, BenchmarkJobStatus, + BenchmarkRequest, BenchmarkRequestIndex, BenchmarkRequestStatus, BenchmarkRequestType, + CodegenBackend, CollectionId, Commit, CommitType, CompileBenchmark, Date, Index, Profile, + QueuedCommit, Scenario, Target, BENCHMARK_REQUEST_MASTER_STR, BENCHMARK_REQUEST_RELEASE_STR, BENCHMARK_REQUEST_STATUS_ARTIFACTS_READY_STR, BENCHMARK_REQUEST_STATUS_COMPLETED_STR, BENCHMARK_REQUEST_STATUS_IN_PROGRESS_STR, BENCHMARK_REQUEST_TRY_STR, }; @@ -309,6 +309,28 @@ static MIGRATIONS: &[&str] = &[ // Prevent multiple try commits without a `sha` and the same `pr` number // being added to the table r#"CREATE UNIQUE INDEX benchmark_request_pr_commit_type_idx ON benchmark_request (pr, commit_type) WHERE status != 'completed';"#, + r#" + CREATE TABLE IF NOT EXISTS job_queue ( + id SERIAL PRIMARY KEY, + request_id INTEGER NOT NULL, + target TEXT NOT NULL, + backend TEXT NOT NULL, + benchmark_set INTEGER NOT NULL, + collector_id TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + started_at TIMESTAMPTZ, + completed_at TIMESTAMPTZ, + status TEXT NOT NULL, + retry INTEGER DEFAULT 0, + error TEXT, + + CONSTRAINT job_queue_request_fk + FOREIGN KEY (request_id) + REFERENCES benchmark_request(id) + ON DELETE CASCADE + ); + CREATE INDEX IF NOT EXISTS job_queue_request_id_idx ON job_queue (request_id); + "#, ]; #[async_trait::async_trait] @@ -1593,6 +1615,135 @@ where .collect(); Ok(requests) } + + async fn try_mark_benchmark_request_as_completed( + &self, + benchmark_request: &mut BenchmarkRequest, + ) -> anyhow::Result { + anyhow::ensure!( + benchmark_request.tag().is_some(), + "Benchmark request has no tag" + ); + anyhow::ensure!( + benchmark_request.status == BenchmarkRequestStatus::InProgress, + "Can only mark benchmark request whos status is in_progress as complete" + ); + + // Find if the benchmark is completed and update it's status to completed + // in one SQL block + let row = self + .conn() + .query_opt( + " + UPDATE + benchmark_request + SET + status = $1, + completed_at = NOW() + WHERE + benchmark_request.tag = $2 + AND benchmark_request.commit_type = $3 + AND benchmark_request.status = $4 + AND NOT EXISTS ( + SELECT + 1 + FROM + job_queue + WHERE job_queue.request_id = benchmark_request.id + AND job_queue.status NOT IN ($5, $6) + ) + RETURNING benchmark_request.id, benchmark_request.completed_at; + ", + &[ + &BENCHMARK_REQUEST_STATUS_COMPLETED_STR, + &benchmark_request.tag(), + &benchmark_request.commit_type, + &benchmark_request.status, + &BenchmarkJobStatus::Success, + &BenchmarkJobStatus::Failure, + ], + ) + .await + .context("Failed to get id for benchmark_request")?; + // The affected id is returned by the query thus we can use the row's + // presence to determine if the request was marked as completed + if let Some(row) = row { + let completed_at = row.get::<_, DateTime>(1); + // Also mutate our object + benchmark_request.status = BenchmarkRequestStatus::Completed { completed_at }; + Ok(true) + } else { + Ok(false) + } + } + + async fn get_benchmark_request_id( + &self, + benchmark_request: &BenchmarkRequest, + ) -> anyhow::Result { + anyhow::ensure!( + benchmark_request.tag().is_some(), + "Benchmark request has no tag" + ); + + let row = self + .conn() + .query_opt( + " + SELECT + id + FROM + benchmark_request + WHERE + tag = $1 + AND commit_type = $2 + AND status = $3;", + &[ + &benchmark_request.tag(), + &benchmark_request.commit_type, + &benchmark_request.status, + ], + ) + .await + .context("Failed to get id for benchmark_request")?; + + if let Some(row) = row { + Ok(row.get::<_, i32>(0) as u32) + } else { + Ok(1) + } + } + + async fn insert_benchmark_job( + &self, + request_id: u32, + benchmark_job: &BenchmarkJob, + ) -> anyhow::Result<()> { + self.conn() + .execute( + " + INSERT INTO job_queue( + request_id, + target, + backend, + benchmark_set, + collector_id, + status + ) + VALUES ($1, $2, $3, $4, $5, $6);", + &[ + &(request_id as i32), + &benchmark_job.target, + &benchmark_job.backend, + &(benchmark_job.benchmark_set as i32), + &benchmark_job.collector_id, + &benchmark_job.status, + ], + ) + .await + .context("Failed to insert benchmark job")?; + Ok(()) + } } fn parse_artifact_id(ty: &str, sha: &str, date: Option>) -> ArtifactId { @@ -1638,6 +1789,9 @@ macro_rules! impl_to_postgresql_via_to_string { impl_to_postgresql_via_to_string!(BenchmarkRequestType); impl_to_postgresql_via_to_string!(BenchmarkRequestStatus); +impl_to_postgresql_via_to_string!(BenchmarkJobStatus); +impl_to_postgresql_via_to_string!(Target); +impl_to_postgresql_via_to_string!(CodegenBackend); #[cfg(test)] mod tests { diff --git a/database/src/pool/sqlite.rs b/database/src/pool/sqlite.rs index ecc28484f..4bf24da2c 100644 --- a/database/src/pool/sqlite.rs +++ b/database/src/pool/sqlite.rs @@ -1,8 +1,8 @@ use crate::pool::{Connection, ConnectionManager, ManagedConnection, Transaction}; use crate::{ - ArtifactCollection, ArtifactId, Benchmark, BenchmarkRequest, BenchmarkRequestIndex, - BenchmarkRequestStatus, CodegenBackend, CollectionId, Commit, CommitType, CompileBenchmark, - Date, Profile, Target, + ArtifactCollection, ArtifactId, Benchmark, BenchmarkJob, BenchmarkRequest, + BenchmarkRequestIndex, BenchmarkRequestStatus, CodegenBackend, CollectionId, Commit, + CommitType, CompileBenchmark, Date, Profile, Target, }; use crate::{ArtifactIdNumber, Index, QueuedCommit}; use chrono::{DateTime, TimeZone, Utc}; @@ -1296,6 +1296,28 @@ impl Connection for SqliteConnection { ) -> anyhow::Result<()> { no_queue_implementation_abort!() } + + async fn try_mark_benchmark_request_as_completed( + &self, + _benchmark_request: &mut BenchmarkRequest, + ) -> anyhow::Result { + no_queue_implementation_abort!() + } + + async fn get_benchmark_request_id( + &self, + _benchmark_request: &BenchmarkRequest, + ) -> anyhow::Result { + no_queue_implementation_abort!() + } + + async fn insert_benchmark_job( + &self, + _benchmark_request_id: u32, + _job: &BenchmarkJob, + ) -> anyhow::Result<()> { + no_queue_implementation_abort!() + } } fn parse_artifact_id(ty: &str, sha: &str, date: Option) -> ArtifactId { diff --git a/site/src/job_queue/mod.rs b/site/src/job_queue/mod.rs index 43ac10986..e967bc70b 100644 --- a/site/src/job_queue/mod.rs +++ b/site/src/job_queue/mod.rs @@ -179,7 +179,23 @@ async fn enqueue_next_job( conn: &dyn database::pool::Connection, index: &mut BenchmarkRequestIndex, ) -> anyhow::Result<()> { - let _queue = build_queue(conn, index).await?; + let queue = build_queue(conn, index).await?; + for mut request in queue { + if request.status() != BenchmarkRequestStatus::InProgress { + // TODO: + // - Uncomment + // - Actually enqueue the jobs + // conn.update_benchmark_request_status(&request, BenchmarkRequestStatus::InProgress) + // .await?; + break; + } else if conn + .try_mark_benchmark_request_as_completed(&mut request) + .await? + { + index.add_tag(request.tag().unwrap()); + continue; + } + } Ok(()) } From 00603e2265bebc11dad08b7e23f6b58b1d834c63 Mon Sep 17 00:00:00 2001 From: James Barford-Evans Date: Mon, 14 Jul 2025 11:33:00 +0100 Subject: [PATCH 2/3] Use string constants --- database/src/lib.rs | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/database/src/lib.rs b/database/src/lib.rs index 8c5b4bc85..810242ec4 100644 --- a/database/src/lib.rs +++ b/database/src/lib.rs @@ -1027,13 +1027,18 @@ pub enum BenchmarkJobStatus { Failure, } +const BENCHMARK_JOB_STATUS_QUEUED_STR: &str = "queued"; +const BENCHMARK_JOB_STATUS_IN_PROGRESS_STR: &str = "in_progress"; +const BENCHMARK_JOB_STATUS_SUCCESS_STR: &str = "success"; +const BENCHMARK_JOB_STATUS_FAILURE_STR: &str = "failure"; + impl BenchmarkJobStatus { pub fn as_str(&self) -> &str { match self { - BenchmarkJobStatus::Queued => "queued", - BenchmarkJobStatus::InProgress => "in_progress", - BenchmarkJobStatus::Success => "success", - BenchmarkJobStatus::Failure => "failure", + BenchmarkJobStatus::Queued => BENCHMARK_JOB_STATUS_QUEUED_STR, + BenchmarkJobStatus::InProgress => BENCHMARK_JOB_STATUS_IN_PROGRESS_STR, + BenchmarkJobStatus::Success => BENCHMARK_JOB_STATUS_SUCCESS_STR, + BenchmarkJobStatus::Failure => BENCHMARK_JOB_STATUS_FAILURE_STR, } } } @@ -1046,15 +1051,15 @@ impl fmt::Display for BenchmarkJobStatus { #[derive(Debug, Clone, PartialEq)] pub struct BenchmarkJob { - pub target: Target, - pub backend: CodegenBackend, - pub benchmark_set: u32, - pub collector_id: String, - pub created_at: Option>, - pub started_at: Option>, - pub completed_at: Option>, - pub status: BenchmarkJobStatus, - pub retry: u32, + target: Target, + backend: CodegenBackend, + benchmark_set: u32, + collector_id: String, + created_at: Option>, + started_at: Option>, + completed_at: Option>, + status: BenchmarkJobStatus, + retry: u32, } impl BenchmarkJob { From f7f9747369f7f30aa8d72e6a1b8e4fa096dcb925 Mon Sep 17 00:00:00 2001 From: James Barford-Evans Date: Mon, 14 Jul 2025 15:41:12 +0100 Subject: [PATCH 3/3] PR Feedback; remove `get_request_id()`, reformat schema description, benchmark set made into a a struct --- database/schema.md | 30 ++++++++++++++++----- database/src/lib.rs | 13 ++++++--- database/src/pool.rs | 50 +++++++++++------------------------ database/src/pool/postgres.rs | 41 ++-------------------------- database/src/pool/sqlite.rs | 9 +------ site/src/job_queue/mod.rs | 2 +- 6 files changed, 52 insertions(+), 93 deletions(-) diff --git a/database/schema.md b/database/schema.md index 468ea0ac9..dbf58044f 100644 --- a/database/schema.md +++ b/database/schema.md @@ -303,10 +303,28 @@ collector which benchmarks it should execute. The jobs will be kept in the table for ~30 days after being completed, so that we can quickly figure out what master parent jobs we need to backfill when handling try builds. -``` -psql# SELECT * FROM job_queue limit 1; +Columns: -id request_id target backend benchmark_set collector_id created_at started_at completed_at status retry error ---- ----------- -------- -------- ------------- -------------- --------------------------- --------------------------- --------------------------- --------- ------ -------- -23 7 AArch64 llvm 5 collector-1 2025-07-10 09:00:00.123+00 2025-07-10 09:05:02.456+00 2025-07-10 09:15:17.890+00 complete 0 -``` +- **id** (`bigint` / `serial`): Primary-key identifier for the job row; + auto-increments with each new job. +- **request_id** (`bigint`): References the parent benchmark request that + spawned this job. +- **target** (`text NOT NULL`): Hardware/ISA the benchmarks must run on + (e.g. AArch64, x86_64). +- **backend** (`text NOT NULL`): Code generation backend the collector should + test (e.g. llvm, cranelift). +- **benchmark_set** (`int NOT NULL`): ID of the predefined benchmark suite to + execute. +- **collector_id** (`text`): Id of the collector that claimed the job + (populated once the job is started). +- **created_at** (`timestamptz NOT NULL`): Datetime when the job was queued. +- **started_at** (`timestamptz`): Datetime when the collector actually began + running the benchmarks; NULL until the job is claimed. +- **completed_at** (`timestampt`): Datetime when the collector finished + (successfully or otherwise); used to purge rows after ~30 days. +- **status** (`text NOT NULL`): Current job state. `queued`, `in_progress`, + `success`, or `failure`. +- **retry** (`int NOT NULL`): Number of times the job has been re-queued after + a failure; 0 on the first attempt. +- **error** (`text`): Optional error message or stack trace from the last + failed run; NULL when the job succeeded. diff --git a/database/src/lib.rs b/database/src/lib.rs index 810242ec4..910f0a173 100644 --- a/database/src/lib.rs +++ b/database/src/lib.rs @@ -1015,6 +1015,7 @@ impl BenchmarkRequestIndex { } pub fn add_tag(&mut self, tag: &str) { + self.all.insert(tag.to_string()); self.completed.insert(tag.to_string()); } } @@ -1049,13 +1050,16 @@ impl fmt::Display for BenchmarkJobStatus { } } +#[derive(Debug, Clone, PartialEq)] +pub struct BenchmarkSet(u32); + #[derive(Debug, Clone, PartialEq)] pub struct BenchmarkJob { target: Target, backend: CodegenBackend, - benchmark_set: u32, + benchmark_set: BenchmarkSet, collector_id: String, - created_at: Option>, + created_at: DateTime, started_at: Option>, completed_at: Option>, status: BenchmarkJobStatus, @@ -1068,14 +1072,15 @@ impl BenchmarkJob { backend: CodegenBackend, benchmark_set: u32, collector_id: &str, + created_at: DateTime, status: BenchmarkJobStatus, ) -> Self { BenchmarkJob { target, backend, - benchmark_set, + benchmark_set: BenchmarkSet(benchmark_set), collector_id: collector_id.to_string(), - created_at: None, + created_at, started_at: None, completed_at: None, status, diff --git a/database/src/pool.rs b/database/src/pool.rs index a28d5e803..965f92195 100644 --- a/database/src/pool.rs +++ b/database/src/pool.rs @@ -213,17 +213,11 @@ pub trait Connection: Send + Sync { /// Try and mark the benchmark_request as completed. Will return `true` if /// it has been marked as completed else `false` meaning there was no change - async fn try_mark_benchmark_request_as_completed( + async fn mark_benchmark_request_as_completed( &self, benchmark_request: &mut BenchmarkRequest, ) -> anyhow::Result; - /// Given a benchmark request get the id - async fn get_benchmark_request_id( - &self, - benchmark_request: &BenchmarkRequest, - ) -> anyhow::Result; - /// Insert a benchmark job into the `job_queue` async fn insert_benchmark_job( &self, @@ -653,6 +647,7 @@ mod tests { CodegenBackend::Llvm, 3, "collector 1", + Utc::now(), BenchmarkJobStatus::Queued, ); @@ -673,6 +668,7 @@ mod tests { CodegenBackend::Llvm, 3, "collector 1", + Utc::now(), BenchmarkJobStatus::Queued, ); let request = create_try( @@ -716,6 +712,7 @@ mod tests { CodegenBackend::Llvm, 1, "collector 1", + Utc::now(), BenchmarkJobStatus::Success, ); let job_2 = BenchmarkJob::new( @@ -723,13 +720,14 @@ mod tests { CodegenBackend::Llvm, 2, "collector 2", + Utc::now(), BenchmarkJobStatus::Success, ); db_insert_jobs(&*db, request_id, &[job_1, job_2]).await; assert!(db - .try_mark_benchmark_request_as_completed(&mut request) + .mark_benchmark_request_as_completed(&mut request) .await .is_err()); @@ -761,6 +759,7 @@ mod tests { CodegenBackend::Llvm, 1, "collector 1", + Utc::now(), BenchmarkJobStatus::Success, ); let job_2 = BenchmarkJob::new( @@ -768,13 +767,14 @@ mod tests { CodegenBackend::Llvm, 2, "collector 2", + Utc::now(), BenchmarkJobStatus::Success, ); db_insert_jobs(&*db, request_id, &[job_1, job_2]).await; assert!(db - .try_mark_benchmark_request_as_completed(&mut request) + .mark_benchmark_request_as_completed(&mut request) .await .is_err()); @@ -807,6 +807,7 @@ mod tests { CodegenBackend::Llvm, 1, "collector 1", + Utc::now(), BenchmarkJobStatus::InProgress, ); let job_2 = BenchmarkJob::new( @@ -814,13 +815,14 @@ mod tests { CodegenBackend::Llvm, 2, "collector 2", + Utc::now(), BenchmarkJobStatus::Success, ); db_insert_jobs(&*db, request_id, &[job_1, job_2]).await; assert!(db - .try_mark_benchmark_request_as_completed(&mut request) + .mark_benchmark_request_as_completed(&mut request) .await .is_ok()); assert_eq!(request.status(), BenchmarkRequestStatus::InProgress); @@ -853,6 +855,7 @@ mod tests { CodegenBackend::Llvm, 1, "collector 1", + Utc::now(), BenchmarkJobStatus::Success, ); let job_2 = BenchmarkJob::new( @@ -860,13 +863,14 @@ mod tests { CodegenBackend::Llvm, 2, "collector 2", + Utc::now(), BenchmarkJobStatus::Success, ); db_insert_jobs(&*db, request_id, &[job_1, job_2]).await; assert!(db - .try_mark_benchmark_request_as_completed(&mut request) + .mark_benchmark_request_as_completed(&mut request) .await .is_ok()); // The struct should have been mutated @@ -878,28 +882,4 @@ mod tests { }) .await; } - - #[tokio::test] - async fn get_benchmark_request_id() { - run_postgres_test(|ctx| async { - let db = ctx.db_client(); - let db = db.connection().await; - let request = create_try( - Some("s1"), - Some("p1"), - 3, - Utc::now(), - BenchmarkRequestStatus::InProgress, - "", - "", - ); - db.insert_benchmark_request(&request).await.unwrap(); - - let retrieved = db.get_benchmark_request_id(&request).await.unwrap(); - assert_eq!(1, retrieved); - - Ok(ctx) - }) - .await; - } } diff --git a/database/src/pool/postgres.rs b/database/src/pool/postgres.rs index ed0435089..6f554982f 100644 --- a/database/src/pool/postgres.rs +++ b/database/src/pool/postgres.rs @@ -1616,7 +1616,7 @@ where Ok(requests) } - async fn try_mark_benchmark_request_as_completed( + async fn mark_benchmark_request_as_completed( &self, benchmark_request: &mut BenchmarkRequest, ) -> anyhow::Result { @@ -1677,43 +1677,6 @@ where } } - async fn get_benchmark_request_id( - &self, - benchmark_request: &BenchmarkRequest, - ) -> anyhow::Result { - anyhow::ensure!( - benchmark_request.tag().is_some(), - "Benchmark request has no tag" - ); - - let row = self - .conn() - .query_opt( - " - SELECT - id - FROM - benchmark_request - WHERE - tag = $1 - AND commit_type = $2 - AND status = $3;", - &[ - &benchmark_request.tag(), - &benchmark_request.commit_type, - &benchmark_request.status, - ], - ) - .await - .context("Failed to get id for benchmark_request")?; - - if let Some(row) = row { - Ok(row.get::<_, i32>(0) as u32) - } else { - Ok(1) - } - } - async fn insert_benchmark_job( &self, request_id: u32, @@ -1735,7 +1698,7 @@ where &(request_id as i32), &benchmark_job.target, &benchmark_job.backend, - &(benchmark_job.benchmark_set as i32), + &(benchmark_job.benchmark_set.0 as i32), &benchmark_job.collector_id, &benchmark_job.status, ], diff --git a/database/src/pool/sqlite.rs b/database/src/pool/sqlite.rs index 4bf24da2c..2d80f734a 100644 --- a/database/src/pool/sqlite.rs +++ b/database/src/pool/sqlite.rs @@ -1297,20 +1297,13 @@ impl Connection for SqliteConnection { no_queue_implementation_abort!() } - async fn try_mark_benchmark_request_as_completed( + async fn mark_benchmark_request_as_completed( &self, _benchmark_request: &mut BenchmarkRequest, ) -> anyhow::Result { no_queue_implementation_abort!() } - async fn get_benchmark_request_id( - &self, - _benchmark_request: &BenchmarkRequest, - ) -> anyhow::Result { - no_queue_implementation_abort!() - } - async fn insert_benchmark_job( &self, _benchmark_request_id: u32, diff --git a/site/src/job_queue/mod.rs b/site/src/job_queue/mod.rs index e967bc70b..3e51fcb26 100644 --- a/site/src/job_queue/mod.rs +++ b/site/src/job_queue/mod.rs @@ -189,7 +189,7 @@ async fn enqueue_next_job( // .await?; break; } else if conn - .try_mark_benchmark_request_as_completed(&mut request) + .mark_benchmark_request_as_completed(&mut request) .await? { index.add_tag(request.tag().unwrap());