Skip to content

Commit

Permalink
compute: fix advancement of replica-targeted subscribe frontiers
Browse files Browse the repository at this point in the history
This commit changes how the compute controller handles frontier updates
for replica-targeted subscribes. Previously, each replica could advance
the global frontier of a subscribe, targeted or not. With this commit,
only the targeted replica can cause a global frontier advancement.

This fixes the following error scenario:

 * A non-targeted replica advances to the empty frontier.
 * The compute controller downgrades the subscribe's global frontier to
   the empty frontier.
 * The compute controller sends an `AllowCompaction([])` command to all
   replicas.
 * The targeted replica receives the `AllowCompaction([])` and considers
   the subscribe dropped.
 * The compute controller never receives a subscribe response from the
   targeted replica.
  • Loading branch information
teskje committed Feb 7, 2024
1 parent 1024606 commit c0eb76d
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 23 deletions.
60 changes: 37 additions & 23 deletions src/compute-client/src/controller/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1199,6 +1199,25 @@ where
/// Panics if any of the `updates` references an absent collection.
/// Panics if any of the `updates` regresses an existing write frontier.
#[tracing::instrument(level = "debug", skip(self))]
fn update_write_frontiers(
&mut self,
replica_id: ReplicaId,
updates: &[(GlobalId, Antichain<T>)],
) {
// Apply advancements of replica frontiers.
self.update_replica_write_frontiers(replica_id, updates);

// Apply advancements of global collection frontiers.
self.maybe_update_global_write_frontiers(updates);
}

/// Apply replica write frontier updates.
///
/// # Panics
///
/// Panics if any of the `updates` references an absent collection.
/// Panics if any of the `updates` regresses an existing replica write frontier.
#[tracing::instrument(level = "debug", skip(self))]
fn update_replica_write_frontiers(
&mut self,
replica_id: ReplicaId,
Expand Down Expand Up @@ -1239,17 +1258,6 @@ where
self.storage_controller
.update_read_capabilities(&mut storage_read_capability_changes);
}

// Apply advancements of global collection frontiers to the controller state.
let global_updates: Vec<_> = updates
.iter()
.filter(|(id, new_upper)| {
let collection = self.compute.expect_collection(*id);
PartialOrder::less_than(&collection.write_frontier, new_upper)
})
.cloned()
.collect();
self.update_global_write_frontiers(&global_updates);
}

/// Remove frontier tracking state for the given replica.
Expand Down Expand Up @@ -1277,12 +1285,13 @@ where

/// Apply global write frontier updates.
///
/// Frontier regressions are gracefully ignored.
///
/// # Panics
///
/// Panics if any of the `updates` references an absent collection.
/// Panics if any of the `updates` regresses an existing write frontier.
#[tracing::instrument(level = "debug", skip(self))]
fn update_global_write_frontiers(&mut self, updates: &[(GlobalId, Antichain<T>)]) {
fn maybe_update_global_write_frontiers(&mut self, updates: &[(GlobalId, Antichain<T>)]) {
// Compute and apply read capability downgrades that result from collection frontier
// advancements.
let mut read_capability_changes = BTreeMap::new();
Expand All @@ -1292,11 +1301,9 @@ where
let old_upper = std::mem::replace(&mut collection.write_frontier, new_upper.clone());
let old_since = &collection.implied_capability;

// Safety check against frontier regressions.
assert!(
PartialOrder::less_equal(&old_upper, new_upper),
"global frontier regression: {old_upper:?} -> {new_upper:?}, collection={id}",
);
if !PartialOrder::less_than(&old_upper, new_upper) {
continue; // frontier has not advanced
}

let new_since = match &collection.read_policy {
Some(read_policy) => {
Expand Down Expand Up @@ -1495,7 +1502,7 @@ where

self.compute
.update_hydration_status(id, replica_id, &new_frontier);
self.update_replica_write_frontiers(replica_id, &[(id, new_frontier)]);
self.update_write_frontiers(replica_id, &[(id, new_frontier)]);
}

fn handle_peek_response(
Expand Down Expand Up @@ -1541,17 +1548,17 @@ where
return None;
}

// Always apply write frontier updates. Even if the subscribe is not tracked anymore, there
// might still be replicas reading from its inputs, so we need to track the frontiers until
// all replicas have advanced to the empty one.
// Always apply replica write frontier updates. Even if the subscribe is not tracked
// anymore, there might still be replicas reading from its inputs, so we need to track the
// frontiers until all replicas have advanced to the empty one.
let write_frontier = match &response {
SubscribeResponse::Batch(batch) => batch.upper.clone(),
SubscribeResponse::DroppedAt(_) => Antichain::new(),
};

self.compute
.update_hydration_status(subscribe_id, replica_id, &write_frontier);
self.update_replica_write_frontiers(replica_id, &[(subscribe_id, write_frontier)]);
self.update_replica_write_frontiers(replica_id, &[(subscribe_id, write_frontier.clone())]);

// If the subscribe is not tracked, or targets a different replica, there is nothing to do.
let mut subscribe = self.compute.subscribes.get(&subscribe_id)?.clone();
Expand All @@ -1560,6 +1567,13 @@ where
return None;
}

// Apply a global frontier update.
// If this is a replica-targeted subscribe, it is important that we advance the global
// frontier only based on responses from the targeted replica. Otherwise, another replica
// could advance to the empty frontier, making us drop the subscribe on the targeted
// replica prematurely.
self.maybe_update_global_write_frontiers(&[(subscribe_id, write_frontier)]);

match response {
SubscribeResponse::Batch(batch) => {
let upper = batch.upper;
Expand Down
33 changes: 33 additions & 0 deletions test/testdrive/replica-targeting.td
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,36 @@ $ set-regex match=\d{13} replacement=<TIMESTAMP>
> FETCH c
<TIMESTAMP> 1 4
> COMMIT

# Test that replica-targeted subscribes work when the subscribed collection
# advances to the empty frontier. Regression test for #24981.

> DROP CLUSTER test CASCADE
> CREATE CLUSTER test SIZE '4-4', REPLICATION FACTOR 4
> SET cluster_replica = r1

> CREATE MATERIALIZED VIEW mv AS SELECT 1

> BEGIN
> DECLARE c CURSOR FOR SUBSCRIBE mv
> FETCH c
<TIMESTAMP> 1 1
> COMMIT

# We want to provoke the case where a non-targeted replica returns a response
# first, so try multiple times to be sure.

> BEGIN
> DECLARE c CURSOR FOR SUBSCRIBE mv
> FETCH c
<TIMESTAMP> 1 1
> COMMIT

> BEGIN
> DECLARE c CURSOR FOR SUBSCRIBE mv
> FETCH c
<TIMESTAMP> 1 1
> COMMIT

# Cleanup
> DROP CLUSTER test CASCADE

0 comments on commit c0eb76d

Please sign in to comment.