Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pg_migrations/0001_create_inflight_activations.sql
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,5 @@ CREATE TABLE IF NOT EXISTS inflight_taskactivations (
taskname TEXT NOT NULL,
on_attempts_exceeded INTEGER NOT NULL DEFAULT 1
);

-- Index the partition column of inflight_taskactivations.
-- IF NOT EXISTS mirrors the CREATE TABLE IF NOT EXISTS above, so running this
-- script against a database that already has the index does not fail.
CREATE INDEX IF NOT EXISTS idx_activation_partition ON inflight_taskactivations (partition);
10 changes: 10 additions & 0 deletions src/kafka/consumer.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::store::inflight_activation::InflightActivationStore;
use anyhow::{Error, anyhow};
use futures::{
Stream, StreamExt,
Expand Down Expand Up @@ -44,6 +45,7 @@ use tracing::{debug, error, info, instrument, warn};
pub async fn start_consumer(
topics: &[&str],
kafka_client_config: &ClientConfig,
activation_store: Arc<dyn InflightActivationStore>,
spawn_actors: impl FnMut(
Arc<StreamConsumer<KafkaContext>>,
&BTreeSet<(String, i32)>,
Expand All @@ -68,6 +70,7 @@ pub async fn start_consumer(
handle_events(
consumer,
event_receiver,
activation_store,
client_shutdown_sender,
spawn_actors,
)
Expand Down Expand Up @@ -340,6 +343,7 @@ enum ConsumerState {
pub async fn handle_events(
consumer: Arc<StreamConsumer<KafkaContext>>,
events: UnboundedReceiver<(Event, SyncSender<()>)>,
activation_store: Arc<dyn InflightActivationStore>,
shutdown_client: oneshot::Sender<()>,
mut spawn_actors: impl FnMut(
Arc<StreamConsumer<KafkaContext>>,
Expand Down Expand Up @@ -372,6 +376,12 @@ pub async fn handle_events(
state = match (state, event) {
(ConsumerState::Ready, Event::Assign(tpl)) => {
metrics::gauge!("arroyo.consumer.current_partitions").set(tpl.len() as f64);
// Note: This assumes we only process one topic per consumer.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So far that has always been true.

let mut partitions = Vec::<i32>::new();
for (_, partition) in tpl.iter() {
partitions.push(*partition);
}
activation_store.assign_partitions(partitions).unwrap();
ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl)
}
(ConsumerState::Ready, Event::Revoke(_)) => {
Expand Down
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ async fn main() -> Result<(), Error> {
start_consumer(
&[&consumer_config.kafka_topic],
&consumer_config.kafka_consumer_config(),
consumer_store.clone(),
processing_strategy!({
err:
OsStreamWriter::new(
Expand Down
56 changes: 35 additions & 21 deletions src/store/inflight_activation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,21 +337,13 @@ impl InflightActivationStoreConfig {

#[async_trait]
pub trait InflightActivationStore: Send + Sync {
/// Trigger incremental vacuum to reclaim free pages in the database
async fn vacuum_db(&self) -> Result<(), Error>;

/// Perform a full vacuum on the database
async fn full_vacuum_db(&self) -> Result<(), Error>;

/// Get the size of the database in bytes
async fn db_size(&self) -> Result<u64, Error>;

/// Get an activation by id
async fn get_by_id(&self, id: &str) -> Result<Option<InflightActivation>, Error>;

// CONSUMER OPERATIONS
/// Store a batch of activations
async fn store(&self, batch: Vec<InflightActivation>) -> Result<QueryResult, Error>;

fn assign_partitions(&self, partitions: Vec<i32>) -> Result<(), Error>;

// SERVER OPERATIONS
/// Get a single pending activation, optionally filtered by namespace
async fn get_pending_activation(
&self,
Expand Down Expand Up @@ -385,6 +377,14 @@ pub trait InflightActivationStore: Send + Sync {
limit: Option<i32>,
) -> Result<Vec<InflightActivation>, Error>;

/// Update the status of a specific activation
async fn set_status(
&self,
id: &str,
status: InflightActivationStatus,
) -> Result<Option<InflightActivation>, Error>;

// COUNT OPERATIONS
/// Get the age of the oldest pending activation in seconds
async fn pending_activation_max_lag(&self, now: &DateTime<Utc>) -> f64;

Expand All @@ -400,12 +400,9 @@ pub trait InflightActivationStore: Send + Sync {
/// Count all activations
async fn count(&self) -> Result<usize, Error>;

/// Update the status of a specific activation
async fn set_status(
&self,
id: &str,
status: InflightActivationStatus,
) -> Result<Option<InflightActivation>, Error>;
// ACTIVATION OPERATIONS
/// Get an activation by id
async fn get_by_id(&self, id: &str) -> Result<Option<InflightActivation>, Error>;

/// Set the processing deadline for a specific activation
async fn set_processing_deadline(
Expand All @@ -417,12 +414,20 @@ pub trait InflightActivationStore: Send + Sync {
/// Delete an activation by id
async fn delete_activation(&self, id: &str) -> Result<(), Error>;

// DATABASE OPERATIONS
/// Trigger incremental vacuum to reclaim free pages in the database
async fn vacuum_db(&self) -> Result<(), Error>;

/// Perform a full vacuum on the database
async fn full_vacuum_db(&self) -> Result<(), Error>;

/// Get the size of the database in bytes
async fn db_size(&self) -> Result<u64, Error>;

// UPKEEP OPERATIONS
/// Get all activations with status Retry
async fn get_retry_activations(&self) -> Result<Vec<InflightActivation>, Error>;

/// Clear all activations from the store
async fn clear(&self) -> Result<(), Error>;

/// Update tasks that exceeded their processing deadline
async fn handle_processing_deadline(&self) -> Result<u64, Error>;

Expand All @@ -447,6 +452,10 @@ pub trait InflightActivationStore: Send + Sync {
/// Remove killswitched tasks
async fn remove_killswitched(&self, killswitched_tasks: Vec<String>) -> Result<u64, Error>;

// TEST OPERATIONS
/// Clear all activations from the store
async fn clear(&self) -> Result<(), Error>;

/// Remove the database, used only in tests
async fn remove_db(&self) -> Result<(), Error> {
Ok(())
Expand Down Expand Up @@ -714,6 +723,11 @@ impl InflightActivationStore for SqliteActivationStore {
Ok(Some(row.into()))
}

/// Receive the list of partitions handed to this store.
///
/// This implementation does not keep the assignment: it only emits a
/// warn-level log line containing the partitions and reports success.
fn assign_partitions(&self, partitions: Vec<i32>) -> Result<(), Error> {
    // Log only; `partitions` is dropped when the function returns.
    warn!("assign_partitions: {partitions:?}");
    Ok(())
}

#[instrument(skip_all)]
async fn store(&self, batch: Vec<InflightActivation>) -> Result<QueryResult, Error> {
if batch.is_empty() {
Expand Down
99 changes: 99 additions & 0 deletions src/store/inflight_activation_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,105 @@ async fn test_set_activation_status(#[case] adapter: &str) {
store.remove_db().await.unwrap();
}

// Exercises `set_status` on a store that holds activations from two partitions.
//
// Two activations are stored and the second one is given `partition = 1`.
// Every `assert_counts` check below expects at most one activation to be
// counted, even after both ids have been updated.
// NOTE(review): presumably the test store only counts partition 0 - confirm
// against `create_test_store` and the postgres store implementation.
#[tokio::test]
#[rstest]
#[case::postgres("postgres")]
async fn test_set_activation_status_with_partitions(#[case] adapter: &str) {
    let store = create_test_store(adapter).await;

    // Store two activations; the second is moved to partition 1.
    let mut activations = make_activations(2);
    activations[1].partition = 1;
    let stored = store.store(activations).await;
    assert!(stored.is_ok());
    let expected = StatusCount {
        pending: 1,
        ..StatusCount::default()
    };
    assert_counts(expected, store.as_ref()).await;

    // Move id_0 to Failure and check it is counted as a failure.
    let updated = store
        .set_status("id_0", InflightActivationStatus::Failure)
        .await;
    assert!(updated.is_ok());
    let expected = StatusCount {
        failure: 1,
        ..StatusCount::default()
    };
    assert_counts(expected, store.as_ref()).await;

    // Move id_0 back to Pending.
    let updated = store
        .set_status("id_0", InflightActivationStatus::Pending)
        .await;
    assert!(updated.is_ok());
    let expected = StatusCount {
        pending: 1,
        ..StatusCount::default()
    };
    assert_counts(expected, store.as_ref()).await;

    // Fail both activations, id_0 first and then id_1 (the one in partition 1).
    for id in ["id_0", "id_1"] {
        let updated = store
            .set_status(id, InflightActivationStatus::Failure)
            .await;
        assert!(updated.is_ok());
    }
    // The broker can update the status of an activation in a different partition, but
    // it still should not be counted in its upkeep.
    let expected = StatusCount {
        pending: 0,
        failure: 1,
        ..StatusCount::default()
    };
    assert_counts(expected, store.as_ref()).await;
    let next_pending = store.get_pending_activation(None, None).await.unwrap();
    assert!(next_pending.is_none());

    // Updating an unknown id succeeds at the query level but returns nothing.
    let missing = store
        .set_status("not_there", InflightActivationStatus::Complete)
        .await;
    assert!(missing.is_ok(), "no query error");
    assert!(missing.unwrap().is_none(), "no activation found");

    // Completing id_0 returns the updated activation.
    let completed = store
        .set_status("id_0", InflightActivationStatus::Complete)
        .await;
    assert!(completed.is_ok(), "no query error");
    let completed = completed.unwrap();
    assert!(completed.is_some(), "activation should be returned");
    let completed = completed.unwrap();
    assert_eq!(completed.id, "id_0");
    assert_eq!(completed.status, InflightActivationStatus::Complete);
    store.remove_db().await.unwrap();
}

#[tokio::test]
#[rstest]
#[case::sqlite("sqlite")]
Expand Down
Loading
Loading