From 0bb2b32071aab2ba9921db08bfd0920d8c732052 Mon Sep 17 00:00:00 2001
From: Moritz Hoffmann
Date: Mon, 30 Sep 2024 22:07:41 +0200
Subject: [PATCH] Avoid FuturesUnordered in compute controller (#29725)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Avoid the use of FuturesUnordered in the compute controller for gathering
responses from replicas. Instead, share a channel with each replica task.

Fixes MaterializeInc/database-issues#8587
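
The new shape, as a minimal self-contained sketch (plain tokio primitives;
`run_replica`, the `String` payload, and `main` are illustrative stand-ins
rather than the controller's actual API; only the shared cloned sender, the
`(id, incarnation, response)` tuple, and the stale-incarnation filter mirror
this patch):

```rust
use std::collections::BTreeMap;

use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};

type ReplicaId = u64;
type Incarnation = u64;
/// What each replica task sends: its ID, its incarnation, and a payload.
type ReplicaResponse = (ReplicaId, Incarnation, String);

/// Each replica task owns a clone of one shared sender and tags its responses.
async fn run_replica(
    id: ReplicaId,
    incarnation: Incarnation,
    tx: UnboundedSender<ReplicaResponse>,
) {
    for i in 0..3 {
        // If the controller went away, sending fails and the task just stops.
        if tx.send((id, incarnation, format!("response {i}"))).is_err() {
            return;
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = unbounded_channel();

    // The controller remembers the current incarnation of each replica.
    let mut current: BTreeMap<ReplicaId, Incarnation> = BTreeMap::new();
    current.insert(1, 7);

    // A stale incarnation (6) and the current one (7) share the same sender.
    tokio::spawn(run_replica(1, 6, tx.clone()));
    tokio::spawn(run_replica(1, 7, tx.clone()));
    // The real controller keeps one sender alive, so its `recv` never returns
    // `None`; here we drop ours so the loop below terminates.
    drop(tx);

    // One `recv` loop replaces the `FuturesUnordered` over per-replica receivers.
    while let Some((id, incarnation, response)) = rx.recv().await {
        // Drop responses from unknown or stale incarnations.
        if current.get(&id) != Some(&incarnation) {
            continue;
        }
        println!("replica {id} (incarnation {incarnation}): {response}");
    }
}
```

With one shared channel there is no per-replica receiver to poll, so a failed
replica no longer needs a `failed` flag maintained by the receive loop: its
task simply stops sending, and the controller checks the task handle
(`is_failed`, backed by `is_finished`) instead.
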
### Checklist

- [ ] This PR has adequate test coverage / QA involvement has been duly considered. ([trigger-ci for additional test/nightly runs](https://trigger-ci.dev.materialize.com/))
- [ ] This PR has an associated up-to-date [design doc](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/README.md), is a design doc ([template](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/00000000_template.md)), or is sufficiently small to not require a design.
- [ ] If this PR evolves [an existing `$T ⇔ Proto$T` mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md) (possibly in a backwards-incompatible way), then it is tagged with a `T-proto` label.
- [ ] If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label ([example](https://github.com/MaterializeInc/cloud/pull/5021)).
- [ ] If this PR includes major [user-facing behavior changes](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/guide-changes.md#what-changes-require-a-release-note), I have pinged the relevant PM to schedule a changelog post.

---------

Signed-off-by: Moritz Hoffmann
---
 src/cluster-client/build.rs                   |   3 +-
 src/cluster-client/src/client.rs              |   8 +-
 src/compute-client/src/controller/instance.rs | 110 +++++++++---------
 src/compute-client/src/controller/replica.rs  |  41 +++----
 src/compute-client/src/metrics.rs             |  37 +++---
 src/ore/src/channel.rs                        |   2 +-
 src/ore/src/metrics/delete_on_drop.rs         |   2 +-
 test/cluster/mzcompose.py                     |   7 +-
 8 files changed, 114 insertions(+), 96 deletions(-)

diff --git a/src/cluster-client/build.rs b/src/cluster-client/build.rs
index 960d95418466..bf682b82c5fe 100644
--- a/src/cluster-client/build.rs
+++ b/src/cluster-client/build.rs
@@ -11,7 +11,8 @@ fn main() {
     let mut config = prost_build::Config::new();
     config
         .protoc_executable(mz_build_tools::protoc())
-        .btree_map(["."]);
+        .btree_map(["."])
+        .type_attribute(".", "#[allow(missing_docs)]");
 
     tonic_build::configure()
         // Enabling `emit_rerun_if_changed` will rerun the build script when
diff --git a/src/cluster-client/src/client.rs b/src/cluster-client/src/client.rs
index 1e4318ef1662..23ef25ab5d9b 100644
--- a/src/cluster-client/src/client.rs
+++ b/src/cluster-client/src/client.rs
@@ -7,8 +7,6 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
-#![allow(missing_docs)]
-
 //! Types for commands to clusters.
 
 use std::num::NonZeroI64;
@@ -32,7 +30,9 @@ include!(concat!(env!("OUT_DIR"), "/mz_cluster_client.client.rs"));
 /// another in-memory and local to the current incarnation of environmentd)
 #[derive(PartialEq, Eq, Debug, Copy, Clone, Serialize, Deserialize)]
 pub struct ClusterStartupEpoch {
+    /// The environment incarnation.
     envd: NonZeroI64,
+    /// The replica incarnation.
     replica: u64,
 }
 
@@ -76,6 +76,7 @@ impl Arbitrary for ClusterStartupEpoch {
 }
 
 impl ClusterStartupEpoch {
+    /// Construct a new cluster startup epoch from the environment epoch and replica incarnation.
     pub fn new(envd: NonZeroI64, replica: u64) -> Self {
         Self { envd, replica }
     }
@@ -100,10 +101,12 @@ impl ClusterStartupEpoch {
         }
     }
 
+    /// The environment epoch.
     pub fn envd(&self) -> NonZeroI64 {
         self.envd
     }
 
+    /// The replica incarnation.
     pub fn replica(&self) -> u64 {
         self.replica
     }
@@ -174,6 +177,7 @@ impl RustType<ProtoTimelyConfig> for TimelyConfig {
 }
 
 impl TimelyConfig {
+    /// Split the timely configuration into `parts` pieces, each with a different `process` number.
    pub fn split_command(&self, parts: usize) -> Vec<Self> {
         (0..parts)
             .map(|part| TimelyConfig {
diff --git a/src/compute-client/src/controller/instance.rs b/src/compute-client/src/controller/instance.rs
index 76fb46d7956b..912cb5346909 100644
--- a/src/compute-client/src/controller/instance.rs
+++ b/src/compute-client/src/controller/instance.rs
@@ -15,8 +15,6 @@ use std::num::NonZeroI64;
 use std::sync::{Arc, Mutex};
 use std::time::{Duration, Instant};
 
-use futures::stream::FuturesUnordered;
-use futures::{future, StreamExt};
 use mz_build_info::BuildInfo;
 use mz_cluster_client::client::{ClusterStartupEpoch, TimelyConfig};
 use mz_cluster_client::WallclockLagFn;
@@ -32,6 +30,7 @@ use mz_controller_types::dyncfgs::WALLCLOCK_LAG_REFRESH_INTERVAL;
 use mz_dyncfg::ConfigSet;
 use mz_expr::RowSetFinishing;
 use mz_ore::cast::CastFrom;
+use mz_ore::channel::instrumented_unbounded_channel;
 use mz_ore::now::NowFn;
 use mz_ore::tracing::OpenTelemetryContext;
 use mz_ore::{soft_assert_or_log, soft_panic_or_log};
@@ -58,6 +57,7 @@ use crate::controller::{
     StorageCollections,
 };
 use crate::logging::LogVariant;
+use crate::metrics::IntCounter;
 use crate::metrics::{InstanceMetrics, ReplicaCollectionMetrics, ReplicaMetrics, UIntGauge};
 use crate::protocol::command::{
     ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
@@ -187,6 +187,10 @@ where
     }
 }
 
+/// A response from a replica, composed of a replica ID, the replica's incarnation, and the
+/// compute response itself.
+pub(super) type ReplicaResponse<T> = (ReplicaId, u64, ComputeResponse<T>);
+
 /// The state we keep for a compute instance.
 pub(super) struct Instance<T: ComputeControllerTimestamp> {
     /// Build info for spawning replicas
@@ -277,6 +281,10 @@ pub(super) struct Instance<T: ComputeControllerTimestamp> {
     ///
     /// Received updates are applied by [`Instance::apply_read_hold_changes`].
     read_holds_rx: mpsc::UnboundedReceiver<(GlobalId, ChangeBatch<T>)>,
+    /// A sender for responses from replicas.
+    replica_tx: mz_ore::channel::InstrumentedUnboundedSender<ReplicaResponse<T>, IntCounter>,
+    /// A receiver for responses from replicas.
+    replica_rx: mz_ore::channel::InstrumentedUnboundedReceiver<ReplicaResponse<T>, IntCounter>,
 }
 
 impl<T: ComputeControllerTimestamp> Instance<T> {
@@ -389,12 +397,19 @@ impl<T: ComputeControllerTimestamp> Instance<T> {
         id: ReplicaId,
         client: ReplicaClient<T>,
         config: ReplicaConfig,
+        epoch: ClusterStartupEpoch,
     ) {
         let log_ids: BTreeSet<_> = config.logging.index_logs.values().copied().collect();
 
         let metrics = self.metrics.for_replica(id);
-        let mut replica = ReplicaState::new(id, client, config, metrics, self.introspection_tx.clone());
+        let mut replica = ReplicaState::new(
+            id,
+            client,
+            config,
+            metrics,
+            self.introspection_tx.clone(),
+            epoch,
+        );
 
         // Add per-replica collection state.
         for (collection_id, collection) in &self.collections {
@@ -750,6 +765,8 @@ impl<T: ComputeControllerTimestamp> Instance<T> {
             wallclock_lag_last_refresh,
             read_holds_tx: _,
             read_holds_rx: _,
+            replica_tx: _,
+            replica_rx: _,
         } = self;
 
         fn field(
@@ -834,6 +851,10 @@ where
 
         let history = ComputeCommandHistory::new(metrics.for_history());
 
+        let send_count = metrics.response_send_count.clone();
+        let recv_count = metrics.response_recv_count.clone();
+        let (replica_tx, replica_rx) = instrumented_unbounded_channel(send_count, recv_count);
+
         Self {
             build_info,
             storage_collections: storage,
@@ -858,6 +879,8 @@ where
             wallclock_lag_last_refresh: Instant::now(),
             read_holds_tx,
             read_holds_rx,
+            replica_tx,
+            replica_rx,
         }
     }
 
@@ -878,8 +901,9 @@ where
                 Some(cmd) => cmd(&mut self),
                 None => break,
             },
-            (replica_id, response) = Self::recv(&mut self.replicas) => {
-                self.handle_response(response, replica_id);
+            response = self.replica_rx.recv() => match response {
+                Some(response) => self.handle_response(response),
+                None => unreachable!("self owns a sender side of the channel"),
             }
         }
     }
@@ -956,43 +980,8 @@ where
         // Clone the command for each active replica.
         for replica in self.replicas.values_mut() {
-            // If sending the command fails, the replica requires rehydration.
-            if replica.client.send(cmd.clone()).is_err() {
-                replica.failed = true;
-            }
-        }
-    }
-
-    /// Receives the next response from the given replicas.
-    ///
-    /// This method is cancellation safe.
-    async fn recv(
-        replicas: &mut BTreeMap<ReplicaId, ReplicaState<T>>,
-    ) -> (ReplicaId, ComputeResponse<T>) {
-        loop {
-            let live_replicas = replicas.iter_mut().filter(|(_, r)| !r.failed);
-            let response = live_replicas
-                .map(|(id, replica)| async { (*id, replica.client.recv().await) })
-                .collect::<FuturesUnordered<_>>()
-                .next()
-                .await;
-
-            match response {
-                None => {
-                    // There are no live replicas left.
-                    // Block forever to communicate that no response is ready.
-                    future::pending().await
-                }
-                Some((replica_id, None)) => {
-                    // A replica has failed and requires rehydration.
-                    let replica = replicas.get_mut(&replica_id).unwrap();
-                    replica.failed = true;
-                }
-                Some((replica_id, Some(response))) => {
-                    // A replica has produced a response. Return it.
-                    return (replica_id, response);
-                }
-            }
+            // Swallow the error; we'll notice the failure because the replica task has stopped.
+            let _ = replica.client.send(cmd.clone());
         }
     }
 
@@ -1012,13 +1001,15 @@ where
         let replica_epoch = self.replica_epochs.entry(id).or_default();
         *replica_epoch += 1;
         let metrics = self.metrics.for_replica(id);
+        let epoch = ClusterStartupEpoch::new(self.envd_epoch, *replica_epoch);
         let client = ReplicaClient::spawn(
             id,
             self.build_info,
             config.clone(),
-            ClusterStartupEpoch::new(self.envd_epoch, *replica_epoch),
+            epoch,
             metrics.clone(),
             Arc::clone(&self.dyncfg),
+            self.replica_tx.clone(),
         );
 
         // Take this opportunity to clean up the history we should present.
@@ -1035,7 +1026,7 @@ where
         }
 
         // Add replica to tracked state.
-        self.add_replica_state(id, client, config);
+        self.add_replica_state(id, client, config, epoch);
 
         Ok(())
     }
@@ -1104,7 +1095,7 @@ where
     fn rehydrate_failed_replicas(&mut self) {
         let replicas = self.replicas.iter();
         let failed_replicas: Vec<_> = replicas
-            .filter_map(|(id, replica)| replica.failed.then_some(*id))
+            .filter_map(|(id, replica)| replica.client.is_failed().then_some(*id))
             .collect();
 
         for replica_id in failed_replicas {
@@ -1692,7 +1683,21 @@ where
         drop(peek);
     }
 
-    fn handle_response(&mut self, response: ComputeResponse<T>, replica_id: ReplicaId) {
+    /// Handles a response from a replica. Replica IDs are re-used across replica restarts, so we
+    /// use the replica incarnation to drop stale responses.
+    fn handle_response(&mut self, (replica_id, incarnation, response): ReplicaResponse<T>) {
+        // Filter out responses from nonexistent or stale replicas.
+        if self
+            .replicas
+            .get(&replica_id)
+            .filter(|replica| replica.epoch.replica() == incarnation)
+            .is_none()
+        {
+            return;
+        }
+
+        // Invariant: the replica exists and has the expected incarnation.
+
         match response {
             ComputeResponse::Frontiers(id, frontiers) => {
                 self.handle_frontiers_response(id, frontiers, replica_id);
@@ -2547,8 +2552,8 @@ struct ReplicaState<T> {
     introspection_tx: crossbeam_channel::Sender<IntrospectionUpdates>,
     /// Per-replica collection state.
     collections: BTreeMap<GlobalId, ReplicaCollectionState<T>>,
-    /// Whether the replica has failed and requires rehydration.
-    failed: bool,
+    /// The epoch of the replica.
+    epoch: ClusterStartupEpoch,
 }
 
 impl<T: ComputeControllerTimestamp> ReplicaState<T> {
@@ -2558,6 +2563,7 @@ impl<T: ComputeControllerTimestamp> ReplicaState<T> {
         config: ReplicaConfig,
         metrics: ReplicaMetrics,
         introspection_tx: crossbeam_channel::Sender<IntrospectionUpdates>,
+        epoch: ClusterStartupEpoch,
     ) -> Self {
         Self {
             id,
@@ -2565,8 +2571,8 @@ impl<T: ComputeControllerTimestamp> ReplicaState<T> {
             config,
             metrics,
             introspection_tx,
+            epoch,
             collections: Default::default(),
-            failed: false,
         }
     }
 
@@ -2643,8 +2649,8 @@ impl<T: ComputeControllerTimestamp> ReplicaState<T> {
             config: _,
             metrics: _,
             introspection_tx: _,
+            epoch,
             collections,
-            failed,
         } = self;
 
         fn field(
@@ -2663,7 +2669,7 @@ impl<T: ComputeControllerTimestamp> ReplicaState<T> {
         let map = serde_json::Map::from_iter([
             field("id", id.to_string())?,
             field("collections", collections)?,
-            field("failed", failed)?,
+            field("epoch", epoch)?,
         ]);
         Ok(serde_json::Value::Object(map))
     }
diff --git a/src/compute-client/src/controller/replica.rs b/src/compute-client/src/controller/replica.rs
index 90558a9ecfdd..d3e1494f92bd 100644
--- a/src/compute-client/src/controller/replica.rs
+++ b/src/compute-client/src/controller/replica.rs
@@ -16,6 +16,7 @@ use anyhow::bail;
 use mz_build_info::BuildInfo;
 use mz_cluster_client::client::{ClusterReplicaLocation, ClusterStartupEpoch, TimelyConfig};
 use mz_dyncfg::ConfigSet;
+use mz_ore::channel::InstrumentedUnboundedSender;
 use mz_ore::retry::Retry;
 use mz_ore::task::AbortOnDropHandle;
 use mz_service::client::GenericClient;
@@ -25,9 +26,11 @@ use tokio::sync::mpsc::error::SendError;
 use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
 use tracing::{debug, info, trace, warn};
 
+use crate::controller::instance::ReplicaResponse;
 use crate::controller::sequential_hydration::SequentialHydration;
 use crate::controller::{ComputeControllerTimestamp, ReplicaId};
 use crate::logging::LoggingConfig;
+use crate::metrics::IntCounter;
 use crate::metrics::ReplicaMetrics;
 use crate::protocol::command::{ComputeCommand, InstanceConfig};
 use crate::protocol::response::ComputeResponse;
@@ -48,17 +51,11 @@ pub(super) struct ReplicaConfig {
 #[derive(Debug)]
 pub(super) struct ReplicaClient<T> {
     /// A sender for commands for the replica.
-    ///
-    /// If sending to this channel fails, the replica has failed and requires
-    /// rehydration.
     command_tx: UnboundedSender<ComputeCommand<T>>,
-    /// A receiver for responses from the replica.
-    ///
-    /// If receiving from the channel returns `None`, the replica has failed
-    /// and requires rehydration.
-    response_rx: UnboundedReceiver<ComputeResponse<T>>,
     /// A handle to the task that aborts it when the replica is dropped.
-    _task: AbortOnDropHandle<()>,
+    ///
+    /// If the task is finished, the replica has failed and needs rehydration.
+    task: AbortOnDropHandle<()>,
     /// Replica metrics.
     metrics: ReplicaMetrics,
 }
 
@@ -75,12 +72,12 @@ where
         epoch: ClusterStartupEpoch,
         metrics: ReplicaMetrics,
         dyncfg: Arc<ConfigSet>,
+        response_tx: InstrumentedUnboundedSender<ReplicaResponse<T>, IntCounter>,
     ) -> Self {
         // Launch a task to handle communication with the replica
         // asynchronously. This isolates the main controller thread from
         // the replica.
         let (command_tx, command_rx) = unbounded_channel();
-        let (response_tx, response_rx) = unbounded_channel();
 
         let task = mz_ore::task::spawn(
             || format!("active-replication-replica-{id}"),
@@ -99,12 +96,13 @@ where
 
         Self {
             command_tx,
-            response_rx,
-            _task: task.abort_on_drop(),
+            task: task.abort_on_drop(),
             metrics,
         }
     }
+}
 
+impl<T> ReplicaClient<T> {
     /// Sends a command to this replica.
     pub(super) fn send(
         &self,
@@ -116,14 +114,9 @@ where
         })
     }
 
-    /// Receives the next response from this replica.
-    ///
-    /// This method is cancellation safe.
-    pub(super) async fn recv(&mut self) -> Option<ComputeResponse<T>> {
-        self.response_rx.recv().await.map(|r| {
-            self.metrics.inner.response_queue_size.dec();
-            r
-        })
+    /// Determine if the replica task has failed.
+    pub(super) fn is_failed(&self) -> bool {
+        self.task.is_finished()
     }
 }
 
@@ -138,7 +131,7 @@ struct ReplicaTask<T> {
     /// A channel upon which commands intended for the replica are delivered.
     command_rx: UnboundedReceiver<ComputeCommand<T>>,
     /// A channel upon which responses from the replica are delivered.
-    response_tx: UnboundedSender<ComputeResponse<T>>,
+    response_tx: InstrumentedUnboundedSender<ReplicaResponse<T>, IntCounter>,
     /// A number (technically, pair of numbers) identifying this incarnation of the replica.
     /// The semantics of this don't matter, except that it must strictly increase.
     epoch: ClusterStartupEpoch,
@@ -223,6 +216,8 @@ where
         T: ComputeControllerTimestamp,
         ComputeGrpcClient: ComputeClient<T>,
     {
+        let id = self.replica_id;
+        let incarnation = self.epoch.replica();
         loop {
             select! {
                 // Command from controller to forward to replica.
@@ -244,7 +239,7 @@ where
 
                     self.observe_response(&response);
 
-                    if self.response_tx.send(response).is_err() {
+                    if self.response_tx.send((id, incarnation, response)).is_err() {
                         // Controller is no longer interested in this replica. Shut down.
                         break;
                     }
@@ -303,7 +298,5 @@ where
             response = ?response,
             "received response from replica",
         );
-
-        self.metrics.inner.response_queue_size.inc();
     }
 }
diff --git a/src/compute-client/src/metrics.rs b/src/compute-client/src/metrics.rs
index 4bce8cbc6f06..77ea4a20abd8 100644
--- a/src/compute-client/src/metrics.rs
+++ b/src/compute-client/src/metrics.rs
@@ -31,7 +31,7 @@ use crate::protocol::command::{ComputeCommand, ProtoComputeCommand};
 use crate::protocol::response::{PeekResponse, ProtoComputeResponse};
 
 type Counter = DeleteOnDropCounter<'static, AtomicF64, Vec<String>>;
-type IntCounter = DeleteOnDropCounter<'static, AtomicU64, Vec<String>>;
+pub(crate) type IntCounter = DeleteOnDropCounter<'static, AtomicU64, Vec<String>>;
 type Gauge = DeleteOnDropGauge<'static, AtomicF64, Vec<String>>;
 
 /// TODO(database-issues#7533): Add documentation.
 pub type UIntGauge = DeleteOnDropGauge<'static, AtomicU64, Vec<String>>;
@@ -54,7 +54,8 @@ pub struct ComputeControllerMetrics {
     subscribe_count: UIntGaugeVec,
     copy_to_count: UIntGaugeVec,
     command_queue_size: UIntGaugeVec,
-    response_queue_size: UIntGaugeVec,
+    response_send_count: IntCounterVec,
+    response_recv_count: IntCounterVec,
     hydration_queue_size: UIntGaugeVec,
 
     // command history
@@ -131,10 +132,15 @@ impl ComputeControllerMetrics {
                 help: "The size of the compute command queue.",
                 var_labels: ["instance_id", "replica_id"],
             )),
-            response_queue_size: metrics_registry.register(metric!(
-                name: "mz_compute_controller_response_queue_size",
-                help: "The size of the compute response queue.",
-                var_labels: ["instance_id", "replica_id"],
+            response_send_count: metrics_registry.register(metric!(
+                name: "mz_compute_controller_response_send_count",
+                help: "The number of sends on the compute response queue.",
+                var_labels: ["instance_id"],
+            )),
+            response_recv_count: metrics_registry.register(metric!(
+                name: "mz_compute_controller_response_recv_count",
+                help: "The number of receives on the compute response queue.",
+                var_labels: ["instance_id"],
             )),
             hydration_queue_size: metrics_registry.register(metric!(
                 name: "mz_compute_controller_hydration_queue_size",
@@ -220,6 +226,12 @@ impl ComputeControllerMetrics {
             let labels = labels.iter().cloned().chain([typ.into()]).collect();
             self.peek_duration_seconds.get_delete_on_drop_metric(labels)
         });
+        let response_send_count = self
+            .response_send_count
+            .get_delete_on_drop_metric(labels.clone());
+        let response_recv_count = self
+            .response_recv_count
+            .get_delete_on_drop_metric(labels.clone());
 
         InstanceMetrics {
             instance_id,
@@ -234,6 +246,8 @@ impl ComputeControllerMetrics {
             history_dataflow_count,
             peeks_total,
             peek_duration_seconds,
+            response_send_count,
+            response_recv_count,
         }
     }
 }
@@ -264,6 +278,10 @@ pub struct InstanceMetrics {
     pub peeks_total: PeekMetrics,
     /// Histogram tracking peek durations.
     pub peek_duration_seconds: PeekMetrics,
+    /// Counter tracking the number of sends on the compute response queue.
+    pub response_send_count: IntCounter,
+    /// Counter tracking the number of receives on the compute response queue.
+    pub response_recv_count: IntCounter,
 }
 
 impl InstanceMetrics {
@@ -307,10 +325,6 @@ impl InstanceMetrics {
             .metrics
             .command_queue_size
             .get_delete_on_drop_metric(labels.clone());
-        let response_queue_size = self
-            .metrics
-            .response_queue_size
-            .get_delete_on_drop_metric(labels.clone());
         let hydration_queue_size = self
             .metrics
             .hydration_queue_size
@@ -326,7 +340,6 @@ impl InstanceMetrics {
                 responses_total,
                 response_message_bytes_total,
                 command_queue_size,
-                response_queue_size,
                 hydration_queue_size,
             }),
         }
@@ -382,8 +395,6 @@ pub struct ReplicaMetricsInner {
 
     /// Gauge tracking the size of the compute command queue.
     pub command_queue_size: UIntGauge,
-    /// Gauge tracking the size of the compute response queue.
-    pub response_queue_size: UIntGauge,
     /// Gauge tracking the size of the hydration queue.
     pub hydration_queue_size: UIntGauge,
 }
diff --git a/src/ore/src/channel.rs b/src/ore/src/channel.rs
index 240b61b722d7..7359111135fd 100644
--- a/src/ore/src/channel.rs
+++ b/src/ore/src/channel.rs
@@ -72,7 +72,7 @@ where
 /// A wrapper around tokio's `UnboundedSender` that increments a metric when a send occurs.
 ///
 /// The metric is not dropped until this sender is dropped.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct InstrumentedUnboundedSender<T, M> {
     tx: UnboundedSender<T>,
     metric: M,
diff --git a/src/ore/src/metrics/delete_on_drop.rs b/src/ore/src/metrics/delete_on_drop.rs
index e8cabd702895..df7e8110f9d3 100644
--- a/src/ore/src/metrics/delete_on_drop.rs
+++ b/src/ore/src/metrics/delete_on_drop.rs
@@ -182,7 +182,7 @@ impl<'a> PromLabelsExt<'a> for BTreeMap<&'a str, &'a str> {
 /// NOTE: This type implements [`Borrow<L>`], which imposes some constraints on implementers. To
 /// ensure these constraints, do *not* implement any of the `Eq`, `Ord`, or `Hash` traits on this
 /// type.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct DeleteOnDropMetric<'a, V, L>
 where
     V: MetricVec_,
diff --git a/test/cluster/mzcompose.py b/test/cluster/mzcompose.py
index a2ab82a6ec89..30de35c71f99 100644
--- a/test/cluster/mzcompose.py
+++ b/test/cluster/mzcompose.py
@@ -2765,8 +2765,11 @@ def fetch_metrics() -> Metrics:
     assert count > 0, f"got {count}"
     count = metrics.get_value("mz_compute_controller_command_queue_size")
     assert count < 10, f"got {count}"
-    count = metrics.get_value("mz_compute_controller_response_queue_size")
-    assert count < 10, f"got {count}"
+    send_count = metrics.get_value("mz_compute_controller_response_send_count")
+    assert send_count > 10, f"got {send_count}"
+    recv_count = metrics.get_value("mz_compute_controller_response_recv_count")
+    assert recv_count > 10, f"got {recv_count}"
+    assert send_count - recv_count < 10, f"got {send_count}, {recv_count}"
     count = metrics.get_value("mz_compute_controller_hydration_queue_size")
     assert count == 0, f"got {count}"