Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ metrics = { version = "0.24" }
metrics-exporter-prometheus = { version = "0.17", default-features = false, features = [
"async-runtime",
] }
metrics-util = { version = "0.19.0" }
metrics-util = { version = "0.20.0" }
moka = "0.12.5"
num-traits = { version = "0.2.17" }
object_store = { version = "0.12.2", features = ["aws"] }
Expand Down
2 changes: 2 additions & 0 deletions crates/core/src/metric_definitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

#![allow(unused)]

use metrics::{Unit, describe_counter};

// value of label `kind` in TC_SPAWN are defined in [`crate::TaskKind`].
Expand Down
1 change: 0 additions & 1 deletion crates/core/src/network/message_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,6 @@ impl<S: Service> Default for ServiceReceiver<S> {
}
}

#[derive(Clone)]
struct ServiceSender {
sender: mpsc::Sender<ServiceOp>,
started: Arc<AtomicBool>,
Expand Down
19 changes: 14 additions & 5 deletions crates/core/src/task_center.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use std::time::Duration;

use futures::FutureExt;
use futures::future::BoxFuture;
#[cfg(debug_assertions)]
use metrics::counter;
use parking_lot::Mutex;
use tokio::sync::oneshot;
Expand All @@ -42,7 +43,8 @@ use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, trace, warn};

