Skip to content

Commit

Permalink
lower threshold ot process changes buffer. use a sleep as a deadline …
Browse files Browse the repository at this point in the history
…instead of an interval to process buffer
  • Loading branch information
jeromegn committed Aug 29, 2024
1 parent b107ecf commit 2168f4a
Showing 1 changed file with 12 additions and 4 deletions.
16 changes: 12 additions & 4 deletions crates/corro-types/src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1117,8 +1117,9 @@ impl Matcher {
}

async fn cmd_loop(mut self, mut state_conn: CrConn, mut tripwire: Tripwire) {
const PROCESS_CHANGES_THRESHOLD: usize = 400;
const PROCESS_CHANGES_THRESHOLD: usize = 250;
const PROCESSING_WARN_THRESHOLD: Duration = Duration::from_secs(5);
const PROCESS_BUFFER_DEADLINE: Duration = Duration::from_millis(600);

info!(sub_id = %self.id, "Starting loop to run the subscription");
{
Expand All @@ -1137,7 +1138,8 @@ impl Matcher {
let mut purge_changes_interval = tokio::time::interval(Duration::from_secs(300));

// max duration of aggregating candidates
let mut process_changes_interval = tokio::time::interval(Duration::from_millis(600));
let process_changes_deadline = tokio::time::sleep(PROCESS_BUFFER_DEADLINE);
tokio::pin!(process_changes_deadline);

loop {
enum Branch {
Expand Down Expand Up @@ -1174,7 +1176,7 @@ impl Matcher {
continue;
}
},
_ = process_changes_interval.tick() => {
_ = process_changes_deadline.as_mut() => {
if buf_count == 0 {
continue;
}
Expand Down Expand Up @@ -1213,8 +1215,14 @@ impl Matcher {
warn!(sub_id = %self.id, "processed {buf_count} changes (very slowly) for subscription in {elapsed:?}");
}
buf_count = 0;

// reset the deadline
process_changes_deadline
.as_mut()
.reset((Instant::now() + PROCESS_BUFFER_DEADLINE).into());
}
Branch::PurgeOldChanges => {
let start = Instant::now();
let res = block_in_place(|| {
let tx = self.conn.transaction()?;

Expand All @@ -1229,7 +1237,7 @@ impl Matcher {

match res {
Ok(deleted) => {
info!(sub_id = %self.id, "Deleted {deleted} old changes row")
info!(sub_id = %self.id, "Deleted {deleted} old changes row in {:?}", start.elapsed());
}
Err(e) => {
error!(sub_id = %self.id, "could not delete old changes: {e}");
Expand Down

0 comments on commit 2168f4a

Please sign in to comment.