From 18c00dff7b930580ebd52daeae06d67235704f04 Mon Sep 17 00:00:00 2001 From: Preslav Le Date: Thu, 23 May 2024 17:37:18 -0700 Subject: [PATCH] Sync Worker: Always compute transition at latest timestamp (#26258) I am converting sync worker to work with Usher, and if I keep the code as is, we will make multiple calls to latest_timestamp() per transition. One to determine the target timestamp and one to check it. Making update_scheduled an Option is generally a good idea, but we never actually leveraged it. So it is easier to revert for now. I have sent a separate RFC on how to finish the swing, but this provides little benefit until we have sync workers running on the edge. GitOrigin-RevId: 8b40ebb2d5c83b50dd7ed8cac26f6da8b088cc29 --- .../src/node_action_callbacks.rs | 4 ++ crates/sync/src/worker.rs | 41 ++++++++++--------- 2 files changed, 25 insertions(+), 20 deletions(-) diff --git a/crates/local_backend/src/node_action_callbacks.rs b/crates/local_backend/src/node_action_callbacks.rs index 4485bea9..c9c8c706 100644 --- a/crates/local_backend/src/node_action_callbacks.rs +++ b/crates/local_backend/src/node_action_callbacks.rs @@ -607,6 +607,7 @@ mod tests { .uri("/api/actions/schedule_job") .method("POST") .header("Authorization", backend.admin_auth_header.0.encode()) + .header("Host", "localhost") .header("Content-Type", "application/json") .header("Convex-Action-Callback-Token", callback_token.clone()) .body(schedule_body.clone().into())?; @@ -624,6 +625,7 @@ mod tests { .uri("/api/query") .method("POST") .header("Authorization", backend.admin_auth_header.0.encode()) + .header("Host", "localhost") .header("Content-Type", "application/json") .body(body)?; let result: JsonValue = backend.expect_success(req).await?; @@ -653,6 +655,7 @@ mod tests { .uri("/api/actions/schedule_job") .method("POST") .header("Authorization", backend.admin_auth_header.0.encode()) + .header("Host", "localhost") .header("Content-Type", "application/json") 
.header("Convex-Action-Callback-Token", callback_token.clone()) .header("Convex-Parent-Scheduled-Job", system_job_id.clone()) @@ -687,6 +690,7 @@ mod tests { .uri("/api/query") .method("POST") .header("Authorization", backend.admin_auth_header.0.encode()) + .header("Host", "localhost") .header("Content-Type", "application/json") .body(body)?; let result: JsonValue = backend.expect_success(req).await?; diff --git a/crates/sync/src/worker.rs b/crates/sync/src/worker.rs index 1d2a7194..7708ec42 100644 --- a/crates/sync/src/worker.rs +++ b/crates/sync/src/worker.rs @@ -1,5 +1,4 @@ use std::{ - cmp, collections::BTreeMap, sync::{ atomic::{ @@ -190,7 +189,6 @@ impl SingleFlightReceiver { } const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(15); -const MAX_TRANSITION_AGE: Duration = Duration::from_secs(30); pub struct SyncWorker { api: Arc, @@ -214,8 +212,7 @@ pub struct SyncWorker { transition_future: Option>>>, // Has an update been scheduled for the future? - // If so, what is the minimum timestamp at which we should compute the transition. - update_scheduled: Option, + update_scheduled: bool, connect_timer: Option, } @@ -264,16 +261,13 @@ impl SyncWorker { mutation_sender, action_futures: FuturesUnordered::new(), transition_future: None, - update_scheduled: None, + update_scheduled: false, connect_timer: Some(connect_timer()), } } fn schedule_update(&mut self) { - self.update_scheduled = cmp::max( - self.update_scheduled, - Some(*self.application.now_ts_for_reads()), - ); + self.update_scheduled = true; } /// Run the sync protocol worker, returning `Ok(())` on clean exit and `Err` @@ -303,8 +297,7 @@ impl SyncWorker { // We need to provide a guarantee that we can't transition to a // timestamp past a pending mutation or otherwise optimistic updates // might be flaky. To do that, we need to behave differently if we - // have pending operation future or not. We should also make update_scheduled - // be a min target timestamp instead of a boolean. 
+ // have pending operation future or not. result = self.mutation_futures.next().fuse() => { let message = match result { Some(m) => m?, @@ -327,7 +320,7 @@ impl SyncWorker { }, _ = self.tx.message_consumed().fuse() => { // Wake up if any message is consumed from the send buffer - // in case we update_scheduled is True. + // in case update_scheduled is True. None } _ = ping_timeout => Some(ServerMessage::Ping {}), @@ -351,15 +344,20 @@ impl SyncWorker { } // Send update unless the send channel already contains enough transitions, // and unless we are already computing an update. - if let Some(mut target_ts) = self.update_scheduled + if self.update_scheduled && self.tx.transition_count() < *SYNC_MAX_SEND_TRANSITION_COUNT && self.transition_future.is_none() { - // If target_ts is too old, bump it to latest. - let now_ts = *self.application.now_ts_for_reads(); - if now_ts.sub(MAX_TRANSITION_AGE)? > target_ts { - target_ts = now_ts; - } + // Always transition to the latest timestamp. In the future, + // when we have Sync Worker running on the edge, we can remove this + // call by making self.update_scheduled to be a Option, + // and set it accordingly based on the operation that triggered the + // Transition. We would choose the latest timestamp available at + // the edge for the initial sync. 
+ let target_ts = *self + .api + .latest_timestamp(self.host.as_deref(), RequestId::new()) + .await?; let new_transition_future = self.begin_update_queries(target_ts)?; self.transition_future = Some( async move { @@ -373,7 +371,7 @@ impl SyncWorker { .boxed() .fuse(), ); - self.update_scheduled = None; + self.update_scheduled = false; } } Ok(()) @@ -397,7 +395,10 @@ impl SyncWorker { } self.state.set_session_id(session_id); if let Some(max_observed_timestamp) = max_observed_timestamp { - let latest_timestamp = *self.application.now_ts_for_reads(); + let latest_timestamp = *self + .api + .latest_timestamp(self.host.as_deref(), RequestId::new()) + .await?; if max_observed_timestamp > latest_timestamp { // Unless there is a bug, this means the client have communicated // with a backend that have database writes we are not aware of. If