use crate::metric_definitions::{self, STATUS_COMPLETED, STATUS_FAILED, TC_FINISHED, TC_SPAWN};
#[cfg(debug_assertions)]
use crate::metric_definitions::{STATUS_COMPLETED, STATUS_FAILED, TC_FINISHED, TC_SPAWN};
use crate::{Metadata, ShutdownError, ShutdownSourceErr};
use restate_types::SharedString;
use restate_types::cluster_state::ClusterState;
Expand Down Expand Up @@ -350,7 +352,7 @@ impl TaskCenterInner {
// partition processor runtimes
#[cfg(any(test, feature = "test-util"))] pause_time: bool,
) -> Self {
metric_definitions::describe_metrics();
crate::metric_definitions::describe_metrics();
let root_task_context = TaskContext {
id: TaskId::ROOT,
name: "::".into(),
Expand Down Expand Up @@ -899,10 +901,13 @@ impl TaskCenterInner {
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
let kind_str: &'static str = kind.into();
let runtime_name: &'static str = kind.runtime().into();
let tokio_task = tokio::task::Builder::new().name(name);
counter!(TC_SPAWN, "kind" => kind_str, "runtime" => runtime_name).increment(1);
#[cfg(debug_assertions)]
{
let kind_str: &'static str = kind.into();
let runtime_name: &'static str = kind.runtime().into();
counter!(TC_SPAWN, "kind" => kind_str, "runtime" => runtime_name).increment(1);
}
let runtime = match kind.runtime() {
crate::AsyncRuntime::Inherit => &tokio::runtime::Handle::current(),
crate::AsyncRuntime::Default => &self.default_runtime_handle,
Expand Down Expand Up @@ -930,6 +935,7 @@ impl TaskCenterInner {
// This can happen if the task ownership was taken by calling take_task(id);
return;
};
#[cfg(debug_assertions)]
let kind_str: &'static str = task.kind().into();

let should_shutdown_on_error = task.kind().should_shutdown_on_error();
Expand All @@ -938,6 +944,7 @@ impl TaskCenterInner {
match result {
Ok(Ok(())) => {
trace!(kind = ?task.kind(), name = ?task.name(), "Task {} exited normally", task_id);
#[cfg(debug_assertions)]
counter!(TC_FINISHED, "kind" => kind_str, "status" => STATUS_COMPLETED)
.increment(1);
}
Expand All @@ -961,10 +968,12 @@ impl TaskCenterInner {
} else {
error!(kind = ?task.kind(), name = ?task.name(), "Task {} failed with: {:?}", task_id, err);
}
#[cfg(debug_assertions)]
counter!(TC_FINISHED, "kind" => kind_str, "status" => STATUS_FAILED)
.increment(1);
}
Err(err) => {
#[cfg(debug_assertions)]
counter!(TC_FINISHED, "kind" => kind_str, "status" => STATUS_FAILED)
.increment(1);
if should_shutdown_on_error {
Expand Down
62 changes: 36 additions & 26 deletions crates/core/src/task_center/monitoring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

use std::time::Duration;

use metrics::gauge;
use metrics::{counter, gauge};
use tokio::runtime::RuntimeMetrics;

use restate_types::SharedString;
Expand Down Expand Up @@ -60,35 +60,45 @@ impl TaskCenterMonitoring for Handle {
}

fn submit_runtime_metrics(runtime: impl Into<SharedString>, stats: RuntimeMetrics) {
let runtime = runtime.into();
gauge!("restate.tokio.num_workers", "runtime" => runtime.clone())
.set(stats.num_workers() as f64);
gauge!("restate.tokio.blocking_threads", "runtime" => runtime.clone())
.set(stats.num_blocking_threads() as f64);
gauge!("restate.tokio.blocking_queue_depth", "runtime" => runtime.clone())
.set(stats.blocking_queue_depth() as f64);
gauge!("restate.tokio.num_alive_tasks", "runtime" => runtime.clone())
.set(stats.num_alive_tasks() as f64);
gauge!("restate.tokio.io_driver_ready_count", "runtime" => runtime.clone())
let runtime: SharedString = runtime.into();
#[cfg(debug_assertions)]
let labels = [("runtime", runtime.clone())];
#[cfg(debug_assertions)]
gauge!("restate.tokio.num_workers", &labels).set(stats.num_workers() as f64);
#[cfg(debug_assertions)]
gauge!("restate.tokio.blocking_threads", &labels).set(stats.num_blocking_threads() as f64);
#[cfg(debug_assertions)]
gauge!("restate.tokio.blocking_queue_depth", &labels).set(stats.blocking_queue_depth() as f64);
#[cfg(debug_assertions)]
gauge!("restate.tokio.num_alive_tasks", &labels).set(stats.num_alive_tasks() as f64);
#[cfg(debug_assertions)]
gauge!("restate.tokio.io_driver_ready_count", &labels)
.set(stats.io_driver_ready_count() as f64);
gauge!("restate.tokio.remote_schedule_count", "runtime" => runtime.clone())
.set(stats.remote_schedule_count() as f64);
#[cfg(debug_assertions)]
counter!("restate.tokio.remote_schedule_count", &labels)
.absolute(stats.remote_schedule_count());
// per worker stats
for idx in 0..stats.num_workers() {
gauge!("restate.tokio.worker_overflow_count", "runtime" => runtime.clone(), "worker" =>
idx.to_string())
.set(stats.worker_overflow_count(idx) as f64);
gauge!("restate.tokio.worker_poll_count", "runtime" => runtime.clone(), "worker" => idx.to_string())
.set(stats.worker_poll_count(idx) as f64);
gauge!("restate.tokio.worker_park_count", "runtime" => runtime.clone(), "worker" => idx.to_string())
.set(stats.worker_park_count(idx) as f64);
gauge!("restate.tokio.worker_noop_count", "runtime" => runtime.clone(), "worker" => idx.to_string())
.set(stats.worker_noop_count(idx) as f64);
gauge!("restate.tokio.worker_steal_count", "runtime" => runtime.clone(), "worker" => idx.to_string())
.set(stats.worker_steal_count(idx) as f64);
gauge!("restate.tokio.worker_total_busy_duration_seconds", "runtime" => runtime.clone(), "worker" => idx.to_string())
let labels = [
("runtime", runtime.clone()),
("worker", idx.to_string().into()),
];
#[cfg(debug_assertions)]
counter!("restate.tokio.worker_overflow_count", &labels)
.absolute(stats.worker_overflow_count(idx));
#[cfg(debug_assertions)]
counter!("restate.tokio.worker_park_count", &labels).absolute(stats.worker_park_count(idx));
#[cfg(debug_assertions)]
counter!("restate.tokio.worker_noop_count", &labels).absolute(stats.worker_noop_count(idx));
#[cfg(debug_assertions)]
counter!("restate.tokio.worker_steal_count", &labels)
.absolute(stats.worker_steal_count(idx));
#[cfg(debug_assertions)]
gauge!("restate.tokio.worker_total_busy_duration_seconds", &labels)
.set(stats.worker_total_busy_duration(idx).as_secs_f64());
gauge!("restate.tokio.worker_mean_poll_time", "runtime" => runtime.clone(), "worker" => idx.to_string())
// Main metrics we want in non-debug mode
counter!("restate.tokio.worker_poll_count", &labels).absolute(stats.worker_poll_count(idx));
gauge!("restate.tokio.worker_mean_poll_time", &labels)
.set(stats.worker_mean_poll_time(idx).as_secs_f64());
}
}
2 changes: 0 additions & 2 deletions crates/ingress-http/src/handler/service_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,15 +227,13 @@ where
histogram!(
INGRESS_REQUEST_DURATION,
"rpc.service" => service_name.clone(),
"rpc.method" => handler_name.clone(),
)
.record(start_time.elapsed());

counter!(
INGRESS_REQUESTS,
"status" => REQUEST_COMPLETED,
"rpc.service" => service_name,
"rpc.method" => handler_name,
)
.increment(1);
result
Expand Down
4 changes: 2 additions & 2 deletions crates/ingress-http/src/layers/load_shed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::metric_definitions::{INGRESS_REQUESTS, REQUEST_ADMITTED, REQUEST_DENIED_THROTTLE};
use crate::metric_definitions::{INGRESS_REQUESTS, REQUEST_ADMITTED, REQUEST_RATE_LIMITED};
use futures::ready;
use http::{Request, Response, StatusCode};
use metrics::counter;
Expand Down Expand Up @@ -76,7 +76,7 @@ where
warn!("No available quota to process the request");

// Register request denied
counter!(INGRESS_REQUESTS, "status" => REQUEST_DENIED_THROTTLE).increment(1);
counter!(INGRESS_REQUESTS, "status" => REQUEST_RATE_LIMITED).increment(1);

return ResponseFuture {
state: ResponseState::Overloaded,
Expand Down
2 changes: 1 addition & 1 deletion crates/ingress-http/src/metric_definitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub const INGRESS_REQUESTS: &str = "restate.ingress.requests.total";
// values of label `status` in INGRESS_REQUEST
pub const REQUEST_ADMITTED: &str = "admitted";
pub const REQUEST_COMPLETED: &str = "completed";
pub const REQUEST_DENIED_THROTTLE: &str = "throttled";
pub const REQUEST_RATE_LIMITED: &str = "rate-limited";

pub const INGRESS_REQUEST_DURATION: &str = "restate.ingress.request_duration.seconds";

Expand Down
14 changes: 5 additions & 9 deletions crates/metadata-store/src/metadata_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,6 @@ impl MetadataStoreClient {
key: ByteString,
) -> Result<Option<T>, ReadError> {
let start_time = Instant::now();
let key_str = key.to_string();
let result = {
let value = self.inner.get(key).await?;

Expand All @@ -311,7 +310,7 @@ impl MetadataStoreClient {
};

histogram!(METADATA_CLIENT_GET_DURATION).record(start_time.elapsed());
counter!(METADATA_CLIENT_GET_TOTAL, "key" => key_str, "status" => status).increment(1);
counter!(METADATA_CLIENT_GET_TOTAL, "status" => status).increment(1);

result
}
Expand All @@ -320,7 +319,6 @@ impl MetadataStoreClient {
/// [`None`].
pub async fn get_version(&self, key: ByteString) -> Result<Option<Version>, ReadError> {
let start_time = Instant::now();
let key_str = key.to_string();
let result = self.inner.get_version(key).await;

let status = if result.is_ok() {
Expand All @@ -330,8 +328,7 @@ impl MetadataStoreClient {
};

histogram!(METADATA_CLIENT_GET_VERSION_DURATION).record(start_time.elapsed());
counter!(METADATA_CLIENT_GET_VERSION_TOTAL, "key" => key_str, "status" => status)
.increment(1);
counter!(METADATA_CLIENT_GET_VERSION_TOTAL, "status" => status).increment(1);

result
}
Expand All @@ -348,7 +345,6 @@ impl MetadataStoreClient {
T: Versioned + StorageEncode,
{
let start_time = Instant::now();
let key_str = key.to_string();
let result = {
let versioned_value =
serialize_value(value).map_err(|err| WriteError::Codec(err.into()))?;
Expand All @@ -363,7 +359,7 @@ impl MetadataStoreClient {
};

histogram!(METADATA_CLIENT_PUT_DURATION).record(start_time.elapsed());
counter!(METADATA_CLIENT_PUT_TOTAL, "key" => key_str, "status" => status).increment(1);
counter!(METADATA_CLIENT_PUT_TOTAL, "status" => status).increment(1);

result
}
Expand All @@ -383,7 +379,6 @@ impl MetadataStoreClient {
precondition: Precondition,
) -> Result<(), WriteError> {
let start_time = Instant::now();
let key_str = key.to_string();
let result = self.inner.delete(key, precondition).await;

let status = if result.is_ok() {
Expand All @@ -394,7 +389,8 @@ impl MetadataStoreClient {

histogram!(crate::metric_definitions::METADATA_CLIENT_DELETE_DURATION)
.record(start_time.elapsed());
counter!(crate::metric_definitions::METADATA_CLIENT_DELETE_TOTAL,"key" => key_str, "status" => status).increment(1);
counter!(crate::metric_definitions::METADATA_CLIENT_DELETE_TOTAL, "status" => status)
.increment(1);

result
}
Expand Down
5 changes: 2 additions & 3 deletions crates/partition-store/src/owned_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl<'a, DB: DBAccess> OwnedIterator<'a, DB> {
pub(crate) fn new(iter: DBRawIteratorWithThreadMode<'a, DB>) -> Self {
Self {
iter,
arena: BytesMut::with_capacity(8196),
arena: BytesMut::new(),
}
}
}
Expand All @@ -30,9 +30,8 @@ impl<DB: DBAccess> Iterator for OwnedIterator<'_, DB> {

#[inline]
fn next(&mut self) -> Option<Self::Item> {
self.arena.reserve(8192);

if let Some((k, v)) = self.iter.item() {
self.arena.reserve(k.len() + v.len());
self.arena.put_slice(k);
let key = self.arena.split().freeze();
self.arena.put_slice(v);
Expand Down
23 changes: 4 additions & 19 deletions crates/rocksdb/src/background.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ use derive_builder::Builder;
use metrics::{gauge, histogram};
use tokio::sync::oneshot;

use crate::metric_definitions::{
STORAGE_BG_TASK_RUN_DURATION, STORAGE_BG_TASK_TOTAL_DURATION, STORAGE_BG_TASK_WAIT_DURATION,
};
use crate::metric_definitions::{STORAGE_BG_TASK_RUN_DURATION, STORAGE_BG_TASK_WAIT_DURATION};
use crate::{OP_TYPE, PRIORITY, Priority, STORAGE_BG_TASK_IN_FLIGHT};

#[derive(Debug, Clone, Copy, PartialEq, Eq, strum::IntoStaticStr)]
Expand Down Expand Up @@ -89,23 +87,10 @@ where

fn run(self) -> R {
let start = Instant::now();
histogram!(
STORAGE_BG_TASK_WAIT_DURATION,
PRIORITY => self.priority.as_static_str(),
OP_TYPE => self.kind.as_static_str(),
)
.record(self.created_at.elapsed());
let labels = [(OP_TYPE, self.kind.as_static_str())];
histogram!(STORAGE_BG_TASK_WAIT_DURATION, &labels).record(self.created_at.elapsed());
let res = (self.op)();
histogram!(STORAGE_BG_TASK_RUN_DURATION,
PRIORITY => self.priority.as_static_str(),
OP_TYPE => self.kind.as_static_str(),
)
.record(start.elapsed());
histogram!(STORAGE_BG_TASK_TOTAL_DURATION,
PRIORITY => self.priority.as_static_str(),
OP_TYPE => self.kind.as_static_str(),
)
.record(self.created_at.elapsed());
histogram!(STORAGE_BG_TASK_RUN_DURATION, &labels).record(start.elapsed());
gauge!(STORAGE_BG_TASK_IN_FLIGHT,
PRIORITY => self.priority.as_static_str(),
OP_TYPE => self.kind.as_static_str(),
Expand Down
Loading
Loading