diff --git a/crates/corro-agent/src/agent/handlers.rs b/crates/corro-agent/src/agent/handlers.rs index 56b3245f..76c12743 100644 --- a/crates/corro-agent/src/agent/handlers.rs +++ b/crates/corro-agent/src/agent/handlers.rs @@ -731,6 +731,7 @@ pub async fn handle_changes( mut tripwire: Tripwire, ) { let max_changes_chunk: usize = agent.config().perf.apply_queue_len; + let max_queue_len: usize = agent.config().perf.processing_queue_len; let mut queue: VecDeque<(ChangeV1, ChangeSource, Instant)> = VecDeque::new(); let mut buf = vec![]; let mut buf_cost = 0; @@ -746,6 +747,7 @@ pub async fn handle_changes( const KEEP_SEEN_CACHE_SIZE: usize = 1000; let mut seen: IndexMap<_, RangeInclusiveSet> = IndexMap::new(); + let mut drop_log_count: u64 = 0; // complicated loop to process changes efficiently w/ a max concurrency // and a minimum chunk size for bigger and faster SQLite transactions loop { @@ -861,6 +863,26 @@ pub async fn handle_changes( continue; } + // drop items when the queue is full. + if queue.len() > max_queue_len { + drop_log_count += 1; + if is_pow_10(drop_log_count) { + if drop_log_count == 1 { + warn!("dropping a change because changes queue is full"); + } else { + warn!( + "dropping {} changes because changes queue is full", + drop_log_count + ); + } + } + // reset count + if drop_log_count == 100000000 { + drop_log_count = 0; + } + continue; + } + if let Some(mut seqs) = change.seqs().cloned() { let v = *change.versions().start(); if let Some(seen_seqs) = seen.get(&(change.actor_id, v)) { @@ -1142,3 +1164,11 @@ mod tests { Ok(()) } } + +#[inline] +fn is_pow_10(i: u64) -> bool { + matches!( + i, + 1 | 10 | 100 | 1000 | 10000 | 1000000 | 10000000 | 100000000 + ) +} diff --git a/crates/corro-types/src/config.rs b/crates/corro-types/src/config.rs index 348bd362..7681bfb9 100644 --- a/crates/corro-types/src/config.rs +++ b/crates/corro-types/src/config.rs @@ -14,6 +14,9 @@ const fn default_apply_queue() -> usize { const fn default_wal_threshold() -> usize { 10 } +const fn default_processing_queue() -> usize { + 100000 +} /// Used for the apply channel const fn default_huge_channel() -> usize { @@ -187,6 +190,8 @@ pub struct PerfConfig { pub apply_queue_len: usize, #[serde(default = "default_wal_threshold")] pub wal_threshold_gb: usize, + #[serde(default = "default_processing_queue")] + pub processing_queue_len: usize, } impl Default for PerfConfig { @@ -204,6 +209,7 @@ impl Default for PerfConfig { apply_queue_timeout: default_apply_timeout(), apply_queue_len: default_apply_queue(), wal_threshold_gb: default_wal_threshold(), + processing_queue_len: default_processing_queue(), } } }