diff --git a/Cargo.lock b/Cargo.lock index 829b53c8ce..56d8d87ebb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4442,7 +4442,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34" dependencies = [ "cfg-if", - "windows-targets 0.52.6", + "windows-targets 0.48.5", ] [[package]] @@ -4662,9 +4662,9 @@ dependencies = [ [[package]] name = "metrics-exporter-prometheus" -version = "0.17.0" +version = "0.17.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df88858cd28baaaf2cfc894e37789ed4184be0e1351157aec7bf3c2266c793fd" +checksum = "2b166dea96003ee2531cf14833efedced545751d800f03535801d833313f8c15" dependencies = [ "base64 0.22.1", "hyper-util", @@ -4678,9 +4678,9 @@ dependencies = [ [[package]] name = "metrics-util" -version = "0.19.1" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8496cc523d1f94c1385dd8f0f0c2c480b2b8aeccb5b7e4485ad6365523ae376" +checksum = "fe8db7a05415d0f919ffb905afa37784f71901c9a773188876984b4f769ab986" dependencies = [ "aho-corasick", "crossbeam-epoch", diff --git a/Cargo.toml b/Cargo.toml index 80966d87b3..4cbdb990fd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -163,7 +163,7 @@ metrics = { version = "0.24" } metrics-exporter-prometheus = { version = "0.17", default-features = false, features = [ "async-runtime", ] } -metrics-util = { version = "0.19.0" } +metrics-util = { version = "0.20.0" } moka = "0.12.5" num-traits = { version = "0.2.17" } object_store = { version = "0.12.2", features = ["aws"] } diff --git a/crates/core/src/metric_definitions.rs b/crates/core/src/metric_definitions.rs index f9b297c8ad..4f6a5e66ff 100644 --- a/crates/core/src/metric_definitions.rs +++ b/crates/core/src/metric_definitions.rs @@ -8,6 +8,8 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +#![allow(unused)] + use metrics::{Unit, describe_counter}; // value of label `kind` in TC_SPAWN are defined in [`crate::TaskKind`]. diff --git a/crates/core/src/network/message_router.rs b/crates/core/src/network/message_router.rs index 62e7b9c893..4a108cd7fb 100644 --- a/crates/core/src/network/message_router.rs +++ b/crates/core/src/network/message_router.rs @@ -423,7 +423,6 @@ impl Default for ServiceReceiver { } } -#[derive(Clone)] struct ServiceSender { sender: mpsc::Sender, started: Arc, diff --git a/crates/core/src/task_center.rs b/crates/core/src/task_center.rs index 53b26a9205..fc5cb11f93 100644 --- a/crates/core/src/task_center.rs +++ b/crates/core/src/task_center.rs @@ -33,6 +33,7 @@ use std::time::Duration; use futures::FutureExt; use futures::future::BoxFuture; +#[cfg(debug_assertions)] use metrics::counter; use parking_lot::Mutex; use tokio::sync::oneshot; @@ -42,7 +43,8 @@ use tokio::time::Instant; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, trace, warn}; -use crate::metric_definitions::{self, STATUS_COMPLETED, STATUS_FAILED, TC_FINISHED, TC_SPAWN}; +#[cfg(debug_assertions)] +use crate::metric_definitions::{STATUS_COMPLETED, STATUS_FAILED, TC_FINISHED, TC_SPAWN}; use crate::{Metadata, ShutdownError, ShutdownSourceErr}; use restate_types::SharedString; use restate_types::cluster_state::ClusterState; @@ -350,7 +352,7 @@ impl TaskCenterInner { // partition processor runtimes #[cfg(any(test, feature = "test-util"))] pause_time: bool, ) -> Self { - metric_definitions::describe_metrics(); + crate::metric_definitions::describe_metrics(); let root_task_context = TaskContext { id: TaskId::ROOT, name: "::".into(), @@ -899,10 +901,13 @@ impl TaskCenterInner { F: Future + Send + 'static, T: Send + 'static, { - let kind_str: &'static str = kind.into(); - let runtime_name: &'static str = kind.runtime().into(); let tokio_task = tokio::task::Builder::new().name(name); - counter!(TC_SPAWN, "kind" => kind_str, "runtime" => runtime_name).increment(1); + #[cfg(debug_assertions)] + { + let kind_str: &'static str = kind.into(); + let runtime_name: &'static str = kind.runtime().into(); + counter!(TC_SPAWN, "kind" => kind_str, "runtime" => runtime_name).increment(1); + } let runtime = match kind.runtime() { crate::AsyncRuntime::Inherit => &tokio::runtime::Handle::current(), crate::AsyncRuntime::Default => &self.default_runtime_handle, @@ -930,6 +935,7 @@ impl TaskCenterInner { // This can happen if the task ownership was taken by calling take_task(id); return; }; + #[cfg(debug_assertions)] let kind_str: &'static str = task.kind().into(); let should_shutdown_on_error = task.kind().should_shutdown_on_error(); @@ -938,6 +944,7 @@ impl TaskCenterInner { match result { Ok(Ok(())) => { trace!(kind = ?task.kind(), name = ?task.name(), "Task {} exited normally", task_id); + #[cfg(debug_assertions)] counter!(TC_FINISHED, "kind" => kind_str, "status" => STATUS_COMPLETED) .increment(1); } @@ -961,10 +968,12 @@ impl TaskCenterInner { } else { error!(kind = ?task.kind(), name = ?task.name(), "Task {} failed with: {:?}", task_id, err); } + #[cfg(debug_assertions)] counter!(TC_FINISHED, "kind" => kind_str, "status" => STATUS_FAILED) .increment(1); } Err(err) => { + #[cfg(debug_assertions)] counter!(TC_FINISHED, "kind" => kind_str, "status" => STATUS_FAILED) .increment(1); if should_shutdown_on_error { diff --git a/crates/core/src/task_center/monitoring.rs b/crates/core/src/task_center/monitoring.rs index 7b2dde23ec..aa6528601a 100644 --- a/crates/core/src/task_center/monitoring.rs +++ b/crates/core/src/task_center/monitoring.rs @@ -10,7 +10,7 @@ use std::time::Duration; -use metrics::gauge; +use metrics::{counter, gauge}; use tokio::runtime::RuntimeMetrics; use restate_types::SharedString; @@ -60,35 +60,45 @@ impl TaskCenterMonitoring for Handle { } fn submit_runtime_metrics(runtime: impl Into, stats: RuntimeMetrics) { - let runtime = runtime.into(); - gauge!("restate.tokio.num_workers", "runtime" => runtime.clone()) - .set(stats.num_workers() as f64); - gauge!("restate.tokio.blocking_threads", "runtime" => runtime.clone()) - .set(stats.num_blocking_threads() as f64); - gauge!("restate.tokio.blocking_queue_depth", "runtime" => runtime.clone()) - .set(stats.blocking_queue_depth() as f64); - gauge!("restate.tokio.num_alive_tasks", "runtime" => runtime.clone()) - .set(stats.num_alive_tasks() as f64); - gauge!("restate.tokio.io_driver_ready_count", "runtime" => runtime.clone()) + let runtime: SharedString = runtime.into(); + #[cfg(debug_assertions)] + let labels = [("runtime", runtime.clone())]; + #[cfg(debug_assertions)] + gauge!("restate.tokio.num_workers", &labels).set(stats.num_workers() as f64); + #[cfg(debug_assertions)] + gauge!("restate.tokio.blocking_threads", &labels).set(stats.num_blocking_threads() as f64); + #[cfg(debug_assertions)] + gauge!("restate.tokio.blocking_queue_depth", &labels).set(stats.blocking_queue_depth() as f64); + #[cfg(debug_assertions)] + gauge!("restate.tokio.num_alive_tasks", &labels).set(stats.num_alive_tasks() as f64); + #[cfg(debug_assertions)] + gauge!("restate.tokio.io_driver_ready_count", &labels) .set(stats.io_driver_ready_count() as f64); - gauge!("restate.tokio.remote_schedule_count", "runtime" => runtime.clone()) - .set(stats.remote_schedule_count() as f64); + #[cfg(debug_assertions)] + counter!("restate.tokio.remote_schedule_count", &labels) + .absolute(stats.remote_schedule_count()); // per worker stats for idx in 0..stats.num_workers() { - gauge!("restate.tokio.worker_overflow_count", "runtime" => runtime.clone(), "worker" => - idx.to_string()) - .set(stats.worker_overflow_count(idx) as f64); - gauge!("restate.tokio.worker_poll_count", "runtime" => runtime.clone(), "worker" => idx.to_string()) - .set(stats.worker_poll_count(idx) as f64); - gauge!("restate.tokio.worker_park_count", "runtime" => runtime.clone(), "worker" => idx.to_string()) - .set(stats.worker_park_count(idx) as f64); - gauge!("restate.tokio.worker_noop_count", "runtime" => runtime.clone(), "worker" => idx.to_string()) - .set(stats.worker_noop_count(idx) as f64); - gauge!("restate.tokio.worker_steal_count", "runtime" => runtime.clone(), "worker" => idx.to_string()) - .set(stats.worker_steal_count(idx) as f64); - gauge!("restate.tokio.worker_total_busy_duration_seconds", "runtime" => runtime.clone(), "worker" => idx.to_string()) + let labels = [ + ("runtime", runtime.clone()), + ("worker", idx.to_string().into()), + ]; + #[cfg(debug_assertions)] + counter!("restate.tokio.worker_overflow_count", &labels) + .absolute(stats.worker_overflow_count(idx)); + #[cfg(debug_assertions)] + counter!("restate.tokio.worker_park_count", &labels).absolute(stats.worker_park_count(idx)); + #[cfg(debug_assertions)] + counter!("restate.tokio.worker_noop_count", &labels).absolute(stats.worker_noop_count(idx)); + #[cfg(debug_assertions)] + counter!("restate.tokio.worker_steal_count", &labels) + .absolute(stats.worker_steal_count(idx)); + #[cfg(debug_assertions)] + gauge!("restate.tokio.worker_total_busy_duration_seconds", &labels) .set(stats.worker_total_busy_duration(idx).as_secs_f64()); - gauge!("restate.tokio.worker_mean_poll_time", "runtime" => runtime.clone(), "worker" => idx.to_string()) + // Main metrics we want in non-debug mode + counter!("restate.tokio.worker_poll_count", &labels).absolute(stats.worker_poll_count(idx)); + gauge!("restate.tokio.worker_mean_poll_time", &labels) .set(stats.worker_mean_poll_time(idx).as_secs_f64()); } } diff --git a/crates/ingress-http/src/handler/service_handler.rs b/crates/ingress-http/src/handler/service_handler.rs index 1831136994..3c847590e9 100644 --- a/crates/ingress-http/src/handler/service_handler.rs +++ b/crates/ingress-http/src/handler/service_handler.rs @@ -227,7 +227,6 @@ where histogram!( INGRESS_REQUEST_DURATION, "rpc.service" => service_name.clone(), - "rpc.method" => handler_name.clone(), ) .record(start_time.elapsed()); @@ -235,7 +234,6 @@ where INGRESS_REQUESTS, "status" => REQUEST_COMPLETED, "rpc.service" => service_name, - "rpc.method" => handler_name, ) .increment(1); result diff --git a/crates/ingress-http/src/layers/load_shed.rs b/crates/ingress-http/src/layers/load_shed.rs index 61b1809dae..aa2900582f 100644 --- a/crates/ingress-http/src/layers/load_shed.rs +++ b/crates/ingress-http/src/layers/load_shed.rs @@ -8,7 +8,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use crate::metric_definitions::{INGRESS_REQUESTS, REQUEST_ADMITTED, REQUEST_DENIED_THROTTLE}; +use crate::metric_definitions::{INGRESS_REQUESTS, REQUEST_ADMITTED, REQUEST_RATE_LIMITED}; use futures::ready; use http::{Request, Response, StatusCode}; use metrics::counter; @@ -76,7 +76,7 @@ where warn!("No available quota to process the request"); // Register request denied - counter!(INGRESS_REQUESTS, "status" => REQUEST_DENIED_THROTTLE).increment(1); + counter!(INGRESS_REQUESTS, "status" => REQUEST_RATE_LIMITED).increment(1); return ResponseFuture { state: ResponseState::Overloaded, diff --git a/crates/ingress-http/src/metric_definitions.rs b/crates/ingress-http/src/metric_definitions.rs index a51688aab7..faa17f69e8 100644 --- a/crates/ingress-http/src/metric_definitions.rs +++ b/crates/ingress-http/src/metric_definitions.rs @@ -16,7 +16,7 @@ pub const INGRESS_REQUESTS: &str = "restate.ingress.requests.total"; // values of label `status` in INGRESS_REQUEST pub const REQUEST_ADMITTED: &str = "admitted"; pub const REQUEST_COMPLETED: &str = "completed"; -pub const REQUEST_DENIED_THROTTLE: &str = "throttled"; +pub const REQUEST_RATE_LIMITED: &str = "rate-limited"; pub const INGRESS_REQUEST_DURATION: &str = "restate.ingress.request_duration.seconds"; diff --git a/crates/metadata-store/src/metadata_store.rs b/crates/metadata-store/src/metadata_store.rs index 3f72a968f2..c8f7fa0635 100644 --- a/crates/metadata-store/src/metadata_store.rs +++ b/crates/metadata-store/src/metadata_store.rs @@ -284,7 +284,6 @@ impl MetadataStoreClient { key: ByteString, ) -> Result, ReadError> { let start_time = Instant::now(); - let key_str = key.to_string(); let result = { let value = self.inner.get(key).await?; @@ -311,7 +310,7 @@ impl MetadataStoreClient { }; histogram!(METADATA_CLIENT_GET_DURATION).record(start_time.elapsed()); - counter!(METADATA_CLIENT_GET_TOTAL, "key" => key_str, "status" => status).increment(1); + counter!(METADATA_CLIENT_GET_TOTAL, "status" => status).increment(1); result } @@ -320,7 +319,6 @@ impl MetadataStoreClient { /// [`None`]. pub async fn get_version(&self, key: ByteString) -> Result, ReadError> { let start_time = Instant::now(); - let key_str = key.to_string(); let result = self.inner.get_version(key).await; let status = if result.is_ok() { @@ -330,8 +328,7 @@ impl MetadataStoreClient { }; histogram!(METADATA_CLIENT_GET_VERSION_DURATION).record(start_time.elapsed()); - counter!(METADATA_CLIENT_GET_VERSION_TOTAL, "key" => key_str, "status" => status) - .increment(1); + counter!(METADATA_CLIENT_GET_VERSION_TOTAL, "status" => status).increment(1); result } @@ -348,7 +345,6 @@ impl MetadataStoreClient { T: Versioned + StorageEncode, { let start_time = Instant::now(); - let key_str = key.to_string(); let result = { let versioned_value = serialize_value(value).map_err(|err| WriteError::Codec(err.into()))?; @@ -363,7 +359,7 @@ impl MetadataStoreClient { }; histogram!(METADATA_CLIENT_PUT_DURATION).record(start_time.elapsed()); - counter!(METADATA_CLIENT_PUT_TOTAL, "key" => key_str, "status" => status).increment(1); + counter!(METADATA_CLIENT_PUT_TOTAL, "status" => status).increment(1); result } @@ -383,7 +379,6 @@ impl MetadataStoreClient { precondition: Precondition, ) -> Result<(), WriteError> { let start_time = Instant::now(); - let key_str = key.to_string(); let result = self.inner.delete(key, precondition).await; let status = if result.is_ok() { @@ -394,7 +389,8 @@ impl MetadataStoreClient { histogram!(crate::metric_definitions::METADATA_CLIENT_DELETE_DURATION) .record(start_time.elapsed()); - counter!(crate::metric_definitions::METADATA_CLIENT_DELETE_TOTAL,"key" => key_str, "status" => status).increment(1); + counter!(crate::metric_definitions::METADATA_CLIENT_DELETE_TOTAL, "status" => status) + .increment(1); result } diff --git a/crates/partition-store/src/owned_iter.rs b/crates/partition-store/src/owned_iter.rs index 08867ce85d..5b8d6bfa85 100644 --- a/crates/partition-store/src/owned_iter.rs +++ b/crates/partition-store/src/owned_iter.rs @@ -20,7 +20,7 @@ impl<'a, DB: DBAccess> OwnedIterator<'a, DB> { pub(crate) fn new(iter: DBRawIteratorWithThreadMode<'a, DB>) -> Self { Self { iter, - arena: BytesMut::with_capacity(8196), + arena: BytesMut::new(), } } } @@ -30,9 +30,8 @@ impl Iterator for OwnedIterator<'_, DB> { #[inline] fn next(&mut self) -> Option { - self.arena.reserve(8192); - if let Some((k, v)) = self.iter.item() { + self.arena.reserve(k.len() + v.len()); self.arena.put_slice(k); let key = self.arena.split().freeze(); self.arena.put_slice(v); diff --git a/crates/rocksdb/src/background.rs b/crates/rocksdb/src/background.rs index db37535c20..a431fb552c 100644 --- a/crates/rocksdb/src/background.rs +++ b/crates/rocksdb/src/background.rs @@ -14,9 +14,7 @@ use derive_builder::Builder; use metrics::{gauge, histogram}; use tokio::sync::oneshot; -use crate::metric_definitions::{ - STORAGE_BG_TASK_RUN_DURATION, STORAGE_BG_TASK_TOTAL_DURATION, STORAGE_BG_TASK_WAIT_DURATION, -}; +use crate::metric_definitions::{STORAGE_BG_TASK_RUN_DURATION, STORAGE_BG_TASK_WAIT_DURATION}; use crate::{OP_TYPE, PRIORITY, Priority, STORAGE_BG_TASK_IN_FLIGHT}; #[derive(Debug, Clone, Copy, PartialEq, Eq, strum::IntoStaticStr)] @@ -89,23 +87,10 @@ where fn run(self) -> R { let start = Instant::now(); - histogram!( - STORAGE_BG_TASK_WAIT_DURATION, - PRIORITY => self.priority.as_static_str(), - OP_TYPE => self.kind.as_static_str(), - ) - .record(self.created_at.elapsed()); + let labels = [(OP_TYPE, self.kind.as_static_str())]; + histogram!(STORAGE_BG_TASK_WAIT_DURATION, &labels).record(self.created_at.elapsed()); let res = (self.op)(); - histogram!(STORAGE_BG_TASK_RUN_DURATION, - PRIORITY => self.priority.as_static_str(), - OP_TYPE => self.kind.as_static_str(), - ) - .record(start.elapsed()); - histogram!(STORAGE_BG_TASK_TOTAL_DURATION, - PRIORITY => self.priority.as_static_str(), - OP_TYPE => self.kind.as_static_str(), - ) - .record(self.created_at.elapsed()); + histogram!(STORAGE_BG_TASK_RUN_DURATION, &labels).record(start.elapsed()); gauge!(STORAGE_BG_TASK_IN_FLIGHT, PRIORITY => self.priority.as_static_str(), OP_TYPE => self.kind.as_static_str(), diff --git a/crates/rocksdb/src/metric_definitions.rs b/crates/rocksdb/src/metric_definitions.rs index 3e297c3952..503c539b52 100644 --- a/crates/rocksdb/src/metric_definitions.rs +++ b/crates/rocksdb/src/metric_definitions.rs @@ -18,9 +18,6 @@ pub const STORAGE_BG_TASK_WAIT_DURATION: &str = pub const STORAGE_BG_TASK_RUN_DURATION: &str = "restate.rocksdb_manager.bg_task_run_duration.seconds"; -pub const STORAGE_BG_TASK_TOTAL_DURATION: &str = - "restate.rocksdb_manager.bg_task_total_duration.seconds"; - // Perf guard metrics pub const BLOCK_READ_BYTES: &str = "restate.rocksdb.perf.block_read_bytes.total"; pub const BLOCK_READ_DURATION: &str = "restate.rocksdb.perf.block_read_duration.seconds"; @@ -103,12 +100,6 @@ pub fn describe_metrics() { "Run time of storage tasks, with 'priority' label" ); - describe_histogram!( - STORAGE_BG_TASK_TOTAL_DURATION, - Unit::Seconds, - "Total time to queue+run a storage task, with 'priority' label" - ); - describe_histogram!( WRITE_WAL_DURATION, Unit::Seconds, diff --git a/crates/rocksdb/src/perf.rs b/crates/rocksdb/src/perf.rs index d9e7ae7b6e..a138e6a47e 100644 --- a/crates/rocksdb/src/perf.rs +++ b/crates/rocksdb/src/perf.rs @@ -72,100 +72,65 @@ impl RocksDbPerfGuard { } fn report(&self) { + let labels = [(OP_NAME, self.name)]; // Note to future visitors of this code. RocksDb reports times in nanoseconds in this // API compared to microseconds in Statistics/Properties. Use n_to_s() to convert to // standard prometheus unit (second). - histogram!(TOTAL_DURATION, - OP_NAME => self.name, - ) - .record(self.start.elapsed()); + histogram!(TOTAL_DURATION, &labels).record(self.start.elapsed()); // iterators-related let v = self.context.metric(PerfMetric::NextOnMemtableCount); if v != 0 { - counter!(NEXT_ON_MEMTABLE, - OP_NAME => self.name, - ) - .increment(v) + counter!(NEXT_ON_MEMTABLE, &labels).increment(v) }; let v = self.context.metric(PerfMetric::SeekOnMemtableTime); if v != 0 { - histogram!(SEEK_ON_MEMTABLE, - OP_NAME => self.name, - ) - .record(n_to_s(v)); + histogram!(SEEK_ON_MEMTABLE, &labels).record(n_to_s(v)); } let v = self.context.metric(PerfMetric::FindNextUserEntryTime); if v != 0 { - histogram!(SEEK_ON_MEMTABLE, - OP_NAME => self.name, - ) - .record(n_to_s(v)); + histogram!(SEEK_ON_MEMTABLE, &labels).record(n_to_s(v)); } // read-related let v = self.context.metric(PerfMetric::BlockReadByte); if v != 0 { - counter!(BLOCK_READ_BYTES, - OP_NAME => self.name, - ) - .increment(v) + counter!(BLOCK_READ_BYTES, &labels).increment(v) }; let v = self.context.metric(PerfMetric::BlockReadTime); if v != 0 { - histogram!(BLOCK_READ_DURATION, - OP_NAME => self.name, - ) - .record(n_to_s(v)); + histogram!(BLOCK_READ_DURATION, &labels).record(n_to_s(v)); } let v = self.context.metric(PerfMetric::GetFromMemtableTime); if v != 0 { - histogram!(GET_FROM_MEMTABLE_DURATION, - OP_NAME => self.name, - ) - .record(n_to_s(v)); + histogram!(GET_FROM_MEMTABLE_DURATION, &labels).record(n_to_s(v)); } let v = self.context.metric(PerfMetric::BlockDecompressTime); if v != 0 { - histogram!(BLOCK_DECOMPRESS_DURATION, - OP_NAME => self.name, - ) - .record(n_to_s(v)); + histogram!(BLOCK_DECOMPRESS_DURATION, &labels).record(n_to_s(v)); } // write-related let v = self.context.metric(PerfMetric::WriteWalTime); if v != 0 { - histogram!(WRITE_WAL_DURATION, - OP_NAME => self.name, - ) - .record(n_to_s(v)); + histogram!(WRITE_WAL_DURATION, &labels).record(n_to_s(v)); } let v = self.context.metric(PerfMetric::WriteMemtableTime); if v != 0 { - histogram!(WRITE_MEMTABLE_DURATION, - OP_NAME => self.name, - ) - .record(n_to_s(v)); + histogram!(WRITE_MEMTABLE_DURATION, &labels).record(n_to_s(v)); } let v = self.context.metric(PerfMetric::WritePreAndPostProcessTime); if v != 0 { - histogram!(WRITE_PRE_AND_POST_DURATION, - OP_NAME => self.name, - ) - .record(n_to_s(v)); + histogram!(WRITE_PRE_AND_POST_DURATION, &labels).record(n_to_s(v)); } let v = self.context.metric(PerfMetric::WriteDelayTime); if v != 0 { - histogram!(WRITE_ARTIFICIAL_DELAY_DURATION, - OP_NAME => self.name, - ) - .record(n_to_s(v)); + histogram!(WRITE_ARTIFICIAL_DELAY_DURATION, &labels).record(n_to_s(v)); } } } diff --git a/crates/storage-api/src/protobuf_types.rs b/crates/storage-api/src/protobuf_types.rs index b196f0aafa..d07a6d54df 100644 --- a/crates/storage-api/src/protobuf_types.rs +++ b/crates/storage-api/src/protobuf_types.rs @@ -47,6 +47,10 @@ impl StorageEncode for ProtobufStorageWrapper { .map_err(|err| restate_types::storage::StorageEncodeError::EncodeValue(err.into())) } + fn estimated_encoded_len(&self) -> usize { + T::encoded_len(&self.0) + } + fn default_codec(&self) -> restate_types::storage::StorageCodecKind { restate_types::storage::StorageCodecKind::Protobuf } diff --git a/crates/tracing-instrumentation/src/prometheus_metrics.rs b/crates/tracing-instrumentation/src/prometheus_metrics.rs index e7b45c63b5..06b5ea17a5 100644 --- a/crates/tracing-instrumentation/src/prometheus_metrics.rs +++ b/crates/tracing-instrumentation/src/prometheus_metrics.rs @@ -8,15 +8,14 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle}; -use metrics_util::MetricKindMask; - use metrics_exporter_prometheus::formatting; -use restate_types::config::CommonOptions; +use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle}; use tokio::task::AbortHandle; use tokio::time::MissedTickBehavior; use tracing::{debug, trace}; +use restate_types::config::CommonOptions; + #[derive(Default)] pub struct Prometheus { handle: Option, @@ -39,11 +38,6 @@ impl Prometheus { }; } let builder = PrometheusBuilder::default() - // Remove a metric from registry if it was not updated for that duration - .idle_timeout( - MetricKindMask::HISTOGRAM, - opts.histogram_inactivity_timeout.map(Into::into), - ) .add_global_label("cluster_name", opts.cluster_name()) .add_global_label("node_name", opts.node_name()) .set_quantiles(&[0.5, 0.9, 0.99, 1.0]) diff --git a/crates/types/src/config/common.rs b/crates/types/src/config/common.rs index 1e38b92cc5..0f3baef156 100644 --- a/crates/types/src/config/common.rs +++ b/crates/types/src/config/common.rs @@ -192,15 +192,6 @@ pub struct CommonOptions { #[cfg_attr(feature = "schemars", schemars(with = "String"))] pub tokio_console_bind_address: Option, - /// Timeout for idle histograms. - /// - /// The duration after which a histogram is considered idle and will be removed from - /// metric responses to save memory. This value should be configured higher than the - /// scrape interval of the telemetry collection system (e.g. Prometheus). - #[serde(with = "serde_with::As::>")] - #[cfg_attr(feature = "schemars", schemars(with = "Option"))] - pub histogram_inactivity_timeout: Option, - #[serde(flatten)] pub service_client: ServiceClientOptions, @@ -444,7 +435,6 @@ impl Default for CommonOptions { advertised_address: AdvertisedAddress::from_str(DEFAULT_ADVERTISED_ADDRESS).unwrap(), default_num_partitions: 24, default_replication: ReplicationProperty::new_unchecked(1), - histogram_inactivity_timeout: None, disable_prometheus: false, service_client: Default::default(), shutdown_timeout: Duration::from_secs(60).into(), diff --git a/crates/types/src/net/codec.rs b/crates/types/src/net/codec.rs index 9611446d90..b528f23bdb 100644 --- a/crates/types/src/net/codec.rs +++ b/crates/types/src/net/codec.rs @@ -114,7 +114,7 @@ pub fn decode_as_flexbuffers( } pub fn encode_as_bilrost(value: &T) -> Bytes { - let buf = value.encode_fast(); + let buf = value.encode_contiguous(); Bytes::from(buf.into_vec()) } diff --git a/crates/types/src/storage.rs b/crates/types/src/storage.rs index 06390b356a..05b905a935 100644 --- a/crates/types/src/storage.rs +++ b/crates/types/src/storage.rs @@ -101,6 +101,7 @@ impl StorageCodec { value: &T, buf: &mut BytesMut, ) -> Result<(), StorageEncodeError> { + buf.reserve(value.estimated_encoded_len() + mem::size_of::()); // write codec buf.put_u8(value.default_codec().into()); // encode value @@ -143,6 +144,9 @@ pub trait StorageEncode: DowncastSync { /// Codec which is used when encode new values. fn default_codec(&self) -> StorageCodecKind; + + /// Returns an estimate for the size of the encoded value. + fn estimated_encoded_len(&self) -> usize; } impl_downcast!(sync StorageEncode); @@ -176,6 +180,10 @@ macro_rules! flexbuffers_storage_encode_decode { ) -> Result<(), $crate::storage::StorageEncodeError> { $crate::storage::encode::encode_serde(self, buf, self.default_codec()) } + + fn estimated_encoded_len(&self) -> usize { + $crate::storage::encode::estimate_encoded_serde_len(self, self.default_codec()) + } } impl $crate::storage::StorageDecode for $name { @@ -227,12 +235,7 @@ impl PolyBytes { pub fn estimated_encode_size(&self) -> usize { match self { PolyBytes::Bytes(bytes) => bytes.len(), - PolyBytes::Typed(_) => { - // constant, assumption based on base envelope size of ~600 bytes. - // todo: use StorageEncode trait to get an actual estimate based - // on the underlying type - 2_048 // 2KiB - } + PolyBytes::Typed(value) => value.estimated_encoded_len(), } } } @@ -251,6 +254,13 @@ impl StorageEncode for PolyBytes { fn default_codec(&self) -> StorageCodecKind { StorageCodecKind::FlexbuffersSerde } + + fn estimated_encoded_len(&self) -> usize { + match self { + PolyBytes::Bytes(bytes) => bytes.len(), + PolyBytes::Typed(typed) => typed.estimated_encoded_len(), + } + } } /// SerializeAs/DeserializeAs to implement ser/de trait for [`PolyBytes`] @@ -265,7 +275,6 @@ impl serde_with::SerializeAs for EncodedPolyBytes { match source { PolyBytes::Bytes(bytes) => serializer.serialize_bytes(bytes.as_ref()), PolyBytes::Typed(typed) => { - // todo: estimate size to avoid re allocations let mut buf = BytesMut::new(); StorageCodec::encode(&**typed, &mut buf).expect("record serde is infallible"); serializer.serialize_bytes(buf.as_ref()) @@ -292,6 +301,10 @@ impl StorageEncode for String { StorageCodecKind::LengthPrefixedRawBytes } + fn estimated_encoded_len(&self) -> usize { + self.len() + mem::size_of::() + } + fn encode(&self, buf: &mut BytesMut) -> Result<(), StorageEncodeError> { let my_bytes = self.as_bytes(); buf.put_u32_le(u32::try_from(my_bytes.len()).map_err(|_| { @@ -359,6 +372,10 @@ impl StorageEncode for bytes::Bytes { StorageCodecKind::LengthPrefixedRawBytes } + fn estimated_encoded_len(&self) -> usize { + self.len() + mem::size_of::() + } + fn encode(&self, buf: &mut BytesMut) -> Result<(), StorageEncodeError> { buf.put_u32_le(u32::try_from(self.len()).map_err(|_| { StorageEncodeError::EncodeValue( diff --git a/crates/types/src/storage/encode.rs b/crates/types/src/storage/encode.rs index a38c88c6ac..e51c78651d 100644 --- a/crates/types/src/storage/encode.rs +++ b/crates/types/src/storage/encode.rs @@ -9,6 +9,7 @@ // by the Apache License, Version 2.0. use std::mem; +use bincode::enc::write::SizeWriter; use bytes::{BufMut, Bytes, BytesMut}; use serde::Serialize; @@ -33,6 +34,17 @@ pub fn encode_serde( } } +pub fn estimate_encoded_serde_len(value: &T, codec: StorageCodecKind) -> usize { + match codec { + // 2 KiB, completely arbitrary size since we don't have a way to estimate the size + // beforehand which is s a shame. + StorageCodecKind::FlexbuffersSerde => 2_048, + StorageCodecKind::BincodeSerde => estimate_bincode_len(value).unwrap_or(0), + StorageCodecKind::Json => estimate_json_serde_len(value).unwrap_or(0), + codec => unreachable!("Cannot encode serde type with codec {}", codec), + } +} + /// Utility method to encode a [`Serialize`] type as flexbuffers using serde. fn encode_serde_as_flexbuffers( value: T, @@ -50,6 +62,12 @@ fn encode_serde_as_flexbuffers( Ok(()) } +fn estimate_bincode_len(value: &T) -> Result { + let mut writer = SizeWriter::default(); + bincode::serde::encode_into_writer(value, &mut writer, bincode::config::standard())?; + Ok(writer.bytes_written) +} + /// Utility method to encode a [`Serialize`] type as bincode using serde. fn encode_serde_as_bincode( value: &T, @@ -69,6 +87,29 @@ fn encode_serde_as_bincode( Ok(()) } +fn estimate_json_serde_len(value: &T) -> Result { + #[derive(Default)] + struct SizeWriter { + bytes_written: usize, + } + + impl std::io::Write for SizeWriter { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.bytes_written += buf.len(); + Ok(buf.len()) + } + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } + } + + let mut writer = SizeWriter::default(); + + serde_json::to_writer(&mut writer, value)?; + + Ok(writer.bytes_written) +} + /// Utility method to encode a [`Serialize`] type as json using serde. fn encode_serde_as_json( value: &T, diff --git a/crates/wal-protocol/src/lib.rs b/crates/wal-protocol/src/lib.rs index 40dd070b7c..b4ca829542 100644 --- a/crates/wal-protocol/src/lib.rs +++ b/crates/wal-protocol/src/lib.rs @@ -250,7 +250,9 @@ mod envelope { use restate_storage_api::protobuf_types::v1 as protobuf; use restate_types::storage::decode::{decode_bilrost, decode_serde}; - use restate_types::storage::encode::{encode_bilrost, encode_serde}; + use restate_types::storage::encode::{ + encode_bilrost, encode_serde, estimate_encoded_serde_len, + }; use restate_types::storage::{ StorageCodecKind, StorageDecode, StorageDecodeError, StorageEncode, StorageEncodeError, }; @@ -261,7 +263,9 @@ mod envelope { fn encode(&self, buf: &mut BytesMut) -> Result<(), StorageEncodeError> { use bytes::BufMut; match self.default_codec() { - StorageCodecKind::FlexbuffersSerde => encode_serde(self, buf, self.default_codec()), + StorageCodecKind::FlexbuffersSerde => { + encode_serde(self, buf, StorageCodecKind::FlexbuffersSerde) + } StorageCodecKind::Custom => { buf.put_slice(&encode(self)?); Ok(()) @@ -270,6 +274,19 @@ mod envelope { } } + fn estimated_encoded_len(&self) -> usize { + match self.default_codec() { + StorageCodecKind::FlexbuffersSerde => { + restate_types::storage::encode::estimate_encoded_serde_len( + self, + StorageCodecKind::FlexbuffersSerde, + ) + } + StorageCodecKind::Custom => estimate_custom_encoding_len(self), + _ => unreachable!("developer error"), + } + } + fn default_codec(&self) -> StorageCodecKind { // TODO(azmy): Change to `Custom` in v1.5 StorageCodecKind::FlexbuffersSerde @@ -359,6 +376,29 @@ mod envelope { }) } + fn serde_encoded_len(value: &T, codec: StorageCodecKind) -> usize { + static EMPTY: Field = Field { + // note: the value of codec is irrelevant for this method, we assume that + // it takes a similar amount of space to encode all values of the StorageCodecKind + // enum. + codec: Some(StorageCodecKind::FlexbuffersSerde), + bytes: Bytes::new(), + }; + + let value_len = estimate_encoded_serde_len(value, codec); + EMPTY.encoded_len() + value_len + } + + fn bilrost_encoded_len(value: &T) -> usize { + static EMPTY: Field = Field { + codec: Some(StorageCodecKind::Bilrost), + bytes: Bytes::new(), + }; + + let value_len = bilrost::Message::encoded_len(value); + EMPTY.encoded_len() + value_len + } + fn encode_bilrost(value: &T) -> Result { Ok(Self { codec: Some(StorageCodecKind::Bilrost), @@ -366,6 +406,19 @@ mod envelope { }) } + fn protobuf_encoded_len(value: &T) -> usize { + static EMPTY: Field = Field { + // note: the value of codec is irrelevant for this method, we assume that + // it takes a similar amount of space to encode all values of the StorageCodecKind + // enum. + codec: Some(StorageCodecKind::Protobuf), + bytes: Bytes::new(), + }; + + let value_len = prost::Message::encoded_len(value); + EMPTY.encoded_len() + value_len + } + fn encode_protobuf(value: &T) -> Result { let mut buf = BytesMut::new(); value @@ -437,6 +490,71 @@ mod envelope { }}; } + pub fn estimate_custom_encoding_len(envelope: &super::Envelope) -> usize { + let command_len = match &envelope.command { + Command::UpdatePartitionDurability(value) => Field::bilrost_encoded_len(value), + Command::VersionBarrier(value) => Field::bilrost_encoded_len(value), + Command::AnnounceLeader(value) => { + Field::serde_encoded_len(value, StorageCodecKind::FlexbuffersSerde) + } + Command::PatchState(value) => { + // we are copying because we _assume_ that PatchState is not widely used. + // The clone will allocate a new hashmap but kvpairs are Bytes (cheap clones) + let value = protobuf::StateMutation::from(value.clone()); + Field::protobuf_encoded_len(&value) + } + Command::TerminateInvocation(value) => { + Field::serde_encoded_len(value, StorageCodecKind::FlexbuffersSerde) + } + Command::PurgeInvocation(value) => { + Field::serde_encoded_len(value, StorageCodecKind::FlexbuffersSerde) + } + Command::PurgeJournal(value) => { + Field::serde_encoded_len(value, StorageCodecKind::FlexbuffersSerde) + } + Command::Invoke(value) => { + let value = protobuf::ServiceInvocation::from(value.as_ref()); + // ideally, the envelope would carry the protobuf wrapper instead of doing the + // conversion twice (once for length estimate and another for serialization) + Field::protobuf_encoded_len(&value) + } + Command::TruncateOutbox(value) => { + Field::serde_encoded_len(value, StorageCodecKind::FlexbuffersSerde) + } + Command::ProxyThrough(value) => { + let value = protobuf::ServiceInvocation::from(value.as_ref()); + Field::protobuf_encoded_len(&value) + } + Command::AttachInvocation(value) => { + let value = protobuf::outbox_message::AttachInvocationRequest::from(value.clone()); + Field::protobuf_encoded_len(&value) + } + Command::InvokerEffect(value) => { + Field::serde_encoded_len(value, StorageCodecKind::FlexbuffersSerde) + } + Command::Timer(value) => { + Field::serde_encoded_len(value, StorageCodecKind::FlexbuffersSerde) + } + Command::ScheduleTimer(value) => { + Field::serde_encoded_len(value, StorageCodecKind::FlexbuffersSerde) + } + Command::InvocationResponse(value) => { + let value = + protobuf::outbox_message::OutboxServiceInvocationResponse::from(value.clone()); + Field::protobuf_encoded_len(&value) + } + Command::NotifyGetInvocationOutputResponse(value) => Field::bilrost_encoded_len(value), + Command::NotifySignal(value) => { + let value = protobuf::outbox_message::NotifySignal::from(value.clone()); + Field::protobuf_encoded_len(&value) + } + }; + + // Assuming 350 bytes for the header and the envelope type-tag + 8 bytes for the command kind + // overhead = 358 + 358 + command_len + } + pub fn encode(envelope: &super::Envelope) -> Result { // todo(azmy): avoid clone? this will require change to `From` implementation let (command_kind, command) = match &envelope.command { diff --git a/crates/worker/src/metric_definitions.rs b/crates/worker/src/metric_definitions.rs index 9b57191c1e..fb190daae5 100644 --- a/crates/worker/src/metric_definitions.rs +++ b/crates/worker/src/metric_definitions.rs @@ -10,7 +10,7 @@ /// Optional to have but adds description/help message to the metrics emitted to /// the metrics' sink. -use metrics::{Unit, describe_counter, describe_gauge, describe_histogram}; +use metrics::{Unit, describe_gauge, describe_histogram}; pub const PARTITION_LABEL: &str = "partition"; @@ -23,18 +23,12 @@ pub const NUM_PARTITIONS: &str = "restate.num_partitions"; pub const NUM_ACTIVE_PARTITIONS: &str = "restate.num_active_partitions"; pub const PARTITION_TIME_SINCE_LAST_STATUS_UPDATE: &str = "restate.partition.time_since_last_status_update"; -pub const PARTITION_APPLIED_LSN: &str = "restate.partition.applied_lsn"; pub const PARTITION_APPLIED_LSN_LAG: &str = "restate.partition.applied_lsn_lag"; -pub const PARTITION_DURABLE_LSN: &str = "restate.partition.durable_lsn"; pub const PARTITION_IS_EFFECTIVE_LEADER: &str = "restate.partition.is_effective_leader"; -pub const PARTITION_IS_ACTIVE: &str = "restate.partition.is_active"; pub const PARTITION_RECORD_COMMITTED_TO_READ_LATENCY_SECONDS: &str = "restate.partition.record_committed_to_read_latency.seconds"; -// to calculate read rates -pub const PARTITION_RECORD_READ_COUNT: &str = "restate.partition.record_read_count"; - pub(crate) fn describe_metrics() { describe_gauge!( PARTITION_BLOCKED_FLARE, @@ -75,39 +69,15 @@ pub(crate) fn describe_metrics() { "Set to 1 if the partition is an effective leader" ); - describe_gauge!( - PARTITION_IS_ACTIVE, - Unit::Count, - "Set to 1 if the partition is an active replay (not catching up or starting)" - ); - describe_gauge!( PARTITION_TIME_SINCE_LAST_STATUS_UPDATE, Unit::Seconds, "Number of seconds since the last partition status update" ); - describe_gauge!( - PARTITION_APPLIED_LSN, - Unit::Count, - "Raw value of the last applied log LSN" - ); - describe_gauge!( PARTITION_APPLIED_LSN_LAG, Unit::Count, "Number of records between last applied lsn and the log tail" ); - - describe_gauge!( - PARTITION_DURABLE_LSN, - Unit::Count, - "Raw value of the LSN that can be trimmed" - ); - - describe_counter!( - PARTITION_RECORD_READ_COUNT, - Unit::Count, - "Number of read records from bifrost", - ); } diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index 51090ff64c..8bd9a34216 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -23,7 +23,7 @@ use anyhow::Context; use assert2::let_assert; use enumset::EnumSet; use futures::{FutureExt, Stream, StreamExt, TryStreamExt as _}; -use metrics::{SharedString, counter, gauge, histogram}; +use metrics::{SharedString, gauge, histogram}; use tokio::sync::{mpsc, watch}; use tokio::time::MissedTickBehavior; use tracing::{Span, debug, error, info, instrument, trace, warn}; @@ -76,7 +76,6 @@ use restate_wal_protocol::{Command, Destination, Envelope, Header, Source}; use self::leadership::trim_queue::TrimQueue; use crate::metric_definitions::{ PARTITION_BLOCKED_FLARE, PARTITION_LABEL, PARTITION_RECORD_COMMITTED_TO_READ_LATENCY_SECONDS, - PARTITION_RECORD_READ_COUNT, }; use crate::partition::invoker_storage_reader::InvokerStorageReader; use crate::partition::leadership::LeadershipState; @@ -403,10 +402,10 @@ where let mut live_config = Configuration::live(); // Telemetry setup - let leader_record_write_to_read_latency = histogram!(PARTITION_RECORD_COMMITTED_TO_READ_LATENCY_SECONDS, PARTITION_LABEL => self.partition_id_str.clone(), "leader" => "1"); - let follower_record_write_to_read_latency = histogram!(PARTITION_RECORD_COMMITTED_TO_READ_LATENCY_SECONDS, PARTITION_LABEL => self.partition_id_str.clone(), "leader" => "0"); - let command_read_count = - counter!(PARTITION_RECORD_READ_COUNT, PARTITION_LABEL => self.partition_id_str.clone()); + let leader_record_write_to_read_latency = + histogram!(PARTITION_RECORD_COMMITTED_TO_READ_LATENCY_SECONDS, "leader" => "1"); + let follower_record_write_to_read_latency = + histogram!(PARTITION_RECORD_COMMITTED_TO_READ_LATENCY_SECONDS, "leader" => "0"); // Start reading after the last applied lsn let key_query = KeyFilter::Within(self.partition_store.partition_key_range().clone()); @@ -511,8 +510,6 @@ where // check that reading has succeeded operation?; - command_read_count.increment(u64::try_from(command_buffer.len()).expect("usize fit in u64")); - let mut transaction = partition_store.transaction(); // clear buffers used when applying the next record diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs index 92c1b7a0ab..2ac2c7b15c 100644 --- a/crates/worker/src/partition_processor_manager.rs +++ b/crates/worker/src/partition_processor_manager.rs @@ -31,13 +31,11 @@ use tokio::task::JoinSet; use tokio::time::MissedTickBehavior; use tracing::{debug, error, info, info_span, instrument, trace, warn}; -use crate::metric_definitions::PARTITION_DURABLE_LSN; -use crate::metric_definitions::PARTITION_IS_ACTIVE; +use crate::metric_definitions::NUM_PARTITIONS; use crate::metric_definitions::PARTITION_IS_EFFECTIVE_LEADER; use crate::metric_definitions::PARTITION_LABEL; use crate::metric_definitions::PARTITION_TIME_SINCE_LAST_STATUS_UPDATE; use crate::metric_definitions::{NUM_ACTIVE_PARTITIONS, PARTITION_APPLIED_LSN_LAG}; -use crate::metric_definitions::{NUM_PARTITIONS, PARTITION_APPLIED_LSN}; use crate::partition::ProcessorError; use crate::partition_processor_manager::processor_state::{ LeaderEpochToken, ProcessorState, StartedProcessor, @@ -720,38 +718,18 @@ impl PartitionProcessorManager { .iter() .filter_map(|(partition_id, processor_state)| { let mut status = processor_state.partition_processor_status()?; + let labels = [(PARTITION_LABEL, partition_id.to_string())]; - gauge!(PARTITION_TIME_SINCE_LAST_STATUS_UPDATE, - PARTITION_LABEL => partition_id.to_string()) - .set(status.updated_at.elapsed()); + gauge!(PARTITION_TIME_SINCE_LAST_STATUS_UPDATE, &labels) + .set(status.updated_at.elapsed()); - gauge!(PARTITION_IS_EFFECTIVE_LEADER, - PARTITION_LABEL => partition_id.to_string()) - .set(if status.is_effective_leader() { - 1.0 - } else { - 0.0 - }); - - gauge!(PARTITION_IS_ACTIVE, - PARTITION_LABEL => partition_id.to_string()) - .set(if status.replay_status == ReplayStatus::Active { - 1.0 - } else { - 0.0 - }); - - if let Some(last_applied_log_lsn) = status.last_applied_log_lsn { - gauge!(PARTITION_APPLIED_LSN, - PARTITION_LABEL => partition_id.to_string()) - .set(last_applied_log_lsn.as_u64() as f64); - } - - if let Some(durable_lsn) = status.last_persisted_log_lsn { - gauge!(PARTITION_DURABLE_LSN, - PARTITION_LABEL => partition_id.to_string()) - .set(durable_lsn.as_u64() as f64); - } + gauge!(PARTITION_IS_EFFECTIVE_LEADER, &labels).set( + if status.is_effective_leader() { + 1.0 + } else { + 0.0 + }, + ); // it is a bit unfortunate that we share PartitionProcessorStatus between the // PP and the PPManager :-(. Maybe at some point we want to split the struct for it. @@ -768,16 +746,18 @@ impl PartitionProcessorManager { None => { // current tail lsn is unknown. // This might indicate an issue, so we set the metric to infinity - gauge!(PARTITION_APPLIED_LSN_LAG, PARTITION_LABEL => partition_id.to_string()) - .set(f64::INFINITY); - }, + gauge!(PARTITION_APPLIED_LSN_LAG, &labels).set(f64::INFINITY); + } Some(target_tail_lsn) => { status.target_tail_lsn = Some(target_tail_lsn); // tail lsn always points to the next "free" lsn slot. Therefor the lag is calculate as `lsn-1` // hence we do target_tail_lsn.prev() below - gauge!(PARTITION_APPLIED_LSN_LAG, PARTITION_LABEL => partition_id.to_string()) - .set(target_tail_lsn.prev().as_u64().saturating_sub(status.last_applied_log_lsn.unwrap_or(Lsn::OLDEST).as_u64()) as f64); + gauge!(PARTITION_APPLIED_LSN_LAG, &labels).set( + target_tail_lsn.prev().as_u64().saturating_sub( + status.last_applied_log_lsn.unwrap_or(Lsn::OLDEST).as_u64(), + ) as f64, + ); } } diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index 6c8dbaf1ae..b690751e98 100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -83,7 +83,7 @@ libz-sys = { version = "1", features = ["static"] } log = { version = "0.4", default-features = false, features = ["std"] } md-5 = { version = "0.10" } memchr = { version = "2" } -metrics-util = { version = "0.19" } +metrics-util = { version = "0.20" } nom = { version = "7" } num-bigint = { version = "0.4" } num-integer = { version = "0.1", features = ["i128"] } @@ -210,7 +210,7 @@ libz-sys = { version = "1", features = ["static"] } log = { version = "0.4", default-features = false, features = ["std"] } md-5 = { version = "0.10" } memchr = { version = "2" } -metrics-util = { version = "0.19" } +metrics-util = { version = "0.20" } nom = { version = "7" } num-bigint = { version = "0.4" } num-integer = { version = "0.1", features = ["i128"] }