From f93d0a39bae2e117735150125ccbdfecad4b0658 Mon Sep 17 00:00:00 2001 From: Evan Hicks Date: Tue, 6 Jan 2026 15:20:15 -0500 Subject: [PATCH 1/6] feat(store): Create a trait for the taskbroker store This trait currently only has a SQLite implementation, but in the future we can add other implementations like AlloyDB and the stateless taskbroker. Future iterations on this trait might also remove some functions (e.g. vacuum_db) but for now the trait simply uses every public function. --- Cargo.lock | 1 + Cargo.toml | 1 + src/grpc/server.rs | 2 +- src/grpc/server_tests.rs | 4 +- src/kafka/inflight_activation_writer.rs | 21 +-- src/main.rs | 6 +- src/store/inflight_activation.rs | 229 +++++++++++++++++------- src/store/inflight_activation_tests.rs | 82 ++++----- src/test_utils.rs | 8 +- src/upkeep.rs | 17 +- 10 files changed, 235 insertions(+), 136 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cf3f6400..9254d010 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2873,6 +2873,7 @@ name = "taskbroker" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "bytes", "chrono", "clap", diff --git a/Cargo.toml b/Cargo.toml index 4ca8308f..442e7f7b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ debug = 1 [dependencies] anyhow = "1.0.92" +async-trait = "0.1" bytes = "1.10.0" chrono = { version = "0.4.26" } clap = { version = "4.5.20", features = ["derive"] } diff --git a/src/grpc/server.rs b/src/grpc/server.rs index 99fe03d8..a80ce944 100644 --- a/src/grpc/server.rs +++ b/src/grpc/server.rs @@ -13,7 +13,7 @@ use crate::store::inflight_activation::{InflightActivationStatus, InflightActiva use tracing::{error, instrument}; pub struct TaskbrokerServer { - pub store: Arc, + pub store: Arc, } #[tonic::async_trait] diff --git a/src/grpc/server_tests.rs b/src/grpc/server_tests.rs index 6387b44d..99ae5c57 100644 --- a/src/grpc/server_tests.rs +++ b/src/grpc/server_tests.rs @@ -1,9 +1,9 @@ +use crate::grpc::server::TaskbrokerServer; +use 
crate::store::inflight_activation::InflightActivationStore; use sentry_protos::taskbroker::v1::consumer_service_server::ConsumerService; use sentry_protos::taskbroker::v1::{FetchNextTask, GetTaskRequest, SetTaskStatusRequest}; use tonic::{Code, Request}; -use crate::grpc::server::TaskbrokerServer; - use crate::test_utils::{create_test_store, make_activations}; #[tokio::test] diff --git a/src/kafka/inflight_activation_writer.rs b/src/kafka/inflight_activation_writer.rs index 3f498567..3bf93d35 100644 --- a/src/kafka/inflight_activation_writer.rs +++ b/src/kafka/inflight_activation_writer.rs @@ -44,12 +44,12 @@ impl ActivationWriterConfig { pub struct InflightActivationWriter { config: ActivationWriterConfig, - store: Arc, + store: Arc, batch: Option>, } impl InflightActivationWriter { - pub fn new(store: Arc, config: ActivationWriterConfig) -> Self { + pub fn new(store: Arc, config: ActivationWriterConfig) -> Self { Self { config, store, @@ -214,6 +214,7 @@ mod tests { use crate::store::inflight_activation::{ InflightActivationStatus, InflightActivationStore, InflightActivationStoreConfig, + SqliteActivationStore, }; use crate::test_utils::generate_unique_namespace; use crate::test_utils::make_activations; @@ -231,7 +232,7 @@ mod tests { }; let mut writer = InflightActivationWriter::new( Arc::new( - InflightActivationStore::new( + SqliteActivationStore::new( &generate_temp_filename(), InflightActivationStoreConfig::from_config(&create_integration_config()), ) @@ -342,7 +343,7 @@ mod tests { }; let mut writer = InflightActivationWriter::new( Arc::new( - InflightActivationStore::new( + SqliteActivationStore::new( &generate_temp_filename(), InflightActivationStoreConfig::from_config(&create_integration_config()), ) @@ -409,7 +410,7 @@ mod tests { }; let mut writer = InflightActivationWriter::new( Arc::new( - InflightActivationStore::new( + SqliteActivationStore::new( &generate_temp_filename(), InflightActivationStoreConfig::from_config(&create_integration_config()), ) 
@@ -480,7 +481,7 @@ mod tests { }; let mut writer = InflightActivationWriter::new( Arc::new( - InflightActivationStore::new( + SqliteActivationStore::new( &generate_temp_filename(), InflightActivationStoreConfig::from_config(&create_integration_config()), ) @@ -592,7 +593,7 @@ mod tests { }; let mut writer = InflightActivationWriter::new( Arc::new( - InflightActivationStore::new( + SqliteActivationStore::new( &generate_temp_filename(), InflightActivationStoreConfig::from_config(&create_integration_config()), ) @@ -703,7 +704,7 @@ mod tests { write_failure_backoff_ms: 4000, }; let store = Arc::new( - InflightActivationStore::new( + SqliteActivationStore::new( &generate_temp_filename(), InflightActivationStoreConfig::from_config(&create_integration_config()), ) @@ -857,7 +858,7 @@ mod tests { write_failure_backoff_ms: 4000, }; let store = Arc::new( - InflightActivationStore::new( + SqliteActivationStore::new( &generate_temp_filename(), InflightActivationStoreConfig::from_config(&create_integration_config()), ) @@ -891,7 +892,7 @@ mod tests { write_failure_backoff_ms: 4000, }; let store = Arc::new( - InflightActivationStore::new( + SqliteActivationStore::new( &generate_temp_filename(), InflightActivationStoreConfig::from_config(&create_integration_config()), ) diff --git a/src/main.rs b/src/main.rs index 3998e116..5f0eee52 100644 --- a/src/main.rs +++ b/src/main.rs @@ -31,7 +31,7 @@ use taskbroker::metrics; use taskbroker::processing_strategy; use taskbroker::runtime_config::RuntimeConfigManager; use taskbroker::store::inflight_activation::{ - InflightActivationStore, InflightActivationStoreConfig, + InflightActivationStore, InflightActivationStoreConfig, SqliteActivationStore, }; use taskbroker::{Args, get_version}; use tonic_health::ServingStatus; @@ -62,8 +62,8 @@ async fn main() -> Result<(), Error> { logging::init(logging::LoggingConfig::from_config(&config)); metrics::init(metrics::MetricsConfig::from_config(&config)); - let store = Arc::new( - 
InflightActivationStore::new( + let store: Arc = Arc::new( + SqliteActivationStore::new( &config.db_path, InflightActivationStoreConfig::from_config(&config), ) diff --git a/src/store/inflight_activation.rs b/src/store/inflight_activation.rs index 5c911a19..8c21fdfe 100644 --- a/src/store/inflight_activation.rs +++ b/src/store/inflight_activation.rs @@ -1,6 +1,7 @@ use std::{str::FromStr, time::Instant}; use anyhow::{Error, anyhow}; +use async_trait::async_trait; use chrono::{DateTime, Utc}; use libsqlite3_sys::{ SQLITE_DBSTATUS_CACHE_HIT, SQLITE_DBSTATUS_CACHE_MISS, SQLITE_DBSTATUS_CACHE_SPILL, @@ -265,22 +266,103 @@ impl InflightActivationStoreConfig { } } -pub struct InflightActivationStore { +#[async_trait] +pub trait InflightActivationStore: Send + Sync { + /// Trigger incremental vacuum to reclaim free pages in the database + async fn vacuum_db(&self) -> Result<(), Error>; + + /// Perform a full vacuum on the database + async fn full_vacuum_db(&self) -> Result<(), Error>; + + /// Get the size of the database in bytes + async fn db_size(&self) -> Result; + + /// Get an activation by id + async fn get_by_id(&self, id: &str) -> Result, Error>; + + /// Store a batch of activations + async fn store(&self, batch: Vec) -> Result; + + /// Get a single pending activation, optionally filtered by namespace + async fn get_pending_activation( + &self, + namespace: Option<&str>, + ) -> Result, Error>; + + /// Get pending activations from specified namespaces + async fn get_pending_activations_from_namespaces( + &self, + namespaces: Option<&[String]>, + limit: Option, + ) -> Result, Error>; + + /// Get the age of the oldest pending activation in seconds + async fn pending_activation_max_lag(&self, now: &DateTime) -> f64; + + /// Count activations with Pending status + async fn count_pending_activations(&self) -> Result; + + /// Count activations by status + async fn count_by_status(&self, status: InflightActivationStatus) -> Result; + + /// Count all activations + async fn 
count(&self) -> Result; + + /// Update the status of a specific activation + async fn set_status( + &self, + id: &str, + status: InflightActivationStatus, + ) -> Result, Error>; + + /// Set the processing deadline for a specific activation + async fn set_processing_deadline( + &self, + id: &str, + deadline: Option>, + ) -> Result<(), Error>; + + /// Delete an activation by id + async fn delete_activation(&self, id: &str) -> Result<(), Error>; + + /// Get all activations with status Retry + async fn get_retry_activations(&self) -> Result, Error>; + + /// Clear all activations from the store + async fn clear(&self) -> Result<(), Error>; + + /// Update tasks that exceeded their processing deadline + async fn handle_processing_deadline(&self) -> Result; + + /// Update tasks that exceeded max processing attempts + async fn handle_processing_attempts(&self) -> Result; + + /// Delete tasks past their expires_at deadline + async fn handle_expires_at(&self) -> Result; + + /// Update delayed tasks past their delay_until deadline to Pending + async fn handle_delay_until(&self) -> Result; + + /// Process failed tasks for discard or deadletter + async fn handle_failed_tasks(&self) -> Result; + + /// Mark tasks as complete by id + async fn mark_completed(&self, ids: Vec) -> Result; + + /// Remove completed tasks + async fn remove_completed(&self) -> Result; + + /// Remove killswitched tasks + async fn remove_killswitched(&self, killswitched_tasks: Vec) -> Result; +} + +pub struct SqliteActivationStore { read_pool: SqlitePool, write_pool: SqlitePool, config: InflightActivationStoreConfig, } -impl InflightActivationStore { - async fn acquire_write_conn_metric( - &self, - caller: &'static str, - ) -> Result, Error> { - let start = Instant::now(); - let conn = self.write_pool.acquire().await?; - metrics::histogram!("sqlite.write.acquire_conn", "fn" => caller).record(start.elapsed()); - Ok(conn) - } +impl SqliteActivationStore { pub async fn new(url: &str, config: 
InflightActivationStoreConfig) -> Result { let (read_pool, write_pool) = create_sqlite_pool(url).await?; @@ -293,40 +375,14 @@ impl InflightActivationStore { }) } - /// Trigger incremental vacuum to reclaim free pages in the database. - /// Depending on config data, will either vacuum a set number of - /// pages or attempt to reclaim all free pages. - #[instrument(skip_all)] - pub async fn vacuum_db(&self) -> Result<(), Error> { - let timer = Instant::now(); - - if let Some(page_count) = self.config.vacuum_page_count { - let mut conn = self.acquire_write_conn_metric("vacuum_db").await?; - sqlx::query(format!("PRAGMA incremental_vacuum({page_count})").as_str()) - .execute(&mut *conn) - .await?; - } else { - let mut conn = self.acquire_write_conn_metric("vacuum_db").await?; - sqlx::query("PRAGMA incremental_vacuum") - .execute(&mut *conn) - .await?; - } - let freelist_count: i32 = sqlx::query("PRAGMA freelist_count") - .fetch_one(&self.read_pool) - .await? - .get("freelist_count"); - - metrics::histogram!("store.vacuum", "database" => "meta").record(timer.elapsed()); - metrics::gauge!("store.vacuum.freelist", "database" => "meta").set(freelist_count); - Ok(()) - } - - /// Perform a full vacuum on the database. - pub async fn full_vacuum_db(&self) -> Result<(), Error> { - let mut conn = self.acquire_write_conn_metric("full_vacuum_db").await?; - sqlx::query("VACUUM").execute(&mut *conn).await?; - self.emit_db_status_metrics().await; - Ok(()) + async fn acquire_write_conn_metric( + &self, + caller: &'static str, + ) -> Result, Error> { + let start = Instant::now(); + let conn = self.write_pool.acquire().await?; + metrics::histogram!("sqlite.write.acquire_conn", "fn" => caller).record(start.elapsed()); + Ok(conn) } async fn emit_db_status_metrics(&self) { @@ -473,9 +529,48 @@ impl InflightActivationStore { } } } +} + +#[async_trait] +impl InflightActivationStore for SqliteActivationStore { + /// Trigger incremental vacuum to reclaim free pages in the database. 
+ /// Depending on config data, will either vacuum a set number of + /// pages or attempt to reclaim all free pages. + #[instrument(skip_all)] + async fn vacuum_db(&self) -> Result<(), Error> { + let timer = Instant::now(); + + if let Some(page_count) = self.config.vacuum_page_count { + let mut conn = self.acquire_write_conn_metric("vacuum_db").await?; + sqlx::query(format!("PRAGMA incremental_vacuum({page_count})").as_str()) + .execute(&mut *conn) + .await?; + } else { + let mut conn = self.acquire_write_conn_metric("vacuum_db").await?; + sqlx::query("PRAGMA incremental_vacuum") + .execute(&mut *conn) + .await?; + } + let freelist_count: i32 = sqlx::query("PRAGMA freelist_count") + .fetch_one(&self.read_pool) + .await? + .get("freelist_count"); + + metrics::histogram!("store.vacuum", "database" => "meta").record(timer.elapsed()); + metrics::gauge!("store.vacuum.freelist", "database" => "meta").set(freelist_count); + Ok(()) + } + + /// Perform a full vacuum on the database. + async fn full_vacuum_db(&self) -> Result<(), Error> { + let mut conn = self.acquire_write_conn_metric("full_vacuum_db").await?; + sqlx::query("VACUUM").execute(&mut *conn).await?; + self.emit_db_status_metrics().await; + Ok(()) + } /// Get the size of the database in bytes based on SQLite metadata queries. - pub async fn db_size(&self) -> Result { + async fn db_size(&self) -> Result { let result: u64 = sqlx::query( "SELECT page_count * page_size FROM pragma_page_count(), pragma_page_size()", ) @@ -487,7 +582,7 @@ impl InflightActivationStore { } /// Get an activation by id. 
Primarily used for testing - pub async fn get_by_id(&self, id: &str) -> Result, Error> { + async fn get_by_id(&self, id: &str) -> Result, Error> { let row_result: Option = sqlx::query_as( " SELECT id, @@ -522,7 +617,7 @@ impl InflightActivationStore { } #[instrument(skip_all)] - pub async fn store(&self, batch: Vec) -> Result { + async fn store(&self, batch: Vec) -> Result { if batch.is_empty() { return Ok(QueryResult { rows_affected: 0 }); } @@ -605,7 +700,7 @@ impl InflightActivationStore { } #[instrument(skip_all)] - pub async fn get_pending_activation( + async fn get_pending_activation( &self, namespace: Option<&str>, ) -> Result, Error> { @@ -624,7 +719,7 @@ impl InflightActivationStore { /// If namespaces is None, gets from any namespace /// If namespaces is Some(&[...]), gets from those namespaces #[instrument(skip_all)] - pub async fn get_pending_activations_from_namespaces( + async fn get_pending_activations_from_namespaces( &self, namespaces: Option<&[String]>, limit: Option, @@ -687,7 +782,7 @@ impl InflightActivationStore { /// as we are interested in latency to the *first* attempt. /// Tasks with delay_until set, will have their age adjusted based on their /// delay time. 
No tasks = 0 lag - pub async fn pending_activation_max_lag(&self, now: &DateTime) -> f64 { + async fn pending_activation_max_lag(&self, now: &DateTime) -> f64 { let result = sqlx::query( "SELECT received_at, delay_until FROM inflight_taskactivations @@ -718,13 +813,13 @@ impl InflightActivationStore { } #[instrument(skip_all)] - pub async fn count_pending_activations(&self) -> Result { + async fn count_pending_activations(&self) -> Result { self.count_by_status(InflightActivationStatus::Pending) .await } #[instrument(skip_all)] - pub async fn count_by_status(&self, status: InflightActivationStatus) -> Result { + async fn count_by_status(&self, status: InflightActivationStatus) -> Result { let result = sqlx::query("SELECT COUNT(*) as count FROM inflight_taskactivations WHERE status = $1") .bind(status) @@ -733,7 +828,7 @@ impl InflightActivationStore { Ok(result.get::("count") as usize) } - pub async fn count(&self) -> Result { + async fn count(&self) -> Result { let result = sqlx::query("SELECT COUNT(*) as count FROM inflight_taskactivations") .fetch_one(&self.read_pool) .await?; @@ -742,7 +837,7 @@ impl InflightActivationStore { /// Update the status of a specific activation #[instrument(skip_all)] - pub async fn set_status( + async fn set_status( &self, id: &str, status: InflightActivationStatus, @@ -764,7 +859,7 @@ impl InflightActivationStore { } #[instrument(skip_all)] - pub async fn set_processing_deadline( + async fn set_processing_deadline( &self, id: &str, deadline: Option>, @@ -781,7 +876,7 @@ impl InflightActivationStore { } #[instrument(skip_all)] - pub async fn delete_activation(&self, id: &str) -> Result<(), Error> { + async fn delete_activation(&self, id: &str) -> Result<(), Error> { let mut conn = self.acquire_write_conn_metric("delete_activation").await?; sqlx::query("DELETE FROM inflight_taskactivations WHERE id = $1") .bind(id) @@ -791,7 +886,7 @@ impl InflightActivationStore { } #[instrument(skip_all)] - pub async fn get_retry_activations(&self) 
-> Result, Error> { + async fn get_retry_activations(&self) -> Result, Error> { Ok(sqlx::query_as( " SELECT id, @@ -822,7 +917,7 @@ impl InflightActivationStore { .collect()) } - pub async fn clear(&self) -> Result<(), Error> { + async fn clear(&self) -> Result<(), Error> { let mut conn = self.acquire_write_conn_metric("clear").await?; sqlx::query("DELETE FROM inflight_taskactivations") .execute(&mut *conn) @@ -834,7 +929,7 @@ impl InflightActivationStore { /// Exceeding a processing deadline does not consume a retry as we don't know /// if a worker took the task and was killed, or failed. #[instrument(skip_all)] - pub async fn handle_processing_deadline(&self) -> Result { + async fn handle_processing_deadline(&self) -> Result { let now = Utc::now(); let mut atomic = self.write_pool.begin().await?; @@ -882,7 +977,7 @@ impl InflightActivationStore { /// Update tasks that have exceeded their max processing attempts. /// These tasks are set to status=failure and will be handled by handle_failed_tasks accordingly. #[instrument(skip_all)] - pub async fn handle_processing_attempts(&self) -> Result { + async fn handle_processing_attempts(&self) -> Result { let mut conn = self .acquire_write_conn_metric("handle_processing_attempts") .await?; @@ -911,7 +1006,7 @@ impl InflightActivationStore { /// /// The number of impacted records is returned in a Result. #[instrument(skip_all)] - pub async fn handle_expires_at(&self) -> Result { + async fn handle_expires_at(&self) -> Result { let now = Utc::now(); let mut conn = self.acquire_write_conn_metric("handle_expires_at").await?; let query = sqlx::query( @@ -932,7 +1027,7 @@ impl InflightActivationStore { /// /// The number of impacted records is returned in a Result. 
#[instrument(skip_all)] - pub async fn handle_delay_until(&self) -> Result { + async fn handle_delay_until(&self) -> Result { let now = Utc::now(); let mut conn = self.acquire_write_conn_metric("handle_delay_until").await?; let update_result = sqlx::query( @@ -957,7 +1052,7 @@ impl InflightActivationStore { /// Once dead-lettered tasks have been added to Kafka those tasks can have their status set to /// complete. #[instrument(skip_all)] - pub async fn handle_failed_tasks(&self) -> Result { + async fn handle_failed_tasks(&self) -> Result { let mut atomic = self.write_pool.begin().await?; let failed_tasks: Vec = @@ -1013,7 +1108,7 @@ impl InflightActivationStore { /// Mark a collection of tasks as complete by id #[instrument(skip_all)] - pub async fn mark_completed(&self, ids: Vec) -> Result { + async fn mark_completed(&self, ids: Vec) -> Result { let mut query_builder = QueryBuilder::new("UPDATE inflight_taskactivations "); query_builder .push("SET status = ") @@ -1034,7 +1129,7 @@ impl InflightActivationStore { /// Remove completed tasks. /// This method is a garbage collector for the inflight task store. #[instrument(skip_all)] - pub async fn remove_completed(&self) -> Result { + async fn remove_completed(&self) -> Result { let mut conn = self.acquire_write_conn_metric("remove_completed").await?; let query = sqlx::query("DELETE FROM inflight_taskactivations WHERE status = $1") .bind(InflightActivationStatus::Complete) @@ -1046,7 +1141,7 @@ impl InflightActivationStore { /// Remove killswitched tasks. 
#[instrument(skip_all)] - pub async fn remove_killswitched(&self, killswitched_tasks: Vec) -> Result { + async fn remove_killswitched(&self, killswitched_tasks: Vec) -> Result { let mut query_builder = QueryBuilder::new("DELETE FROM inflight_taskactivations WHERE taskname IN ("); let mut separated = query_builder.separated(", "); diff --git a/src/store/inflight_activation_tests.rs b/src/store/inflight_activation_tests.rs index 2ff605e3..3f32fc78 100644 --- a/src/store/inflight_activation_tests.rs +++ b/src/store/inflight_activation_tests.rs @@ -8,7 +8,7 @@ use std::time::Duration; use crate::config::Config; use crate::store::inflight_activation::{ InflightActivation, InflightActivationStatus, InflightActivationStore, - InflightActivationStoreConfig, QueryResult, create_sqlite_pool, + InflightActivationStoreConfig, QueryResult, SqliteActivationStore, create_sqlite_pool, }; use crate::test_utils::{ StatusCount, assert_counts, create_integration_config, create_test_store, @@ -66,7 +66,7 @@ fn test_inflightactivation_status_from() { #[tokio::test] async fn test_create_db() { assert!( - InflightActivationStore::new( + SqliteActivationStore::new( &generate_temp_filename(), InflightActivationStoreConfig::from_config(&create_integration_config()) ) @@ -142,7 +142,7 @@ async fn test_get_pending_activation() { processing: 1, ..StatusCount::default() }, - &store, + store.as_ref(), ) .await; } @@ -255,7 +255,7 @@ async fn test_get_pending_activation_skip_expires() { pending: 1, ..StatusCount::default() }, - &store, + store.as_ref(), ) .await; } @@ -291,7 +291,7 @@ async fn test_count_pending_activations() { processing: 1, ..StatusCount::default() }, - &store, + store.as_ref(), ) .await; } @@ -307,7 +307,7 @@ async fn set_activation_status() { pending: 2, ..StatusCount::default() }, - &store, + store.as_ref(), ) .await; @@ -323,7 +323,7 @@ async fn set_activation_status() { failure: 1, ..StatusCount::default() }, - &store, + store.as_ref(), ) .await; @@ -338,7 +338,7 @@ async 
fn set_activation_status() { pending: 2, ..StatusCount::default() }, - &store, + store.as_ref(), ) .await; assert!( @@ -359,7 +359,7 @@ async fn set_activation_status() { failure: 2, ..StatusCount::default() }, - &store, + store.as_ref(), ) .await; assert!(store.get_pending_activation(None).await.unwrap().is_none()); @@ -444,7 +444,7 @@ async fn test_get_retry_activations() { pending: 2, ..StatusCount::default() }, - &store, + store.as_ref(), ) .await; @@ -460,7 +460,7 @@ async fn test_get_retry_activations() { retry: 1, ..StatusCount::default() }, - &store, + store.as_ref(), ) .await; @@ -493,7 +493,7 @@ async fn test_handle_processing_deadline() { processing: 1, ..StatusCount::default() }, - &store, + store.as_ref(), ) .await; @@ -505,7 +505,7 @@ async fn test_handle_processing_deadline() { pending: 2, ..StatusCount::default() }, - &store, + store.as_ref(), ) .await; @@ -533,7 +533,7 @@ async fn test_handle_processing_deadline_multiple_tasks() { processing: 2, ..StatusCount::default() }, - &store, + store.as_ref(), ) .await; @@ -546,7 +546,7 @@ async fn test_handle_processing_deadline_multiple_tasks() { processing: 1, ..StatusCount::default() }, - &store, + store.as_ref(), ) .await; } @@ -580,7 +580,7 @@ async fn test_handle_processing_at_most_once() { processing: 2, ..StatusCount::default() }, - &store, + store.as_ref(), ) .await; @@ -593,7 +593,7 @@ async fn test_handle_processing_at_most_once() { failure: 1, ..StatusCount::default() }, - &store, + store.as_ref(), ) .await; @@ -626,7 +626,7 @@ async fn test_handle_processing_deadline_discard_after() { processing: 1, ..StatusCount::default() }, - &store, + store.as_ref(), ) .await; @@ -638,7 +638,7 @@ async fn test_handle_processing_deadline_discard_after() { pending: 2, ..StatusCount::default() }, - &store, + store.as_ref(), ) .await; } @@ -668,7 +668,7 @@ async fn test_handle_processing_deadline_deadletter_after() { processing: 1, ..StatusCount::default() }, - &store, + store.as_ref(), ) .await; @@ -680,7 
+680,7 @@ async fn test_handle_processing_deadline_deadletter_after() { pending: 2, ..StatusCount::default() }, - &store, + store.as_ref(), ) .await; } @@ -709,7 +709,7 @@ async fn test_handle_processing_deadline_no_retries_remaining() { pending: 1, ..StatusCount::default() }, - &store, + store.as_ref(), ) .await; @@ -722,7 +722,7 @@ async fn test_handle_processing_deadline_no_retries_remaining() { pending: 2, ..StatusCount::default() }, - &store, + store.as_ref(), ) .await; } @@ -751,7 +751,7 @@ async fn test_processing_attempts_exceeded() { pending: 2, ..StatusCount::default() }, - &store, + store.as_ref(), ) .await; @@ -764,7 +764,7 @@ async fn test_processing_attempts_exceeded() { failure: 2, ..StatusCount::default() }, - &store, + store.as_ref(), ) .await; } @@ -787,7 +787,7 @@ async fn test_remove_completed() { pending: 1, ..StatusCount::default() }, - &store, + store.as_ref(), ) .await; @@ -821,7 +821,7 @@ async fn test_remove_completed() { pending: 1, ..StatusCount::default() }, - &store, + store.as_ref(), ) .await; } @@ -850,7 +850,7 @@ async fn test_remove_completed_multiple_gaps() { failure: 1, ..StatusCount::default() }, - &store, + store.as_ref(), ) .await; @@ -892,7 +892,7 @@ async fn test_remove_completed_multiple_gaps() { failure: 1, ..StatusCount::default() }, - &store, + store.as_ref(), ) .await; } @@ -948,7 +948,7 @@ async fn test_handle_failed_tasks() { failure: 4, ..StatusCount::default() }, - &store, + store.as_ref(), ) .await; @@ -978,7 +978,7 @@ async fn test_handle_failed_tasks() { complete: 2, ..StatusCount::default() }, - &store, + store.as_ref(), ) .await; } @@ -994,7 +994,7 @@ async fn test_mark_completed() { pending: 3, ..StatusCount::default() }, - &store, + store.as_ref(), ) .await; @@ -1010,7 +1010,7 @@ async fn test_mark_completed() { complete: 3, ..StatusCount::default() }, - &store, + store.as_ref(), ) .await; } @@ -1031,7 +1031,7 @@ async fn test_handle_expires_at() { pending: 3, ..StatusCount::default() }, - &store, + 
store.as_ref(), ) .await; @@ -1043,7 +1043,7 @@ async fn test_handle_expires_at() { pending: 1, ..StatusCount::default() }, - &store, + store.as_ref(), ) .await; } @@ -1063,7 +1063,7 @@ async fn test_remove_killswitched() { pending: 6, ..StatusCount::default() }, - &store, + store.as_ref(), ) .await; @@ -1081,7 +1081,7 @@ async fn test_remove_killswitched() { pending: 3, ..StatusCount::default() }, - &store, + store.as_ref(), ) .await; } @@ -1133,14 +1133,14 @@ async fn test_clear() { pending: 1, ..StatusCount::default() }, - &store, + store.as_ref(), ) .await; assert_eq!(store.count().await.unwrap(), 1); assert!(store.clear().await.is_ok()); assert_eq!(store.count().await.unwrap(), 0); - assert_counts(StatusCount::default(), &store).await; + assert_counts(StatusCount::default(), store.as_ref()).await; } #[tokio::test] @@ -1171,7 +1171,7 @@ async fn test_vacuum_db_incremental() { vacuum_page_count: Some(10), ..Config::default() }; - let store = InflightActivationStore::new( + let store = SqliteActivationStore::new( &generate_temp_filename(), InflightActivationStoreConfig::from_config(&config), ) @@ -1280,7 +1280,7 @@ async fn test_db_status_calls_ok() { let url = format!("sqlite:{db_path}"); // Initialize a store to create the database and run migrations - InflightActivationStore::new( + SqliteActivationStore::new( &url, InflightActivationStoreConfig { max_processing_attempts: 3, diff --git a/src/test_utils.rs b/src/test_utils.rs index 9ae23de5..616c51a9 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -14,7 +14,7 @@ use crate::{ config::Config, store::inflight_activation::{ InflightActivation, InflightActivationStatus, InflightActivationStore, - InflightActivationStoreConfig, + InflightActivationStoreConfig, SqliteActivationStore, }, }; use chrono::{Timelike, Utc}; @@ -88,9 +88,9 @@ pub fn create_config() -> Arc { } /// Create an InflightActivationStore instance -pub async fn create_test_store() -> Arc { +pub async fn create_test_store() -> Arc { 
Arc::new( - InflightActivationStore::new( + SqliteActivationStore::new( &generate_temp_filename(), InflightActivationStoreConfig::from_config(&create_integration_config()), ) @@ -214,7 +214,7 @@ pub struct StatusCount { } /// Assert the state of all counts in the inflight activation store. -pub async fn assert_counts(expected: StatusCount, store: &InflightActivationStore) { +pub async fn assert_counts(expected: StatusCount, store: &dyn InflightActivationStore) { assert_eq!( expected.pending, store diff --git a/src/upkeep.rs b/src/upkeep.rs index 1ee91215..d2d352b1 100644 --- a/src/upkeep.rs +++ b/src/upkeep.rs @@ -29,7 +29,7 @@ use crate::{ /// on the inflight store pub async fn upkeep( config: Arc, - store: Arc, + store: Arc, startup_time: DateTime, runtime_config_manager: Arc, health_reporter: HealthReporter, @@ -110,7 +110,7 @@ impl UpkeepResults { )] pub async fn do_upkeep( config: Arc, - store: Arc, + store: Arc, producer: Arc, startup_time: DateTime, runtime_config_manager: Arc, @@ -522,6 +522,7 @@ mod tests { runtime_config::RuntimeConfigManager, store::inflight_activation::{ InflightActivationStatus, InflightActivationStore, InflightActivationStoreConfig, + SqliteActivationStore, }, test_utils::{ StatusCount, assert_counts, consume_topic, create_config, create_integration_config, @@ -531,12 +532,12 @@ mod tests { upkeep::{create_retry_activation, do_upkeep}, }; - async fn create_inflight_store() -> Arc { + async fn create_inflight_store() -> Arc { let url = generate_temp_filename(); let config = create_integration_config(); Arc::new( - InflightActivationStore::new(&url, InflightActivationStoreConfig::from_config(&config)) + SqliteActivationStore::new(&url, InflightActivationStoreConfig::from_config(&config)) .await .unwrap(), ) @@ -784,7 +785,7 @@ mod tests { processing: 1, ..StatusCount::default() }, - &store, + store.as_ref(), ) .await; assert_eq!(result_context.processing_deadline_reset, 0); @@ -833,7 +834,7 @@ mod tests { pending: 2, 
..StatusCount::default() }, - &store, + store.as_ref(), ) .await; } @@ -883,7 +884,7 @@ mod tests { pending: 1, ..StatusCount::default() }, - &store, + store.as_ref(), ) .await; } @@ -1322,7 +1323,7 @@ demoted_namespaces: pending: 2, ..StatusCount::default() }, - &store, + store.as_ref(), ) .await; } From 60ccc626c8c4580027627793d125978a38c44cf7 Mon Sep 17 00:00:00 2001 From: Evan Hicks Date: Tue, 6 Jan 2026 16:17:10 -0500 Subject: [PATCH 2/6] fix --- benches/store_bench.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/benches/store_bench.rs b/benches/store_bench.rs index af5ce5f5..db2a770c 100644 --- a/benches/store_bench.rs +++ b/benches/store_bench.rs @@ -6,6 +6,7 @@ use rand::Rng; use taskbroker::{ store::inflight_activation::{ InflightActivationStatus, InflightActivationStore, InflightActivationStoreConfig, + SqliteActivationStore, }, test_utils::{ generate_temp_filename, generate_unique_namespace, make_activations_with_namespace, @@ -25,7 +26,7 @@ async fn get_pending_activations(num_activations: u32, num_workers: u32) { generate_temp_filename() }; let store = Arc::new( - InflightActivationStore::new( + SqliteActivationStore::new( &url, InflightActivationStoreConfig { max_processing_attempts: 1, @@ -89,7 +90,7 @@ async fn set_status(num_activations: u32, num_workers: u32) { }; let store = Arc::new( - InflightActivationStore::new( + SqliteActivationStore::new( &url, InflightActivationStoreConfig { max_processing_attempts: 1, From 1bfba8f54fa7e24ec5e46b62315b7c00d6865d0a Mon Sep 17 00:00:00 2001 From: Evan Hicks Date: Wed, 7 Jan 2026 10:41:15 -0500 Subject: [PATCH 3/6] try different ports --- python/integration_tests/helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/integration_tests/helpers.py b/python/integration_tests/helpers.py index e371223a..9399f0ab 100644 --- a/python/integration_tests/helpers.py +++ b/python/integration_tests/helpers.py @@ -216,7 +216,7 @@ def get_available_ports(count: int) -> 
list[int]: MAX = 65535 res = [] for i in range(count): - for candidate in range(MIN + i, MAX, count): + for candidate in range(MIN + (i * count), MAX, count): try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(("0.0.0.0", candidate)) From 0ecc1b15372659d3f7d3c3eba97c5bdef435fce4 Mon Sep 17 00:00:00 2001 From: Evan Hicks Date: Wed, 7 Jan 2026 12:40:14 -0500 Subject: [PATCH 4/6] ports --- python/integration_tests/helpers.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/integration_tests/helpers.py b/python/integration_tests/helpers.py index 9399f0ab..e5db237d 100644 --- a/python/integration_tests/helpers.py +++ b/python/integration_tests/helpers.py @@ -212,7 +212,8 @@ def get_num_tasks_group_by_status( def get_available_ports(count: int) -> list[int]: - MIN = 49152 + # MIN = 49152 + MIN = 50000 MAX = 65535 res = [] for i in range(count): From a85a5ace4c8166e9d897015ecc09151567778218 Mon Sep 17 00:00:00 2001 From: Evan Hicks Date: Thu, 8 Jan 2026 10:27:53 -0500 Subject: [PATCH 5/6] ports --- python/integration_tests/helpers.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python/integration_tests/helpers.py b/python/integration_tests/helpers.py index e5db237d..9399f0ab 100644 --- a/python/integration_tests/helpers.py +++ b/python/integration_tests/helpers.py @@ -212,8 +212,7 @@ def get_num_tasks_group_by_status( def get_available_ports(count: int) -> list[int]: - # MIN = 49152 - MIN = 50000 + MIN = 49152 MAX = 65535 res = [] for i in range(count): From 5452f3afffa8bf358b2d1a1045b30a6f0c66174a Mon Sep 17 00:00:00 2001 From: Evan Hicks Date: Thu, 8 Jan 2026 10:49:08 -0500 Subject: [PATCH 6/6] port revert --- python/integration_tests/helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/integration_tests/helpers.py b/python/integration_tests/helpers.py index 9399f0ab..e371223a 100644 --- a/python/integration_tests/helpers.py +++ b/python/integration_tests/helpers.py @@ 
-216,7 +216,7 @@ def get_available_ports(count: int) -> list[int]: MAX = 65535 res = [] for i in range(count): - for candidate in range(MIN + (i * count), MAX, count): + for candidate in range(MIN + i, MAX, count): try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(("0.0.0.0", candidate))