From a3276f932b1fcf7c936b1eedcbf5ec003e73874a Mon Sep 17 00:00:00 2001 From: Pavel Borzenkov Date: Tue, 23 Jul 2024 17:29:07 +0200 Subject: [PATCH] pubsub: fix catch up logic Make sure to update minimum expected change ID when retrying catch up so that it can actually proceed if the first attempt fails. --- crates/corro-agent/src/api/public/pubsub.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/crates/corro-agent/src/api/public/pubsub.rs b/crates/corro-agent/src/api/public/pubsub.rs index 73b7f6b2..1f7775c4 100644 --- a/crates/corro-agent/src/api/public/pubsub.rs +++ b/crates/corro-agent/src/api/public/pubsub.rs @@ -485,7 +485,7 @@ pub async fn catch_up_sub( } }; - let min_change_id = last_change_id + 1; + let mut min_change_id = last_change_id + 1; info!(sub_id = %matcher.id(), "minimum expected change id: {min_change_id:?}"); let mut pending_event = None; @@ -522,6 +522,8 @@ pub async fn catch_up_sub( if let Some(change_id) = last_sub_change_id { debug!(sub_id = %matcher.id(), "got a change to check: {change_id:?}"); for i in 0..5 { + min_change_id = last_change_id + 1; + if change_id > min_change_id { // missed some updates! info!(sub_id = %matcher.id(), "attempt #{} to catch up subcription from change id: {change_id:?} (last: {last_change_id:?})", i+1); @@ -569,6 +571,8 @@ pub async fn catch_up_sub( warn!(sub_id = %matcher.id(), "could not send buffered events to subscriber, receiver must be gone!"); return; } + + last_change_id = change_id; } } @@ -586,6 +590,8 @@ pub async fn catch_up_sub( warn!(sub_id = %matcher.id(), "could not send buffered events to subscriber, receiver must be gone!"); return; } + + last_change_id = change_id; } }