diff --git a/crates/corro-types/src/pubsub.rs b/crates/corro-types/src/pubsub.rs index c64627ff..4327efe2 100644 --- a/crates/corro-types/src/pubsub.rs +++ b/crates/corro-types/src/pubsub.rs @@ -1117,8 +1117,9 @@ impl Matcher { } async fn cmd_loop(mut self, mut state_conn: CrConn, mut tripwire: Tripwire) { - const PROCESS_CHANGES_THRESHOLD: usize = 400; + const PROCESS_CHANGES_THRESHOLD: usize = 250; const PROCESSING_WARN_THRESHOLD: Duration = Duration::from_secs(5); + const PROCESS_BUFFER_DEADLINE: Duration = Duration::from_millis(600); info!(sub_id = %self.id, "Starting loop to run the subscription"); { @@ -1137,7 +1138,8 @@ impl Matcher { let mut purge_changes_interval = tokio::time::interval(Duration::from_secs(300)); // max duration of aggregating candidates - let mut process_changes_interval = tokio::time::interval(Duration::from_millis(600)); + let process_changes_deadline = tokio::time::sleep(PROCESS_BUFFER_DEADLINE); + tokio::pin!(process_changes_deadline); loop { enum Branch { @@ -1174,7 +1176,7 @@ impl Matcher { continue; } }, - _ = process_changes_interval.tick() => { + _ = process_changes_deadline.as_mut() => { if buf_count == 0 { continue; } @@ -1213,8 +1215,14 @@ impl Matcher { warn!(sub_id = %self.id, "processed {buf_count} changes (very slowly) for subscription in {elapsed:?}"); } buf_count = 0; + + // reset the deadline + process_changes_deadline + .as_mut() + .reset((Instant::now() + PROCESS_BUFFER_DEADLINE).into()); } Branch::PurgeOldChanges => { + let start = Instant::now(); let res = block_in_place(|| { let tx = self.conn.transaction()?; @@ -1229,7 +1237,7 @@ impl Matcher { match res { Ok(deleted) => { - info!(sub_id = %self.id, "Deleted {deleted} old changes row") + info!(sub_id = %self.id, "Deleted {deleted} old changes row in {:?}", start.elapsed()); } Err(e) => { error!(sub_id = %self.id, "could not delete old changes: {e}");