Skip to content

Commit

Permalink
Merge pull request #242 from superfly/fix-catchup
Browse files Browse the repository at this point in the history
pubsub: fix catch up logic
  • Loading branch information
pborzenkov authored Jul 24, 2024
2 parents 5f338b7 + a3276f9 commit 6b04f27
Showing 1 changed file with 7 additions and 1 deletion.
8 changes: 7 additions & 1 deletion crates/corro-agent/src/api/public/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ pub async fn catch_up_sub(
}
};

let min_change_id = last_change_id + 1;
let mut min_change_id = last_change_id + 1;
info!(sub_id = %matcher.id(), "minimum expected change id: {min_change_id:?}");

let mut pending_event = None;
Expand Down Expand Up @@ -522,6 +522,8 @@ pub async fn catch_up_sub(
if let Some(change_id) = last_sub_change_id {
debug!(sub_id = %matcher.id(), "got a change to check: {change_id:?}");
for i in 0..5 {
min_change_id = last_change_id + 1;

if change_id > min_change_id {
// missed some updates!
info!(sub_id = %matcher.id(), "attempt #{} to catch up subcription from change id: {change_id:?} (last: {last_change_id:?})", i+1);
Expand Down Expand Up @@ -569,6 +571,8 @@ pub async fn catch_up_sub(
warn!(sub_id = %matcher.id(), "could not send buffered events to subscriber, receiver must be gone!");
return;
}

last_change_id = change_id;
}
}

Expand All @@ -586,6 +590,8 @@ pub async fn catch_up_sub(
warn!(sub_id = %matcher.id(), "could not send buffered events to subscriber, receiver must be gone!");
return;
}

last_change_id = change_id;
}
}

Expand Down

0 comments on commit 6b04f27

Please sign in to comment.