Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
6bf130d
start
evanh Nov 17, 2025
4f54a15
start
evanh Nov 17, 2025
da304d2
first commit
evanh Nov 18, 2025
60863db
fixes
evanh Nov 18, 2025
c88bc5f
get writing working
evanh Nov 20, 2025
99d4ee1
add partition rebalancing
evanh Nov 20, 2025
7a433b7
get reads written
evanh Nov 21, 2025
e95c1d9
get reads working
evanh Nov 24, 2025
48b6638
processing deadlines, expired, retry
evanh Nov 26, 2025
712ae3e
refactor
evanh Nov 26, 2025
9e431ef
delay until and tests
evanh Nov 27, 2025
da6be31
debugging
evanh Nov 27, 2025
59f93e5
logging
evanh Dec 1, 2025
d00c250
use square brackets
evanh Dec 1, 2025
41c272c
add random load balancer
evanh Dec 1, 2025
2dfedb3
randomize buckets
evanh Dec 1, 2025
439ab24
add metrics
evanh Dec 2, 2025
fd99b43
add complete metrics
evanh Dec 2, 2025
fa4aaa2
histogram
evanh Dec 2, 2025
d4ab059
fix
evanh Dec 2, 2025
ee89845
cleanup
evanh Dec 2, 2025
6100436
add logging
evanh Dec 3, 2025
6adb3fd
try removing write lock
evanh Dec 3, 2025
723b8c0
add logging
evanh Dec 3, 2025
2deca56
roll back logging
evanh Dec 3, 2025
843f7a6
don't delete
evanh Dec 3, 2025
2337d80
refactor locks
evanh Dec 3, 2025
ef10a87
try to speed up calls
evanh Dec 3, 2025
e249c32
some cleanup
evanh Dec 4, 2025
970be70
add mre logging
evanh Dec 4, 2025
02fe03c
try using async backtrace
evanh Dec 4, 2025
f54a3b9
add retries
evanh Dec 5, 2025
7e14e89
refactor connections
evanh Dec 5, 2025
ba31ce5
fix processing bug for expired keys
evanh Dec 5, 2025
78e4863
remove
evanh Dec 5, 2025
dd45f27
fix potential bug
evanh Dec 5, 2025
7e40900
remove delete
evanh Dec 5, 2025
b07c76d
handle panic
evanh Dec 8, 2025
ce0aee5
print error
evanh Dec 8, 2025
8ed8bba
temp
evanh Dec 8, 2025
24fe656
some logging
evanh Dec 9, 2025
284c367
try to fix empty activation bug
evanh Dec 9, 2025
580963c
attempt
evanh Dec 9, 2025
dd94f84
no print
evanh Dec 9, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
258 changes: 239 additions & 19 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@ debug = 1

