Skip to content

Commit

Permalink
Merge pull request #252 from superfly/drop-items
Browse files Browse the repository at this point in the history
Drop items when processing queue is full
  • Loading branch information
somtochiama authored Aug 26, 2024
2 parents ddc4184 + 8c7d3b7 commit 589ce3c
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 0 deletions.
30 changes: 30 additions & 0 deletions crates/corro-agent/src/agent/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,7 @@ pub async fn handle_changes(
mut tripwire: Tripwire,
) {
let max_changes_chunk: usize = agent.config().perf.apply_queue_len;
let max_queue_len: usize = agent.config().perf.processing_queue_len;
let mut queue: VecDeque<(ChangeV1, ChangeSource, Instant)> = VecDeque::new();
let mut buf = vec![];
let mut buf_cost = 0;
Expand All @@ -746,6 +747,7 @@ pub async fn handle_changes(
const KEEP_SEEN_CACHE_SIZE: usize = 1000;
let mut seen: IndexMap<_, RangeInclusiveSet<CrsqlSeq>> = IndexMap::new();

let mut drop_log_count: u64 = 0;
// complicated loop to process changes efficiently w/ a max concurrency
// and a minimum chunk size for bigger and faster SQLite transactions
loop {
Expand Down Expand Up @@ -861,6 +863,26 @@ pub async fn handle_changes(
continue;
}

// drop items when the queue is full.
if queue.len() > max_queue_len {
drop_log_count += 1;
if is_pow_10(drop_log_count) {
if drop_log_count == 1 {
warn!("dropping a change because changes queue is full");
} else {
warn!(
"dropping {} changes because changes queue is full",
drop_log_count
);
}
}
// reset count
if drop_log_count == 100000000 {
drop_log_count = 0;
}
continue;
}

if let Some(mut seqs) = change.seqs().cloned() {
let v = *change.versions().start();
if let Some(seen_seqs) = seen.get(&(change.actor_id, v)) {
Expand Down Expand Up @@ -1142,3 +1164,11 @@ mod tests {
Ok(())
}
}

#[inline]
fn is_pow_10(i: u64) -> bool {
matches!(
i,
1 | 10 | 100 | 1000 | 10000 | 1000000 | 10000000 | 100000000
)
}
6 changes: 6 additions & 0 deletions crates/corro-types/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ const fn default_apply_queue() -> usize {
const fn default_wal_threshold() -> usize {
10
}
const fn default_processing_queue() -> usize {
100000
}

/// Used for the apply channel
const fn default_huge_channel() -> usize {
Expand Down Expand Up @@ -187,6 +190,8 @@ pub struct PerfConfig {
pub apply_queue_len: usize,
#[serde(default = "default_wal_threshold")]
pub wal_threshold_gb: usize,
#[serde(default = "default_processing_queue")]
pub processing_queue_len: usize,
}

impl Default for PerfConfig {
Expand All @@ -204,6 +209,7 @@ impl Default for PerfConfig {
apply_queue_timeout: default_apply_timeout(),
apply_queue_len: default_apply_queue(),
wal_threshold_gb: default_wal_threshold(),
processing_queue_len: default_processing_queue(),
}
}
}
Expand Down

0 comments on commit 589ce3c

Please sign in to comment.