Avoid FuturesUnordered in compute controller (#29725)
Avoid the use of FuturesUnordered in the compute controller for gathering responses from replicas. Instead, share a channel with each replica task.

Fixes MaterializeInc/database-issues#8587
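
For context, here is a minimal, hypothetical sketch of the pattern the controller moves to. It uses plain `tokio::sync::mpsc`, a stand-in `Response` type, and made-up replica IDs rather than the instrumented channel and `ComputeResponse<T>` from this commit: every replica task gets a clone of one shared sender, responses carry the replica ID and incarnation, and the controller loop awaits a single receiver and drops stale responses instead of rebuilding a `FuturesUnordered` over per-replica `recv` futures.

```rust
// Sketch only: plain tokio mpsc and a stand-in Response type, not the
// instrumented channel or ComputeResponse<T> used by the real controller.
use std::collections::BTreeMap;

use tokio::sync::mpsc;

type ReplicaId = u64;
type Incarnation = u64;

#[derive(Debug)]
struct Response(String); // stand-in for ComputeResponse<T>

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel::<(ReplicaId, Incarnation, Response)>();

    // The controller's view of which incarnation of each replica is current.
    let current: BTreeMap<ReplicaId, Incarnation> = BTreeMap::from([(1, 2), (2, 1)]);

    // One task per replica; each task owns a clone of the shared sender, so a
    // failed replica simply drops its sender without the controller having to
    // poll a FuturesUnordered over per-replica recv futures.
    for (&id, &incarnation) in &current {
        let tx = tx.clone();
        tokio::spawn(async move {
            let response = Response(format!("frontier update from replica {id}"));
            // Sending only fails if the controller is gone; ignore that here.
            let _ = tx.send((id, incarnation, response));
        });
    }
    // A stale response from an earlier incarnation of replica 1.
    let _ = tx.send((1, 1, Response("stale".into())));

    for _ in 0..3 {
        let (id, incarnation, response) = rx.recv().await.expect("controller holds a sender");
        if current.get(&id) != Some(&incarnation) {
            // Response from a removed or restarted replica: discard it.
            continue;
        }
        println!("replica {id} (incarnation {incarnation}): {response:?}");
    }
}
```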

### Checklist

- [ ] This PR has adequate test coverage / QA involvement has been duly considered. ([trigger-ci for additional test/nightly runs](https://trigger-ci.dev.materialize.com/))
- [ ] This PR has an associated up-to-date [design doc](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/README.md), is a design doc ([template](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/00000000_template.md)), or is sufficiently small to not require a design.
  <!-- Reference the design in the description. -->
- [ ] If this PR evolves [an existing `$T ⇔ Proto$T` mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md) (possibly in a backwards-incompatible way), then it is tagged with a `T-proto` label.
- [ ] If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label ([example](MaterializeInc/cloud#5021)).
  <!-- Ask in #team-cloud on Slack if you need help preparing the cloud PR. -->
- [ ] If this PR includes major [user-facing behavior changes](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/guide-changes.md#what-changes-require-a-release-note), I have pinged the relevant PM to schedule a changelog post.

---------

Signed-off-by: Moritz Hoffmann <[email protected]>
antiguru authored Sep 30, 2024
1 parent 2ba8920 commit 0bb2b32
Showing 8 changed files with 114 additions and 96 deletions.
3 changes: 2 additions & 1 deletion src/cluster-client/build.rs
@@ -11,7 +11,8 @@ fn main() {
let mut config = prost_build::Config::new();
config
.protoc_executable(mz_build_tools::protoc())
.btree_map(["."]);
.btree_map(["."])
.type_attribute(".", "#[allow(missing_docs)]");

tonic_build::configure()
// Enabling `emit_rerun_if_changed` will rerun the build script when
8 changes: 6 additions & 2 deletions src/cluster-client/src/client.rs
@@ -7,8 +7,6 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

#![allow(missing_docs)]

//! Types for commands to clusters.

use std::num::NonZeroI64;
@@ -32,7 +30,9 @@ include!(concat!(env!("OUT_DIR"), "/mz_cluster_client.client.rs"));
/// another in-memory and local to the current incarnation of environmentd)
#[derive(PartialEq, Eq, Debug, Copy, Clone, Serialize, Deserialize)]
pub struct ClusterStartupEpoch {
/// The environment incarnation.
envd: NonZeroI64,
/// The replica incarnation.
replica: u64,
}

@@ -76,6 +76,7 @@ impl Arbitrary for ClusterStartupEpoch {
}

impl ClusterStartupEpoch {
/// Construct a new cluster startup epoch, from the environment epoch and replica incarnation.
pub fn new(envd: NonZeroI64, replica: u64) -> Self {
Self { envd, replica }
}
@@ -100,10 +101,12 @@ impl ClusterStartupEpoch {
}
}

/// The environment epoch.
pub fn envd(&self) -> NonZeroI64 {
self.envd
}

/// The replica incarnation.
pub fn replica(&self) -> u64 {
self.replica
}
@@ -174,6 +177,7 @@ impl RustType<ProtoTimelyConfig> for TimelyConfig {
}

impl TimelyConfig {
/// Split the timely configuration into `parts` pieces, each with a different `process` number.
pub fn split_command(&self, parts: usize) -> Vec<Self> {
(0..parts)
.map(|part| TimelyConfig {
110 changes: 58 additions & 52 deletions src/compute-client/src/controller/instance.rs
@@ -15,8 +15,6 @@ use std::num::NonZeroI64;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use futures::stream::FuturesUnordered;
use futures::{future, StreamExt};
use mz_build_info::BuildInfo;
use mz_cluster_client::client::{ClusterStartupEpoch, TimelyConfig};
use mz_cluster_client::WallclockLagFn;
@@ -32,6 +30,7 @@ use mz_controller_types::dyncfgs::WALLCLOCK_LAG_REFRESH_INTERVAL;
use mz_dyncfg::ConfigSet;
use mz_expr::RowSetFinishing;
use mz_ore::cast::CastFrom;
use mz_ore::channel::instrumented_unbounded_channel;
use mz_ore::now::NowFn;
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{soft_assert_or_log, soft_panic_or_log};
@@ -58,6 +57,7 @@ use crate::controller::{
StorageCollections,
};
use crate::logging::LogVariant;
use crate::metrics::IntCounter;
use crate::metrics::{InstanceMetrics, ReplicaCollectionMetrics, ReplicaMetrics, UIntGauge};
use crate::protocol::command::{
ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
@@ -187,6 +187,10 @@
}
}

/// A response from a replica, composed of a replica ID, the replica's current epoch, and the
/// compute response itself.
pub(super) type ReplicaResponse<T> = (ReplicaId, u64, ComputeResponse<T>);

/// The state we keep for a compute instance.
pub(super) struct Instance<T: ComputeControllerTimestamp> {
/// Build info for spawning replicas
@@ -277,6 +281,10 @@ pub(super) struct Instance<T: ComputeControllerTimestamp> {
///
/// Received updates are applied by [`Instance::apply_read_hold_changes`].
read_holds_rx: mpsc::UnboundedReceiver<(GlobalId, ChangeBatch<T>)>,
/// A sender for responses from replicas.
replica_tx: mz_ore::channel::InstrumentedUnboundedSender<ReplicaResponse<T>, IntCounter>,
/// A receiver for responses from replicas.
replica_rx: mz_ore::channel::InstrumentedUnboundedReceiver<ReplicaResponse<T>, IntCounter>,
}

impl<T: ComputeControllerTimestamp> Instance<T> {
@@ -389,12 +397,19 @@ impl<T: ComputeControllerTimestamp> Instance<T> {
id: ReplicaId,
client: ReplicaClient<T>,
config: ReplicaConfig,
epoch: ClusterStartupEpoch,
) {
let log_ids: BTreeSet<_> = config.logging.index_logs.values().copied().collect();

let metrics = self.metrics.for_replica(id);
let mut replica =
ReplicaState::new(id, client, config, metrics, self.introspection_tx.clone());
let mut replica = ReplicaState::new(
id,
client,
config,
metrics,
self.introspection_tx.clone(),
epoch,
);

// Add per-replica collection state.
for (collection_id, collection) in &self.collections {
@@ -750,6 +765,8 @@ impl<T: ComputeControllerTimestamp> Instance<T> {
wallclock_lag_last_refresh,
read_holds_tx: _,
read_holds_rx: _,
replica_tx: _,
replica_rx: _,
} = self;

fn field(
@@ -834,6 +851,10 @@

let history = ComputeCommandHistory::new(metrics.for_history());

let send_count = metrics.response_send_count.clone();
let recv_count = metrics.response_recv_count.clone();
let (replica_tx, replica_rx) = instrumented_unbounded_channel(send_count, recv_count);

Self {
build_info,
storage_collections: storage,
Expand All @@ -858,6 +879,8 @@ where
wallclock_lag_last_refresh: Instant::now(),
read_holds_tx,
read_holds_rx,
replica_tx,
replica_rx,
}
}

@@ -878,8 +901,9 @@
Some(cmd) => cmd(&mut self),
None => break,
},
(replica_id, response) = Self::recv(&mut self.replicas) => {
self.handle_response(response, replica_id);
response = self.replica_rx.recv() => match response {
Some(response) => self.handle_response(response),
None => unreachable!("self owns a sender side of the channel"),
}
}
}
Expand Down Expand Up @@ -956,43 +980,8 @@ where

// Clone the command for each active replica.
for replica in self.replicas.values_mut() {
// If sending the command fails, the replica requires rehydration.
if replica.client.send(cmd.clone()).is_err() {
replica.failed = true;
}
}
}

/// Receives the next response from the given replicas.
///
/// This method is cancellation safe.
async fn recv(
replicas: &mut BTreeMap<ReplicaId, ReplicaState<T>>,
) -> (ReplicaId, ComputeResponse<T>) {
loop {
let live_replicas = replicas.iter_mut().filter(|(_, r)| !r.failed);
let response = live_replicas
.map(|(id, replica)| async { (*id, replica.client.recv().await) })
.collect::<FuturesUnordered<_>>()
.next()
.await;

match response {
None => {
// There are no live replicas left.
// Block forever to communicate that no response is ready.
future::pending().await
}
Some((replica_id, None)) => {
// A replica has failed and requires rehydration.
let replica = replicas.get_mut(&replica_id).unwrap();
replica.failed = true;
}
Some((replica_id, Some(response))) => {
// A replica has produced a response. Return it.
return (replica_id, response);
}
}
// Swallow error, we'll notice because the replica task has stopped.
let _ = replica.client.send(cmd.clone());
}
}

@@ -1012,13 +1001,15 @@
let replica_epoch = self.replica_epochs.entry(id).or_default();
*replica_epoch += 1;
let metrics = self.metrics.for_replica(id);
let epoch = ClusterStartupEpoch::new(self.envd_epoch, *replica_epoch);
let client = ReplicaClient::spawn(
id,
self.build_info,
config.clone(),
ClusterStartupEpoch::new(self.envd_epoch, *replica_epoch),
epoch,
metrics.clone(),
Arc::clone(&self.dyncfg),
self.replica_tx.clone(),
);

// Take this opportunity to clean up the history we should present.
Expand All @@ -1035,7 +1026,7 @@ where
}

// Add replica to tracked state.
self.add_replica_state(id, client, config);
self.add_replica_state(id, client, config, epoch);

Ok(())
}
@@ -1104,7 +1095,7 @@
fn rehydrate_failed_replicas(&mut self) {
let replicas = self.replicas.iter();
let failed_replicas: Vec<_> = replicas
.filter_map(|(id, replica)| replica.failed.then_some(*id))
.filter_map(|(id, replica)| replica.client.is_failed().then_some(*id))
.collect();

for replica_id in failed_replicas {
@@ -1692,7 +1683,21 @@
drop(peek);
}

fn handle_response(&mut self, response: ComputeResponse<T>, replica_id: ReplicaId) {
/// Handles a response from a replica. Replica IDs are re-used across replica restarts, so we
/// use the replica incarnation to drop stale responses.
fn handle_response(&mut self, (replica_id, incarnation, response): ReplicaResponse<T>) {
// Filter responses from non-existing or stale replicas.
if self
.replicas
.get(&replica_id)
.filter(|replica| replica.epoch.replica() == incarnation)
.is_none()
{
return;
}

// Invariant: the replica exists and has the expected incarnation.

match response {
ComputeResponse::Frontiers(id, frontiers) => {
self.handle_frontiers_response(id, frontiers, replica_id);
@@ -2547,8 +2552,8 @@ struct ReplicaState<T: ComputeControllerTimestamp> {
introspection_tx: crossbeam_channel::Sender<IntrospectionUpdates>,
/// Per-replica collection state.
collections: BTreeMap<GlobalId, ReplicaCollectionState<T>>,
/// Whether the replica has failed and requires rehydration.
failed: bool,
/// The epoch of the replica.
epoch: ClusterStartupEpoch,
}

impl<T: ComputeControllerTimestamp> ReplicaState<T> {
Expand All @@ -2558,15 +2563,16 @@ impl<T: ComputeControllerTimestamp> ReplicaState<T> {
config: ReplicaConfig,
metrics: ReplicaMetrics,
introspection_tx: crossbeam_channel::Sender<IntrospectionUpdates>,
epoch: ClusterStartupEpoch,
) -> Self {
Self {
id,
client,
config,
metrics,
introspection_tx,
epoch,
collections: Default::default(),
failed: false,
}
}

@@ -2643,8 +2649,8 @@ impl<T: ComputeControllerTimestamp> ReplicaState<T> {
config: _,
metrics: _,
introspection_tx: _,
epoch,
collections,
failed,
} = self;

fn field(
Expand All @@ -2663,7 +2669,7 @@ impl<T: ComputeControllerTimestamp> ReplicaState<T> {
let map = serde_json::Map::from_iter([
field("id", id.to_string())?,
field("collections", collections)?,
field("failed", failed)?,
field("epoch", epoch)?,
]);
Ok(serde_json::Value::Object(map))
}