[dependencies]
anyhow = "1.0.92"
async-backtrace = "0.2.7"
base64 = "0.21.0"
bytes = "1.10.0"
chrono = { version = "0.4.26" }
clap = { version = "4.5.20", features = ["derive"] }
deadpool-redis = { version = "0.22.0", features = ["cluster"] }
elegant-departure = { version = "0.3.1", features = ["tokio"] }
cityhasher = "0.1.0"
figment = { version = "0.10.19", features = ["env", "yaml", "test"] }
futures = "0.3.31"
futures-util = "0.3.31"
Expand All @@ -28,6 +32,7 @@ prost = "0.13"
prost-types = "0.13.3"
rand = "0.8.5"
rdkafka = { version = "0.37.0", features = ["cmake-build", "ssl"] }
redis = "0.32.7"
sentry = { version = "0.41.0", default-features = false, features = [
# default features, except `release-health` is disabled
"backtrace",
Expand All @@ -44,6 +49,7 @@ serde = "1.0.214"
serde_yaml = "0.9.34"
sha2 = "0.10.8"
sqlx = { version = "0.8.3", features = ["sqlite", "runtime-tokio", "chrono"] }
thiserror = "2.0.17"
tokio = { version = "1.43.1", features = ["full"] }
tokio-stream = { version = "0.1.16", features = ["full"] }
tokio-util = "0.7.12"
Expand Down
14 changes: 14 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,15 @@ pub struct Config {

/// Enable additional metrics for the sqlite.
pub enable_sqlite_status_metrics: bool,

/// Redis cluster information
pub redis_cluster_urls: Vec<String>,

pub namespaces: Vec<String>,

pub num_redis_buckets: usize,

pub payload_ttl_seconds: u64,
}

impl Default for Config {
Expand Down Expand Up @@ -279,6 +288,11 @@ impl Default for Config {
full_vacuum_on_upkeep: true,
vacuum_interval_ms: 30000,
enable_sqlite_status_metrics: true,
// Redis information
redis_cluster_urls: vec!["redis://127.0.0.1:6379".to_owned()],
namespaces: vec!["default".to_owned()],
num_redis_buckets: 256,
payload_ttl_seconds: 60 * 60 * 24,
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/grpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ use std::sync::Arc;
use std::time::Instant;
use tonic::{Request, Response, Status};

use crate::store::inflight_activation::{InflightActivationStatus, InflightActivationStore};
use crate::store::inflight_activation::InflightActivationStatus;
use crate::store::inflight_redis_activation::RedisActivationStore;
use tracing::{error, instrument};

pub struct TaskbrokerServer {
pub store: Arc<InflightActivationStore>,
pub store: Arc<RedisActivationStore>,
}

#[tonic::async_trait]
Expand All @@ -29,7 +30,6 @@ impl ConsumerService for TaskbrokerServer {
.store
.get_pending_activation(namespace.as_deref())
.await;

match inflight {
Ok(Some(inflight)) => {
let now = Utc::now();
Expand Down
51 changes: 38 additions & 13 deletions src/grpc/server_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ use tonic::{Code, Request};

use crate::grpc::server::TaskbrokerServer;

use crate::test_utils::{create_test_store, make_activations};
use crate::test_utils::{create_redis_test_store, make_activations};

#[tokio::test]
async fn test_get_task() {
let store = create_test_store().await;
let store = create_redis_test_store().await;
store.delete_all_keys().await.unwrap();
let service = TaskbrokerServer { store };
let request = GetTaskRequest { namespace: None };
let response = service.get_task(Request::new(request)).await;
Expand All @@ -21,7 +22,8 @@ async fn test_get_task() {
#[tokio::test]
#[allow(deprecated)]
async fn test_set_task_status() {
let store = create_test_store().await;
let store = create_redis_test_store().await;
store.delete_all_keys().await.unwrap();
let service = TaskbrokerServer { store };
let request = SetTaskStatusRequest {
id: "test_task".to_string(),
Expand All @@ -37,7 +39,8 @@ async fn test_set_task_status() {
#[tokio::test]
#[allow(deprecated)]
async fn test_set_task_status_invalid() {
let store = create_test_store().await;
let store = create_redis_test_store().await;
store.delete_all_keys().await.unwrap();
let service = TaskbrokerServer { store };
let request = SetTaskStatusRequest {
id: "test_task".to_string(),
Expand All @@ -57,46 +60,68 @@ async fn test_set_task_status_invalid() {
#[tokio::test]
#[allow(deprecated)]
async fn test_get_task_success() {
let store = create_test_store().await;
let store = create_redis_test_store().await;
store.delete_all_keys().await.unwrap();
let activations = make_activations(1);
store.store(activations).await.unwrap();

let service = TaskbrokerServer { store };
let service = TaskbrokerServer {
store: store.clone(),
};
let request = GetTaskRequest { namespace: None };
let response = service.get_task(Request::new(request)).await;
assert!(response.is_ok());
let resp = response.unwrap();
assert!(resp.get_ref().task.is_some());
let task = resp.get_ref().task.as_ref().unwrap();
assert!(task.id == "id_0");
assert!(store.count_pending_activations().await.unwrap() == 0);
assert!(store.count_processing_activations().await.unwrap() == 1);
}

#[tokio::test]
#[allow(deprecated)]
async fn test_set_task_status_success() {
let store = create_test_store().await;
let store = create_redis_test_store().await;
store.delete_all_keys().await.unwrap();
let activations = make_activations(2);
store.store(activations).await.unwrap();

let service = TaskbrokerServer { store };
let service = TaskbrokerServer {
store: store.clone(),
};

let request = GetTaskRequest { namespace: None };
let response = service.get_task(Request::new(request)).await;
assert!(response.is_ok());
let resp = response.unwrap();
assert!(resp.get_ref().task.is_some());
let task = resp.get_ref().task.as_ref().unwrap();
assert!(task.id == "id_0");

assert!(task.id == "id_0" || task.id == "id_1");
let first_task_id = task.id.clone();
let request = SetTaskStatusRequest {
id: "id_0".to_string(),
id: first_task_id.clone(),
status: 5, // Complete
fetch_next_task: Some(FetchNextTask { namespace: None }),
};
let response = service.set_task_status(Request::new(request)).await;
assert!(response.is_ok());
println!("response: {:?}", response);
assert!(response.is_ok(), "response: {:?}", response);
let resp = response.unwrap();
assert!(resp.get_ref().task.is_some());
let task = resp.get_ref().task.as_ref().unwrap();
assert_eq!(task.id, "id_1");
let second_task_id = if first_task_id == "id_0" {
"id_1"
} else {
"id_0"
};
assert_eq!(task.id, second_task_id);
let pending_count = store.count_pending_activations().await.unwrap();
let processing_count = store.count_processing_activations().await.unwrap();
assert!(pending_count == 0, "pending_count: {:?}", pending_count);
assert!(
processing_count == 1,
"processing_count: {:?}",
processing_count
);
}
34 changes: 27 additions & 7 deletions src/kafka/consumer.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::store::inflight_redis_activation::RedisActivationStore;
use anyhow::{Error, anyhow};
use futures::{
Stream, StreamExt,
Expand All @@ -21,10 +22,8 @@ use std::{
future::Future,
iter,
mem::take,
sync::{
Arc,
mpsc::{SyncSender, sync_channel},
},
sync::Arc,
sync::mpsc::{SyncSender, sync_channel},
time::Duration,
};
use tokio::{
Expand All @@ -44,14 +43,15 @@ use tracing::{debug, error, info, instrument, warn};
pub async fn start_consumer(
topics: &[&str],
kafka_client_config: &ClientConfig,
redis_store: Arc<RedisActivationStore>,
spawn_actors: impl FnMut(
Arc<StreamConsumer<KafkaContext>>,
&BTreeSet<(String, i32)>,
) -> ActorHandles,
) -> Result<(), Error> {
let (client_shutdown_sender, client_shutdown_receiver) = oneshot::channel();
let (event_sender, event_receiver) = unbounded_channel();
let context = KafkaContext::new(event_sender.clone());
let context = KafkaContext::new(event_sender.clone(), redis_store.clone());
let consumer: Arc<StreamConsumer<KafkaContext>> = Arc::new(
kafka_client_config
.create_with_context(context)
Expand All @@ -67,6 +67,7 @@ pub async fn start_consumer(
metrics::gauge!("arroyo.consumer.current_partitions").set(0);
handle_events(
consumer,
redis_store,
event_receiver,
client_shutdown_sender,
spawn_actors,
Expand Down Expand Up @@ -118,11 +119,18 @@ pub fn poll_consumer_client(
#[derive(Debug)]
pub struct KafkaContext {
event_sender: UnboundedSender<(Event, SyncSender<()>)>,
redis_store: Arc<RedisActivationStore>,
}

impl KafkaContext {
pub fn new(event_sender: UnboundedSender<(Event, SyncSender<()>)>) -> Self {
Self { event_sender }
pub fn new(
event_sender: UnboundedSender<(Event, SyncSender<()>)>,
redis_store: Arc<RedisActivationStore>,
) -> Self {
Self {
event_sender,
redis_store,
}
}
}

Expand Down Expand Up @@ -339,6 +347,7 @@ enum ConsumerState {
#[instrument(skip_all)]
pub async fn handle_events(
consumer: Arc<StreamConsumer<KafkaContext>>,
redis_store: Arc<RedisActivationStore>,
events: UnboundedReceiver<(Event, SyncSender<()>)>,
shutdown_client: oneshot::Sender<()>,
mut spawn_actors: impl FnMut(
Expand Down Expand Up @@ -372,6 +381,17 @@ pub async fn handle_events(
state = match (state, event) {
(ConsumerState::Ready, Event::Assign(tpl)) => {
metrics::gauge!("arroyo.consumer.current_partitions").set(tpl.len() as f64);
let mut topics = HashMap::<String, Vec<i32>>::new();
for (topic, partition) in tpl.iter() {
if !topics.contains_key(topic) {
topics.insert(topic.clone(), vec![*partition]);
} else {
topics.get_mut(topic).unwrap().push(*partition);
}
}
for (topic, partitions) in topics.iter() {
redis_store.rebalance_partitions(topic.clone(), partitions.clone());
}
ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl)
}
(ConsumerState::Ready, Event::Revoke(_)) => {
Expand Down
1 change: 1 addition & 0 deletions src/kafka/deserialize_activation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ pub fn new(
id: activation.id.clone(),
activation: payload.to_vec(),
status,
topic: msg.topic().to_string(),
partition: msg.partition(),
offset: msg.offset(),
added_at: Utc::now(),
Expand Down
7 changes: 7 additions & 0 deletions src/kafka/inflight_activation_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ demoted_namespaces:
}
.encode_to_vec(),
status: InflightActivationStatus::Pending,
topic: "test_topic".to_string(),
partition: 0,
offset: 0,
added_at: Utc::now(),
Expand Down Expand Up @@ -306,6 +307,7 @@ demoted_namespaces:
}
.encode_to_vec(),
status: InflightActivationStatus::Pending,
topic: "test_topic".to_string(),
partition: 0,
offset: 0,
added_at: Utc::now(),
Expand Down Expand Up @@ -355,6 +357,7 @@ demoted_namespaces:
}
.encode_to_vec(),
status: InflightActivationStatus::Pending,
topic: "test_topic".to_string(),
partition: 0,
offset: 0,
added_at: Utc::now(),
Expand Down Expand Up @@ -406,6 +409,7 @@ demoted_namespaces:
}
.encode_to_vec(),
status: InflightActivationStatus::Pending,
topic: "test_topic".to_string(),
partition: 0,
offset: 0,
added_at: Utc::now(),
Expand Down Expand Up @@ -437,6 +441,7 @@ demoted_namespaces:
}
.encode_to_vec(),
status: InflightActivationStatus::Pending,
topic: "test_topic".to_string(),
partition: 0,
offset: 0,
added_at: Utc::now(),
Expand Down Expand Up @@ -497,6 +502,7 @@ demoted_topic: taskworker-demoted"#;
}
.encode_to_vec(),
status: InflightActivationStatus::Pending,
topic: "test_topic".to_string(),
partition: 0,
offset: 0,
added_at: Utc::now(),
Expand Down Expand Up @@ -528,6 +534,7 @@ demoted_topic: taskworker-demoted"#;
}
.encode_to_vec(),
status: InflightActivationStatus::Pending,
topic: "test_topic".to_string(),
partition: 0,
offset: 0,
added_at: Utc::now(),
Expand Down
Loading
Loading