diff --git a/Cargo.lock b/Cargo.lock index 2f33ebb..ae5cb48 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -578,6 +578,41 @@ dependencies = [ "typenum", ] +[[package]] +name = "darling" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d00b9596d185e565c2207a0b01f8bd1a135483d02d9b7b0a54b11da8d53412e" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn", +] + +[[package]] +name = "darling_macro" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead" +dependencies = [ + "darling_core", + "quote", + "syn", +] + [[package]] name = "debugid" version = "0.8.0" @@ -608,6 +643,37 @@ dependencies = [ "powerfmt", ] +[[package]] +name = "derive_builder" +version = "0.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "507dfb09ea8b7fa618fcf76e953f4f5e192547945816d5358edffe39f6f94947" +dependencies = [ + "derive_builder_macro", +] + +[[package]] +name = "derive_builder_core" +version = "0.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d5bcf7b024d6835cfb3d473887cd966994907effbe9227e8c8219824d06c4e8" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "derive_builder_macro" +version = "0.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab63b0e2bf4d5928aff72e83a7dace85d7bba5fe12dcc3c5a572d78caffd3f3c" +dependencies = [ + "derive_builder_core", + "syn", +] + [[package]] name = "digest" version = "0.10.7" @@ -1259,6 +1325,12 @@ dependencies = [ "zerovec", ] +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "idna" version = "1.1.0" @@ -2910,6 +2982,7 @@ dependencies = [ "chrono", "clap", "criterion", + "derive_builder", "elegant-departure", "figment", "futures", diff --git a/Cargo.toml b/Cargo.toml index 6ef48e2..62732bb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ anyhow = "1.0.92" bytes = "1.10.0" chrono = { version = "0.4.26" } clap = { version = "4.5.20", features = ["derive"] } +derive_builder = "0.20.2" elegant-departure = { version = "0.3.1", features = ["tokio"] } figment = { version = "0.10.19", features = ["env", "yaml", "test"] } futures = "0.3.31" diff --git a/src/kafka/inflight_activation_batcher.rs b/src/kafka/inflight_activation_batcher.rs index 94043d0..8848535 100644 --- a/src/kafka/inflight_activation_batcher.rs +++ b/src/kafka/inflight_activation_batcher.rs @@ -213,19 +213,18 @@ impl Reducer for InflightActivationBatcher { #[cfg(test)] mod tests { use super::{ - ActivationBatcherConfig, Config, InflightActivation, InflightActivationBatcher, Reducer, - RuntimeConfigManager, + ActivationBatcherConfig, Config, InflightActivationBatcher, Reducer, RuntimeConfigManager, }; use chrono::Utc; - use std::collections::HashMap; use tokio::fs; - use prost::Message; - use sentry_protos::taskbroker::v1::{OnAttemptsExceeded, TaskActivation}; use std::sync::Arc; use crate::{ - store::inflight_activation::InflightActivationStatus, test_utils::generate_unique_namespace, + store::{ + inflight_activation::InflightActivationBuilder, task_activation::TaskActivationBuilder, + }, + test_utils::generate_unique_namespace, }; #[tokio::test] @@ -248,36 +247,11 @@ demoted_namespaces: let namespace = generate_unique_namespace(); - let inflight_activation_0 = InflightActivation { - id: "0".to_string(), - activation: TaskActivation { - id: "0".to_string(), - namespace: namespace.clone(), - taskname: "task_to_be_filtered".to_string(), - parameters: "{}".to_string(), - headers: HashMap::new(), - received_at: None, - retry_state: None, - processing_deadline_duration: 0, - expires: None, - delay: None, - } - .encode_to_vec(), - status: InflightActivationStatus::Pending, - partition: 0, - offset: 0, - added_at: Utc::now(), - received_at: Utc::now(), - processing_attempts: 0, - processing_deadline_duration: 0, - expires_at: None, - delay_until: None, - processing_deadline: None, - at_most_once: false, - namespace: namespace.clone(), - taskname: "task_to_be_filtered".to_string(), - on_attempts_exceeded: OnAttemptsExceeded::Discard, - }; + let inflight_activation_0 = InflightActivationBuilder::new() + .id("0") + .taskname("task_to_be_filtered") + .namespace(&namespace) + .build(TaskActivationBuilder::new()); batcher.reduce(inflight_activation_0).await.unwrap(); assert_eq!(batcher.batch.len(), 0); @@ -296,36 +270,12 @@ demoted_namespaces: let namespace = generate_unique_namespace(); - let inflight_activation_0 = InflightActivation { - id: "0".to_string(), - activation: TaskActivation { - id: "0".to_string(), - namespace: namespace.clone(), - taskname: "task_to_be_filtered".to_string(), - parameters: "{}".to_string(), - headers: HashMap::new(), - received_at: None, - retry_state: None, - processing_deadline_duration: 0, - expires: Some(0), - delay: None, - } - .encode_to_vec(), - status: InflightActivationStatus::Pending, - partition: 0, - offset: 0, - added_at: Utc::now(), - received_at: Utc::now(), - processing_attempts: 0, - processing_deadline_duration: 0, - expires_at: Some(Utc::now()), - delay_until: None, - processing_deadline: None, - at_most_once: false, - namespace: namespace.clone(), - taskname: "task_to_be_filtered".to_string(), - on_attempts_exceeded: OnAttemptsExceeded::Discard, - }; + let inflight_activation_0 = InflightActivationBuilder::new() + .id("0") + .taskname("task_to_be_filtered") + .namespace(&namespace) + .expires_at(Utc::now()) + .build(TaskActivationBuilder::new()); batcher.reduce(inflight_activation_0).await.unwrap(); assert_eq!(batcher.batch.len(), 0); @@ -347,36 +297,11 @@ demoted_namespaces: let namespace = generate_unique_namespace(); - let inflight_activation_0 = InflightActivation { - id: "0".to_string(), - activation: TaskActivation { - id: "0".to_string(), - namespace: namespace.clone(), - taskname: "taskname".to_string(), - parameters: "{}".to_string(), - headers: HashMap::new(), - received_at: None, - retry_state: None, - processing_deadline_duration: 0, - expires: Some(0), - delay: None, - } - .encode_to_vec(), - status: InflightActivationStatus::Pending, - partition: 0, - offset: 0, - added_at: Utc::now(), - received_at: Utc::now(), - processing_attempts: 0, - processing_deadline_duration: 0, - expires_at: None, - delay_until: None, - processing_deadline: None, - at_most_once: false, - namespace: namespace.clone(), - taskname: "taskname".to_string(), - on_attempts_exceeded: OnAttemptsExceeded::Discard, - }; + let inflight_activation_0 = InflightActivationBuilder::new() + .id("0") + .taskname("taskname") + .namespace(&namespace) + .build(TaskActivationBuilder::new()); batcher.reduce(inflight_activation_0).await.unwrap(); assert!(batcher.is_full().await); @@ -400,67 +325,17 @@ demoted_namespaces: let namespace = generate_unique_namespace(); - let inflight_activation_0 = InflightActivation { - id: "0".to_string(), - activation: TaskActivation { - id: "0".to_string(), - namespace: namespace.clone(), - taskname: "taskname".to_string(), - parameters: "{}".to_string(), - headers: HashMap::new(), - received_at: None, - retry_state: None, - processing_deadline_duration: 0, - expires: Some(0), - delay: None, - } - .encode_to_vec(), - status: InflightActivationStatus::Pending, - partition: 0, - offset: 0, - added_at: Utc::now(), - received_at: Utc::now(), - processing_attempts: 0, - processing_deadline_duration: 0, - expires_at: None, - delay_until: None, - processing_deadline: None, - at_most_once: false, - namespace: namespace.clone(), - taskname: "taskname".to_string(), - on_attempts_exceeded: OnAttemptsExceeded::Discard, - }; - - let inflight_activation_1 = InflightActivation { - id: "1".to_string(), - activation: TaskActivation { - id: "1".to_string(), - namespace: namespace.clone(), - taskname: "taskname".to_string(), - parameters: "{}".to_string(), - headers: HashMap::new(), - received_at: None, - retry_state: None, - processing_deadline_duration: 0, - expires: Some(0), - delay: None, - } - .encode_to_vec(), - status: InflightActivationStatus::Pending, - partition: 0, - offset: 0, - added_at: Utc::now(), - received_at: Utc::now(), - processing_attempts: 0, - processing_deadline_duration: 0, - expires_at: None, - delay_until: None, - processing_deadline: None, - at_most_once: false, - namespace: namespace.clone(), - taskname: "taskname".to_string(), - on_attempts_exceeded: OnAttemptsExceeded::Discard, - }; + let inflight_activation_0 = InflightActivationBuilder::new() + .id("0") + .taskname("taskname") + .namespace(&namespace) + .build(TaskActivationBuilder::new()); + + let inflight_activation_1 = InflightActivationBuilder::new() + .id("1") + .taskname("taskname") + .namespace(&namespace) + .build(TaskActivationBuilder::new()); batcher.reduce(inflight_activation_0).await.unwrap(); batcher.reduce(inflight_activation_1).await.unwrap(); @@ -491,67 +366,17 @@ demoted_topic: taskworker-demoted"#; assert_eq!(batcher.producer_cluster, config.kafka_cluster.clone()); - let inflight_activation_0 = InflightActivation { - id: "0".to_string(), - activation: TaskActivation { - id: "0".to_string(), - namespace: "bad_namespace".to_string(), - taskname: "task_to_be_filtered".to_string(), - parameters: "{}".to_string(), - headers: HashMap::new(), - received_at: None, - retry_state: None, - processing_deadline_duration: 0, - expires: None, - delay: None, - } - .encode_to_vec(), - status: InflightActivationStatus::Pending, - partition: 0, - offset: 0, - added_at: Utc::now(), - received_at: Utc::now(), - processing_attempts: 0, - processing_deadline_duration: 0, - expires_at: None, - delay_until: None, - processing_deadline: None, - at_most_once: false, - namespace: "bad_namespace".to_string(), - taskname: "taskname".to_string(), - on_attempts_exceeded: OnAttemptsExceeded::Discard, - }; - - let inflight_activation_1 = InflightActivation { - id: "1".to_string(), - activation: TaskActivation { - id: "1".to_string(), - namespace: "good_namespace".to_string(), - taskname: "good_task".to_string(), - parameters: "{}".to_string(), - headers: HashMap::new(), - received_at: None, - retry_state: None, - processing_deadline_duration: 0, - expires: Some(0), - delay: None, - } - .encode_to_vec(), - status: InflightActivationStatus::Pending, - partition: 0, - offset: 0, - added_at: Utc::now(), - received_at: Utc::now(), - processing_attempts: 0, - processing_deadline_duration: 0, - expires_at: None, - delay_until: None, - processing_deadline: None, - at_most_once: false, - namespace: "good_namespace".to_string(), - taskname: "good_task".to_string(), - on_attempts_exceeded: OnAttemptsExceeded::Discard, - }; + let inflight_activation_0 = InflightActivationBuilder::new() + .id("0") + .taskname("task_to_be_filtered") + .namespace("bad_namespace") + .build(TaskActivationBuilder::new()); + + let inflight_activation_1 = InflightActivationBuilder::new() + .id("1") + .taskname("good_task") + .namespace("good_namespace") + .build(TaskActivationBuilder::new()); batcher.reduce(inflight_activation_0).await.unwrap(); batcher.reduce(inflight_activation_1).await.unwrap(); diff --git a/src/kafka/inflight_activation_writer.rs b/src/kafka/inflight_activation_writer.rs index 3f49856..4cdf716 100644 --- a/src/kafka/inflight_activation_writer.rs +++ b/src/kafka/inflight_activation_writer.rs @@ -202,19 +202,16 @@ impl Reducer for InflightActivationWriter { #[cfg(test)] mod tests { - use super::{ActivationWriterConfig, InflightActivation, InflightActivationWriter, Reducer}; - use chrono::{DateTime, Utc}; - use prost::Message; - use prost_types::Timestamp; - use std::collections::HashMap; - - use sentry_protos::taskbroker::v1::OnAttemptsExceeded; - use sentry_protos::taskbroker::v1::TaskActivation; + use super::{ActivationWriterConfig, InflightActivationWriter, Reducer}; + + use chrono::DateTime; use std::sync::Arc; + use crate::store::inflight_activation::InflightActivationBuilder; use crate::store::inflight_activation::{ InflightActivationStatus, InflightActivationStore, InflightActivationStoreConfig, }; + use crate::store::task_activation::TaskActivationBuilder; use crate::test_utils::generate_unique_namespace; use crate::test_utils::make_activations; use crate::test_utils::{create_integration_config, generate_temp_filename}; @@ -241,82 +238,22 @@ mod tests { writer_config, ); - let received_at = Timestamp { - seconds: 0, - nanos: 0, - }; - + let received_at = DateTime::from_timestamp_nanos(0); let namespace = generate_unique_namespace(); let batch = vec![ - InflightActivation { - id: "0".to_string(), - activation: TaskActivation { - id: "0".to_string(), - namespace: namespace.clone(), - taskname: "pending_task".to_string(), - parameters: "{}".to_string(), - headers: HashMap::new(), - received_at: Some(received_at), - retry_state: None, - processing_deadline_duration: 0, - expires: None, - delay: None, - } - .encode_to_vec(), - status: InflightActivationStatus::Pending, - partition: 0, - offset: 0, - added_at: Utc::now(), - received_at: DateTime::from_timestamp( - received_at.seconds, - received_at.nanos as u32, - ) - .unwrap(), - processing_attempts: 0, - processing_deadline_duration: 0, - expires_at: None, - delay_until: None, - processing_deadline: None, - at_most_once: false, - namespace: namespace.clone(), - taskname: "pending_task".to_string(), - on_attempts_exceeded: OnAttemptsExceeded::Discard, - }, - InflightActivation { - id: "1".to_string(), - activation: TaskActivation { - id: "1".to_string(), - namespace: namespace.clone(), - taskname: "delay_task".to_string(), - parameters: "{}".to_string(), - headers: HashMap::new(), - received_at: Some(received_at), - retry_state: None, - processing_deadline_duration: 0, - expires: None, - delay: None, - } - .encode_to_vec(), - status: InflightActivationStatus::Delay, - partition: 0, - offset: 0, - added_at: Utc::now(), - received_at: DateTime::from_timestamp( - received_at.seconds, - received_at.nanos as u32, - ) - .unwrap(), - processing_attempts: 0, - processing_deadline_duration: 0, - expires_at: None, - delay_until: None, - processing_deadline: None, - at_most_once: false, - namespace: namespace.clone(), - taskname: "delay_task".to_string(), - on_attempts_exceeded: OnAttemptsExceeded::Discard, - }, + InflightActivationBuilder::new() + .id("0") + .taskname("pending_task") + .namespace(&namespace) + .received_at(received_at) + .build(TaskActivationBuilder::new()), + InflightActivationBuilder::new() + .id("1") + .taskname("delay_task") + .namespace(&namespace) + .received_at(received_at) + .build(TaskActivationBuilder::new()), ]; writer.reduce(batch).await.unwrap(); @@ -352,44 +289,17 @@ mod tests { writer_config, ); - let received_at = Timestamp { - seconds: 0, - nanos: 0, - }; - + let received_at = DateTime::from_timestamp_nanos(0); let namespace = generate_unique_namespace(); - let batch = vec![InflightActivation { - id: "0".to_string(), - activation: TaskActivation { - id: "0".to_string(), - namespace: namespace.clone(), - taskname: "pending_task".to_string(), - parameters: "{}".to_string(), - headers: HashMap::new(), - received_at: Some(received_at), - retry_state: None, - processing_deadline_duration: 0, - expires: None, - delay: None, - } - .encode_to_vec(), - status: InflightActivationStatus::Pending, - partition: 0, - offset: 0, - added_at: Utc::now(), - received_at: DateTime::from_timestamp(received_at.seconds, received_at.nanos as u32) - .unwrap(), - processing_attempts: 0, - expires_at: None, - delay_until: None, - processing_deadline: None, - processing_deadline_duration: 0, - at_most_once: false, - namespace: namespace.clone(), - taskname: "pending_task".to_string(), - on_attempts_exceeded: OnAttemptsExceeded::Discard, - }]; + let batch = vec![ + InflightActivationBuilder::new() + .id("0") + .taskname("pending_task") + .namespace(&namespace) + .received_at(received_at) + .build(TaskActivationBuilder::new()), + ]; writer.reduce(batch).await.unwrap(); writer.flush().await.unwrap(); @@ -419,44 +329,18 @@ mod tests { writer_config, ); - let received_at = Timestamp { - seconds: 0, - nanos: 0, - }; - + let received_at = DateTime::from_timestamp_nanos(0); let namespace = generate_unique_namespace(); - let batch = vec![InflightActivation { - id: "0".to_string(), - activation: TaskActivation { - id: "0".to_string(), - namespace: namespace.clone(), - taskname: "pending_task".to_string(), - parameters: "{}".to_string(), - headers: HashMap::new(), - received_at: Some(received_at), - retry_state: None, - processing_deadline_duration: 0, - expires: None, - delay: None, - } - .encode_to_vec(), - status: InflightActivationStatus::Delay, - partition: 0, - offset: 0, - added_at: Utc::now(), - received_at: DateTime::from_timestamp(received_at.seconds, received_at.nanos as u32) - .unwrap(), - processing_attempts: 0, - expires_at: None, - delay_until: None, - processing_deadline: None, - processing_deadline_duration: 0, - at_most_once: false, - namespace: namespace.clone(), - taskname: "pending_task".to_string(), - on_attempts_exceeded: OnAttemptsExceeded::Discard, - }]; + let batch = vec![ + InflightActivationBuilder::new() + .id("0") + .taskname("pending_task") + .namespace(&namespace) + .received_at(received_at) + .status(InflightActivationStatus::Delay) + .build(TaskActivationBuilder::new()), + ]; writer.reduce(batch).await.unwrap(); writer.flush().await.unwrap(); @@ -490,82 +374,22 @@ mod tests { writer_config, ); - let received_at = Timestamp { - seconds: 0, - nanos: 0, - }; - + let received_at = DateTime::from_timestamp_nanos(0); let namespace = generate_unique_namespace(); let batch = vec![ - InflightActivation { - id: "0".to_string(), - activation: TaskActivation { - id: "0".to_string(), - namespace: namespace.clone(), - taskname: "pending_task".to_string(), - parameters: "{}".to_string(), - headers: HashMap::new(), - received_at: Some(received_at), - retry_state: None, - processing_deadline_duration: 0, - expires: None, - delay: None, - } - .encode_to_vec(), - status: InflightActivationStatus::Pending, - partition: 0, - offset: 0, - added_at: Utc::now(), - received_at: DateTime::from_timestamp( - received_at.seconds, - received_at.nanos as u32, - ) - .unwrap(), - processing_attempts: 0, - processing_deadline_duration: 0, - expires_at: None, - delay_until: None, - processing_deadline: None, - at_most_once: false, - namespace: namespace.clone(), - taskname: "pending_task".to_string(), - on_attempts_exceeded: OnAttemptsExceeded::Discard, - }, - InflightActivation { - id: "1".to_string(), - activation: TaskActivation { - id: "1".to_string(), - namespace: namespace.clone(), - taskname: "delay_task".to_string(), - parameters: "{}".to_string(), - headers: HashMap::new(), - received_at: Some(received_at), - retry_state: None, - processing_deadline_duration: 0, - expires: None, - delay: None, - } - .encode_to_vec(), - status: InflightActivationStatus::Delay, - partition: 0, - offset: 0, - added_at: Utc::now(), - received_at: DateTime::from_timestamp( - received_at.seconds, - received_at.nanos as u32, - ) - .unwrap(), - processing_attempts: 0, - processing_deadline_duration: 0, - expires_at: None, - delay_until: None, - processing_deadline: None, - at_most_once: false, - namespace: namespace.clone(), - taskname: "delay_task".to_string(), - on_attempts_exceeded: OnAttemptsExceeded::Discard, - }, + InflightActivationBuilder::new() + .id("0") + .taskname("pending_task") + .namespace(&namespace) + .received_at(received_at) + .build(TaskActivationBuilder::new()), + InflightActivationBuilder::new() + .id("1") + .taskname("delay_task") + .namespace(&namespace) + .received_at(received_at) + .build(TaskActivationBuilder::new()), ]; writer.reduce(batch).await.unwrap(); @@ -602,82 +426,22 @@ mod tests { writer_config, ); - let received_at = Timestamp { - seconds: 0, - nanos: 0, - }; - + let received_at = DateTime::from_timestamp_nanos(0); let namespace = generate_unique_namespace(); let batch = vec![ - InflightActivation { - id: "0".to_string(), - activation: TaskActivation { - id: "0".to_string(), - namespace: namespace.clone(), - taskname: "pending_task".to_string(), - parameters: "{}".to_string(), - headers: HashMap::new(), - received_at: Some(received_at), - retry_state: None, - processing_deadline_duration: 0, - expires: None, - delay: None, - } - .encode_to_vec(), - status: InflightActivationStatus::Pending, - partition: 0, - offset: 0, - added_at: Utc::now(), - received_at: DateTime::from_timestamp( - received_at.seconds, - received_at.nanos as u32, - ) - .unwrap(), - processing_attempts: 0, - processing_deadline_duration: 0, - expires_at: None, - delay_until: None, - processing_deadline: None, - at_most_once: false, - namespace: namespace.clone(), - taskname: "pending_task".to_string(), - on_attempts_exceeded: OnAttemptsExceeded::Discard, - }, - InflightActivation { - id: "1".to_string(), - activation: TaskActivation { - id: "1".to_string(), - namespace: namespace.clone(), - taskname: "pending_task".to_string(), - parameters: "{}".to_string(), - headers: HashMap::new(), - received_at: Some(received_at), - retry_state: None, - processing_deadline_duration: 0, - expires: None, - delay: None, - } - .encode_to_vec(), - status: InflightActivationStatus::Pending, - partition: 0, - offset: 0, - added_at: Utc::now(), - received_at: DateTime::from_timestamp( - received_at.seconds, - received_at.nanos as u32, - ) - .unwrap(), - processing_attempts: 0, - processing_deadline_duration: 0, - expires_at: None, - delay_until: None, - processing_deadline: None, - at_most_once: false, - namespace: namespace.clone(), - taskname: "pending_task".to_string(), - on_attempts_exceeded: OnAttemptsExceeded::Discard, - }, + InflightActivationBuilder::new() + .id("0") + .taskname("pending_task") + .namespace(&namespace) + .received_at(received_at) + .build(TaskActivationBuilder::new()), + InflightActivationBuilder::new() + .id("1") + .taskname("pending_task") + .namespace(&namespace) + .received_at(received_at) + .build(TaskActivationBuilder::new()), ]; writer.reduce(batch).await.unwrap(); @@ -711,116 +475,33 @@ mod tests { .unwrap(), ); - let received_at = Timestamp { - seconds: 0, - nanos: 0, - }; - + let received_at = DateTime::from_timestamp_nanos(0); let namespace = generate_unique_namespace(); - let existing_activation = InflightActivation { - id: "existing".to_string(), - activation: TaskActivation { - id: "existing".to_string(), - namespace: namespace.clone(), - taskname: "existing_task".to_string(), - parameters: "{}".to_string(), - headers: HashMap::new(), - received_at: Some(received_at), - retry_state: None, - processing_deadline_duration: 0, - expires: None, - delay: None, - } - .encode_to_vec(), - status: InflightActivationStatus::Processing, - partition: 0, - offset: 0, - added_at: Utc::now(), - received_at: DateTime::from_timestamp(received_at.seconds, received_at.nanos as u32) - .unwrap(), - processing_attempts: 0, - processing_deadline_duration: 0, - expires_at: None, - delay_until: None, - processing_deadline: None, - at_most_once: false, - namespace: namespace.clone(), - taskname: "existing_task".to_string(), - on_attempts_exceeded: OnAttemptsExceeded::Discard, - }; + let existing_activation = InflightActivationBuilder::new() + .id("existing") + .taskname("existing_task") + .namespace(&namespace) + .received_at(received_at) + .status(InflightActivationStatus::Processing) + .build(TaskActivationBuilder::new()); + store.store(vec![existing_activation]).await.unwrap(); let mut writer = InflightActivationWriter::new(store.clone(), writer_config); let batch = vec![ - InflightActivation { - id: "0".to_string(), - activation: TaskActivation { - id: "0".to_string(), - namespace: namespace.clone(), - taskname: "pending_task".to_string(), - parameters: "{}".to_string(), - headers: HashMap::new(), - received_at: Some(received_at), - retry_state: None, - processing_deadline_duration: 0, - expires: None, - delay: None, - } - .encode_to_vec(), - status: InflightActivationStatus::Pending, - partition: 0, - offset: 0, - added_at: Utc::now(), - received_at: DateTime::from_timestamp( - received_at.seconds, - received_at.nanos as u32, - ) - .unwrap(), - processing_attempts: 0, - processing_deadline_duration: 0, - expires_at: None, - delay_until: None, - processing_deadline: None, - at_most_once: false, - namespace: namespace.clone(), - taskname: "pending_task".to_string(), - on_attempts_exceeded: OnAttemptsExceeded::Discard, - }, - InflightActivation { - id: "1".to_string(), - activation: TaskActivation { - id: "1".to_string(), - namespace: namespace.clone(), - taskname: "delay_task".to_string(), - parameters: "{}".to_string(), - headers: HashMap::new(), - received_at: Some(received_at), - retry_state: None, - processing_deadline_duration: 0, - expires: None, - delay: None, - } - .encode_to_vec(), - status: InflightActivationStatus::Pending, - partition: 0, - offset: 0, - added_at: Utc::now(), - received_at: DateTime::from_timestamp( - received_at.seconds, - received_at.nanos as u32, - ) - .unwrap(), - processing_attempts: 0, - processing_deadline_duration: 0, - expires_at: None, - delay_until: None, - processing_deadline: None, - at_most_once: false, - namespace: namespace.clone(), - taskname: "delay_task".to_string(), - on_attempts_exceeded: OnAttemptsExceeded::Discard, - }, + InflightActivationBuilder::new() + .id("0") + .taskname("pending_task") + .namespace(&namespace) + .received_at(received_at) + .build(TaskActivationBuilder::new()), + InflightActivationBuilder::new() + .id("1") + .taskname("delay_task") + .namespace(&namespace) + .received_at(received_at) + .build(TaskActivationBuilder::new()), ]; writer.reduce(batch).await.unwrap(); diff --git a/src/store/inflight_activation.rs b/src/store/inflight_activation.rs index 5c911a1..e40a215 100644 --- a/src/store/inflight_activation.rs +++ b/src/store/inflight_activation.rs @@ -1,7 +1,6 @@ -use std::{str::FromStr, time::Instant}; - use anyhow::{Error, anyhow}; use chrono::{DateTime, Utc}; +use derive_builder::Builder; use libsqlite3_sys::{ SQLITE_DBSTATUS_CACHE_HIT, SQLITE_DBSTATUS_CACHE_MISS, SQLITE_DBSTATUS_CACHE_SPILL, SQLITE_DBSTATUS_CACHE_USED, SQLITE_DBSTATUS_CACHE_USED_SHARED, SQLITE_DBSTATUS_CACHE_WRITE, @@ -10,6 +9,8 @@ use libsqlite3_sys::{ SQLITE_DBSTATUS_LOOKASIDE_USED, SQLITE_DBSTATUS_SCHEMA_USED, SQLITE_DBSTATUS_STMT_USED, SQLITE_OK, sqlite3_db_status, }; +use prost::Message; +use prost_types::Timestamp; use sentry_protos::taskbroker::v1::{OnAttemptsExceeded, TaskActivationStatus}; use sqlx::{ ConnectOptions, FromRow, Pool, QueryBuilder, Row, Sqlite, Type, @@ -20,9 +21,13 @@ use sqlx::{ SqliteRow, SqliteSynchronous, }, }; +use std::{ + str::FromStr, + time::{Instant, SystemTime}, +}; use tracing::instrument; -use crate::config::Config; +use crate::{config::Config, store::task_activation::TaskActivationBuilder}; /// The members of this enum should be synced with the members /// of InflightActivationStatus in sentry_protos @@ -63,57 +68,133 @@ impl From for InflightActivationStatus { } } -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Builder)] +#[builder(pattern = "owned")] +#[builder(build_fn(name = "_build", private))] pub struct InflightActivation { + #[builder(setter(into))] pub id: String, - /// The protobuf activation that was received from kafka + + /// The task namespace. + #[builder(setter(into))] + pub namespace: String, + + /// The task name. + #[builder(setter(into))] + pub taskname: String, + + /// The Protobuf activation that was received from Kafka. + #[builder(setter(custom))] pub activation: Vec, /// The current status of the activation + #[builder(default = InflightActivationStatus::Pending)] pub status: InflightActivationStatus, /// The partition the activation was received from + #[builder(default = 0)] pub partition: i32, /// The offset the activation had + #[builder(default = 0)] pub offset: i64, /// The timestamp when the activation was stored in activation store. + #[builder(default = Utc::now())] pub added_at: DateTime, /// The timestamp a task was stored in Kafka + #[builder(default = Utc::now())] pub received_at: DateTime, /// The number of times the activation has been attempted to be processed. This counter is /// incremented everytime a task is reset from processing back to pending. When this /// exceeds max_processing_attempts, the task is discarded/deadlettered. + #[builder(default = 0)] pub processing_attempts: i32, /// The duration in seconds that a worker has to complete task execution. /// When an activation is moved from pending -> processing a result is expected /// in this many seconds. + #[builder(default = 0)] pub processing_deadline_duration: u32, /// If the task has specified an expiry, this is the timestamp after which the task should be removed from inflight store + #[builder(default = None, setter(strip_option))] pub expires_at: Option>, /// If the task has specified a delay, this is the timestamp after which the task can be sent to workers + #[builder(default = None, setter(strip_option))] pub delay_until: Option>, /// The timestamp for when processing should be complete + #[builder(default = None, setter(strip_option))] pub processing_deadline: Option>, /// What to do when the maximum number of attempts to complete a task is exceeded + #[builder(default = OnAttemptsExceeded::Discard)] pub on_attempts_exceeded: OnAttemptsExceeded, /// Whether or not the activation uses at_most_once. /// When enabled activations are not retried when processing_deadlines /// are exceeded. + #[builder(default = false)] pub at_most_once: bool, +} - /// Details about the task - pub namespace: String, - pub taskname: String, +impl InflightActivationBuilder { + pub fn new() -> Self { + Self::default() + } + + pub fn build(mut self, builder: TaskActivationBuilder) -> InflightActivation { + // Grab required fields + let id = self.id.as_ref().expect("field 'id' is required"); + let namespace = self + .namespace + .as_ref() + .expect("field 'namespace' is required"); + let taskname = self + .taskname + .as_ref() + .expect("field 'taskname' is required"); + + // Grab fields with defaults + let received_at = self.received_at.unwrap_or_default(); + let processing_deadline_duration = self.processing_deadline_duration.unwrap_or_default(); + + // Infer 'expires' field + let expires = self + .expires_at + .flatten() + .map(|date_time| (date_time - received_at).num_seconds() as u64); + + // Infer 'delay' field + let delay = self + .delay_until + .flatten() + .map(|date_time| (date_time - received_at).num_seconds() as u64); + + // Build the activation + let mut activation = builder + .id(id) + .taskname(taskname) + .namespace(namespace) + .received_at(Timestamp::from(SystemTime::from(received_at))) + .processing_deadline_duration(processing_deadline_duration as u64) + .build(); + + // Set 'expiration' and 'delay' fields manually after activation has been build + activation.expires = expires; + activation.delay = delay; + + self.activation = Some(activation.encode_to_vec()); + + match self._build() { + Ok(activation) => activation, + Err(e) => panic!("Failed to build InflightActivation: {}", e), + } + } } impl InflightActivation { diff --git a/src/store/inflight_activation_tests.rs b/src/store/inflight_activation_tests.rs index 2ff605e..fd5fd3e 100644 --- a/src/store/inflight_activation_tests.rs +++ b/src/store/inflight_activation_tests.rs @@ -1,5 +1,4 @@ -use prost::Message; -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::io::Error; use std::path::Path; use std::sync::Arc; @@ -7,18 +6,17 @@ use std::time::Duration; use crate::config::Config; use crate::store::inflight_activation::{ - InflightActivation, InflightActivationStatus, InflightActivationStore, + InflightActivationBuilder, InflightActivationStatus, InflightActivationStore, InflightActivationStoreConfig, QueryResult, create_sqlite_pool, }; +use crate::store::task_activation::TaskActivationBuilder; use crate::test_utils::{ StatusCount, assert_counts, create_integration_config, create_test_store, generate_temp_filename, generate_unique_namespace, make_activations, make_activations_with_namespace, replace_retry_state, }; use chrono::{DateTime, SubsecRound, TimeZone, Utc}; -use sentry_protos::taskbroker::v1::{ - OnAttemptsExceeded, RetryState, TaskActivation, TaskActivationStatus, -}; +use sentry_protos::taskbroker::v1::{OnAttemptsExceeded, RetryState, TaskActivationStatus}; use sqlx::{QueryBuilder, Sqlite}; use std::fs; use tokio::sync::broadcast; @@ -1089,44 +1087,22 @@ async fn test_remove_killswitched() { #[tokio::test] async fn test_clear() { let store = create_test_store().await; + + let received_at = DateTime::from_timestamp_nanos(0); + let expires_at = received_at + Duration::from_secs(1); + let namespace = generate_unique_namespace(); - #[allow(deprecated)] - let received_at = prost_types::Timestamp { - seconds: 0, - nanos: 0, - }; - let batch = vec![InflightActivation { - id: "id_0".into(), - activation: TaskActivation { - id: "id_0".into(), - namespace: namespace.clone(), - taskname: "taskname".into(), - parameters: "{}".into(), - headers: HashMap::new(), - received_at: Some(received_at), - retry_state: None, - processing_deadline_duration: 0, - expires: Some(1), - delay: None, - } - .encode_to_vec(), - status: InflightActivationStatus::Pending, - partition: 0, - offset: 0, - added_at: Utc::now(), - received_at: DateTime::from_timestamp(received_at.seconds, received_at.nanos as u32) - .expect(""), - processing_attempts: 0, - processing_deadline_duration: 0, - on_attempts_exceeded: OnAttemptsExceeded::Discard, - expires_at: None, - delay_until: None, - processing_deadline: None, - at_most_once: false, - namespace: namespace.clone(), - taskname: "taskname".into(), - }]; + let batch = vec![ + InflightActivationBuilder::new() + .id("id_0") + .taskname("taskname") + .namespace(&namespace) + .received_at(received_at) + .expires_at(expires_at) + .build(TaskActivationBuilder::new()), + ]; + assert!(store.store(batch).await.is_ok()); assert_counts( StatusCount { diff --git a/src/store/mod.rs b/src/store/mod.rs index dcc0f25..1c60fd9 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -1,3 +1,5 @@ pub mod inflight_activation; +pub mod task_activation; + #[cfg(test)] pub mod inflight_activation_tests; diff --git a/src/store/task_activation.rs b/src/store/task_activation.rs new file mode 100644 index 0000000..5567528 --- /dev/null +++ b/src/store/task_activation.rs @@ -0,0 +1,86 @@ +use prost_types::Timestamp; +use sentry_protos::taskbroker::v1; +use std::collections::HashMap; + +macro_rules! builder_setter { + // For types that should accept `impl Into` + ($field:ident: impl Into<$ty:ty>) => { + pub fn $field>(mut self, $field: T) -> Self { + self.$field = Some($field.into()); + self + } + }; + + // For types that should be used directly + ($field:ident: $ty:ty) => { + pub fn $field(mut self, $field: $ty) -> Self { + self.$field = Some($field); + self + } + }; +} + +pub struct TaskActivationBuilder { + pub id: Option, + pub namespace: Option, + pub taskname: Option, + pub parameters: Option, + pub headers: Option>, + pub received_at: Option, + pub retry_state: Option, + pub processing_deadline_duration: Option, + pub expires: Option, + pub delay: Option, +} + +impl TaskActivationBuilder { + pub fn new() -> Self { + Self { + id: None, + namespace: None, + taskname: None, + parameters: None, + headers: None, + received_at: None, + retry_state: None, + processing_deadline_duration: None, + expires: None, + delay: None, + } + } + + // String fields that accept `impl Into` + builder_setter!(id: impl Into); + builder_setter!(namespace: impl Into); + builder_setter!(taskname: impl Into); + builder_setter!(parameters: impl Into); + + // Other fields + builder_setter!(headers: HashMap); + builder_setter!(received_at: Timestamp); + builder_setter!(retry_state: v1::RetryState); + builder_setter!(processing_deadline_duration: u64); + builder_setter!(expires: u64); + builder_setter!(delay: u64); + + pub fn build(self) -> v1::TaskActivation { + v1::TaskActivation { + id: self.id.expect("id is required"), + namespace: self.namespace.expect("namespace is required"), + taskname: self.taskname.expect("taskname is required"), + parameters: self.parameters.unwrap_or_else(|| "{}".to_string()), + headers: self.headers.unwrap_or_default(), + processing_deadline_duration: self.processing_deadline_duration.unwrap_or(0), + received_at: self.received_at, + retry_state: self.retry_state, + expires: self.expires, + delay: self.delay, + } + } +} + +impl Default for TaskActivationBuilder { + fn default() -> Self { + Self::new() + } +} diff --git a/src/test_utils.rs b/src/test_utils.rs index 9ae23de..3cd9f2b 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -7,17 +7,20 @@ use rdkafka::{ consumer::{Consumer, StreamConsumer}, producer::FutureProducer, }; -use std::{collections::HashMap, sync::Arc}; +use std::sync::Arc; use uuid::Uuid; use crate::{ config::Config, - store::inflight_activation::{ - InflightActivation, InflightActivationStatus, InflightActivationStore, - InflightActivationStoreConfig, + store::{ + inflight_activation::{ + InflightActivation, InflightActivationBuilder, InflightActivationStatus, + InflightActivationStore, InflightActivationStoreConfig, + }, + task_activation::TaskActivationBuilder, }, }; -use chrono::{Timelike, Utc}; +use chrono::Utc; use sentry_protos::taskbroker::v1::{OnAttemptsExceeded, RetryState, TaskActivation}; /// Generate a unique filename for isolated SQLite databases. @@ -37,40 +40,17 @@ pub fn make_activations_with_namespace(namespace: String, count: u32) -> Vec