From 042d0753515f2eeac08c12936482dd82f58df338 Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Thu, 4 Sep 2025 17:35:18 +0100 Subject: [PATCH 1/3] Extension trait for `Duration` for friendly display Introduces a crate for time utilities (`restate-time-util`). The crate currently provides a trait `DurationExt` which provides a `friendly()` method for formatting a `Duration` as a human-readable string. It also allows parsing a human-friendly string into a duration. Additionally, this removes the helpers from `DurationString` and keeps `restate-serde-util`'s DurationString as a serialization helper only. Minor but relevant changes: - Some debug traces that reported the response latency now use `to_seconds_span()` since it's cheaper to compute. - Fixed typos in restate CLI service config commands. - More efficient serialization for DurationString that avoids an extra String allocation. - Used jiff's native span rounding feature to round a duration to "days" instead of the manual calculation. - Added a `negated()` method to the time span types to support "ago" formatting. - Migrated all uses of `DurationString` to `DurationExt::friendly()` including in CLI argument parsing. 
--- Cargo.lock | 14 +- Cargo.toml | 1 + cli/Cargo.toml | 3 +- cli/src/commands/services/config/edit.rs | 8 +- cli/src/commands/services/config/patch.rs | 71 ++-- cli/src/commands/services/config/view.rs | 31 +- crates/admin/Cargo.toml | 1 + crates/admin/src/service.rs | 6 +- crates/ingress-http/Cargo.toml | 3 +- crates/ingress-http/src/server.rs | 6 +- crates/serde-util/Cargo.toml | 4 +- crates/serde-util/src/duration.rs | 129 ++----- crates/time-util/Cargo.toml | 17 + crates/time-util/src/duration.rs | 441 ++++++++++++++++++++++ crates/time-util/src/lib.rs | 13 + 15 files changed, 566 insertions(+), 182 deletions(-) create mode 100644 crates/time-util/Cargo.toml create mode 100644 crates/time-util/src/duration.rs create mode 100644 crates/time-util/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index a6ec14a005..2fc06f98c4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6511,6 +6511,7 @@ dependencies = [ "restate-service-protocol", "restate-storage-query-datafusion", "restate-test-util", + "restate-time-util", "restate-types", "restate-wal-protocol", "restate-web-ui", @@ -6676,6 +6677,7 @@ dependencies = [ "restate-cli-util", "restate-cloud-tunnel-client", "restate-serde-util", + "restate-time-util", "restate-types", "restate-workspace-hack", "rustls", @@ -6908,6 +6910,7 @@ dependencies = [ "restate-errors", "restate-serde-util", "restate-test-util", + "restate-time-util", "restate-tracing-instrumentation", "restate-types", "restate-workspace-hack", @@ -7394,8 +7397,8 @@ dependencies = [ "bytesize", "http 1.3.1", "http-serde", - "jiff", "prost", + "restate-time-util", "restate-workspace-hack", "schemars 0.8.22", "serde", @@ -7627,6 +7630,15 @@ dependencies = [ "restate-workspace-hack", ] +[[package]] +name = "restate-time-util" +version = "1.5.0-dev" +dependencies = [ + "jiff", + "restate-workspace-hack", + "thiserror 2.0.12", +] + [[package]] name = "restate-timer" version = "1.5.0-dev" diff --git a/Cargo.toml b/Cargo.toml index 3f448f9cce..b62612a138 100644 --- 
a/Cargo.toml +++ b/Cargo.toml @@ -74,6 +74,7 @@ restate-service-protocol-v4 = { path = "crates/service-protocol-v4" } restate-storage-api = { path = "crates/storage-api" } restate-storage-query-datafusion = { path = "crates/storage-query-datafusion" } restate-test-util = { path = "crates/test-util" } +restate-time-util = { path = "crates/time-util" } restate-timer = { path = "crates/timer" } restate-timer-queue = { path = "crates/timer-queue" } restate-tracing-instrumentation = { path = "crates/tracing-instrumentation" } diff --git a/cli/Cargo.toml b/cli/Cargo.toml index d719e1bd2e..8142b9b82f 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -28,6 +28,7 @@ restate-admin-rest-model = { workspace = true } restate-cli-util = { workspace = true } restate-cloud-tunnel-client = { workspace = true } restate-serde-util = { workspace = true } +restate-time-util = { workspace = true } restate-types = { workspace = true } anyhow = { workspace = true } @@ -95,4 +96,4 @@ bench = false [[bin]] name = "restate" -path = "src/main.rs" \ No newline at end of file +path = "src/main.rs" diff --git a/cli/src/commands/services/config/edit.rs b/cli/src/commands/services/config/edit.rs index 3786b7adc5..b8636e0fee 100644 --- a/cli/src/commands/services/config/edit.rs +++ b/cli/src/commands/services/config/edit.rs @@ -54,12 +54,8 @@ async fn edit(env: &CliEnv, opts: &Edit) -> Result<()> { ) .context("Cannot parse the edited config file")?; - super::patch::apply_service_configuration_patch( - opts.service.clone(), - admin_client, - modify_request, - ) - .await + super::patch::apply_service_configuration_patch(&opts.service, admin_client, modify_request) + .await } // TODO generate this file from the JsonSchema diff --git a/cli/src/commands/services/config/patch.rs b/cli/src/commands/services/config/patch.rs index 77754a5603..a597cb9a5b 100644 --- a/cli/src/commands/services/config/patch.rs +++ b/cli/src/commands/services/config/patch.rs @@ -8,7 +8,7 @@ // the Business Source License, use 
of this software will be governed // by the Apache License, Version 2.0. -use anyhow::{Context, Result}; +use anyhow::Result; use cling::prelude::*; use comfy_table::Table; use const_format::concatcp; @@ -16,13 +16,13 @@ use const_format::concatcp; use restate_admin_rest_model::services::ModifyServiceRequest; use restate_cli_util::c_println; use restate_cli_util::ui::console::{StyledTable, confirm_or_exit}; -use restate_serde_util::DurationString; +use restate_time_util::{DurationExt, FriendlyDuration}; use crate::cli_env::CliEnv; use crate::clients::{AdminClient, AdminClientInterface}; -pub(super) const DURATION_EDIT_DESCRIPTION: &str = "Can be configured using the jiff \ - friendly format (https://docs.rs/jiff/latest/jiff/fmt/friendly/index.html) or ISO8601."; +pub(super) const DURATION_EDIT_DESCRIPTION: &str = "Can be configured using a human friendly \ + duration format (e.g. 5d 1h 30m 15s) or ISO8601."; pub(super) const IDEMPOTENCY_RETENTION_EDIT_DESCRIPTION: &str = concatcp!( super::view::IDEMPOTENCY_RETENTION, "\n", @@ -53,19 +53,19 @@ pub struct Patch { public: Option, #[clap(long, alias = "idempotency_retention", help = IDEMPOTENCY_RETENTION_EDIT_DESCRIPTION)] - idempotency_retention: Option, + idempotency_retention: Option, #[clap(long, alias = "workflow_completion_retention", help = WORKFLOW_RETENTION_EDIT_DESCRIPTION)] - workflow_completion_retention: Option, + workflow_completion_retention: Option, #[clap(long, alias = "journal_retention", help = JOURNAL_RETENTION_EDIT_DESCRIPTION)] - journal_retention: Option, + journal_retention: Option, - #[clap(long, alias = "inactivity_retention", help = INACTIVITY_TIMEOUT_EDIT_DESCRIPTION)] - inactivity_timeout: Option, + #[clap(long, alias = "inactivity_timeout", help = INACTIVITY_TIMEOUT_EDIT_DESCRIPTION)] + inactivity_timeout: Option, - #[clap(long, alias = "abort_retention", help = ABORT_TIMEOUT_EDIT_DESCRIPTION)] - abort_timeout: Option, + #[clap(long, alias = "abort_timeout", help = 
ABORT_TIMEOUT_EDIT_DESCRIPTION)] + abort_timeout: Option, /// Service name service: String, @@ -79,43 +79,20 @@ async fn patch(env: &CliEnv, opts: &Patch) -> Result<()> { let admin_client = AdminClient::new(env).await?; let modify_request = ModifyServiceRequest { public: opts.public, - idempotency_retention: opts - .idempotency_retention - .as_ref() - .map(|s| { - DurationString::parse_duration(s).context("Cannot parse idempotency_retention") - }) - .transpose()?, + idempotency_retention: opts.idempotency_retention.map(FriendlyDuration::to_std), workflow_completion_retention: opts .workflow_completion_retention - .as_ref() - .map(|s| { - DurationString::parse_duration(s) - .context("Cannot parse workflow_completion_retention") - }) - .transpose()?, - journal_retention: opts - .journal_retention - .as_ref() - .map(|s| DurationString::parse_duration(s).context("Cannot parse journal_retention")) - .transpose()?, - inactivity_timeout: opts - .inactivity_timeout - .as_ref() - .map(|s| DurationString::parse_duration(s).context("Cannot parse inactivity_timeout")) - .transpose()?, - abort_timeout: opts - .abort_timeout - .as_ref() - .map(|s| DurationString::parse_duration(s).context("Cannot parse abort_timeout")) - .transpose()?, + .map(FriendlyDuration::to_std), + journal_retention: opts.journal_retention.map(FriendlyDuration::to_std), + inactivity_timeout: opts.inactivity_timeout.map(FriendlyDuration::to_std), + abort_timeout: opts.abort_timeout.map(FriendlyDuration::to_std), }; - apply_service_configuration_patch(opts.service.clone(), admin_client, modify_request).await + apply_service_configuration_patch(&opts.service, admin_client, modify_request).await } pub(super) async fn apply_service_configuration_patch( - service_name: String, + service_name: &str, admin_client: AdminClient, modify_request: ModifyServiceRequest, ) -> Result<()> { @@ -139,35 +116,35 @@ pub(super) async fn apply_service_configuration_patch( if let Some(idempotency_retention) = 
&modify_request.idempotency_retention { table.add_kv_row( "Idempotent requests retention:", - DurationString::display(*idempotency_retention), + idempotency_retention.friendly().to_days_span(), ); } if let Some(workflow_completion_retention) = &modify_request.workflow_completion_retention { table.add_kv_row( "Workflow retention:", - DurationString::display(*workflow_completion_retention), + workflow_completion_retention.friendly().to_days_span(), ); } if let Some(journal_retention) = &modify_request.journal_retention { table.add_kv_row( "Journal retention:", - DurationString::display(*journal_retention), + journal_retention.friendly().to_days_span(), ); } if let Some(inactivity_timeout) = &modify_request.inactivity_timeout { table.add_kv_row( "Inactivity timeout:", - DurationString::display(*inactivity_timeout), + inactivity_timeout.friendly().to_days_span(), ); } if let Some(abort_timeout) = &modify_request.abort_timeout { - table.add_kv_row("Abort timeout:", DurationString::display(*abort_timeout)); + table.add_kv_row("Abort timeout:", abort_timeout.friendly().to_days_span()); } c_println!("{table}"); confirm_or_exit("Are you sure you want to apply these changes?")?; let _ = admin_client - .patch_service(&service_name, modify_request) + .patch_service(service_name, modify_request) .await? .into_body() .await?; diff --git a/cli/src/commands/services/config/view.rs b/cli/src/commands/services/config/view.rs index a03f4546d3..c5294752ad 100644 --- a/cli/src/commands/services/config/view.rs +++ b/cli/src/commands/services/config/view.rs @@ -8,17 +8,19 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
-use crate::cli_env::CliEnv; -use crate::clients::{AdminClient, AdminClientInterface}; use anyhow::Result; use cling::prelude::*; use comfy_table::Table; use indoc::indoc; + use restate_cli_util::ui::console::StyledTable; use restate_cli_util::{c_println, c_tip}; -use restate_serde_util::DurationString; +use restate_time_util::DurationExt; use restate_types::invocation::ServiceType; +use crate::cli_env::CliEnv; +use crate::clients::{AdminClient, AdminClientInterface}; + // TODO we could infer this text from the OpenAPI docs! pub(super) const PUBLIC_DESCRIPTION: &str = indoc! { "Whether the service is publicly available or not. @@ -94,7 +96,7 @@ async fn view(env: &CliEnv, opts: &View) -> Result<()> { let mut table = Table::new_styled(); table.add_kv_row( "Idempotent requests retention:", - DurationString::display(service.idempotency_retention), + service.idempotency_retention.friendly(), ); c_println!("{table}"); c_tip!("{}", IDEMPOTENCY_RETENTION); @@ -104,11 +106,10 @@ async fn view(env: &CliEnv, opts: &View) -> Result<()> { let mut table = Table::new_styled(); table.add_kv_row( "Workflow retention time:", - DurationString::display( - service - .workflow_completion_retention - .expect("Workflows must have a well defined retention"), - ), + service + .workflow_completion_retention + .expect("Workflows must have a well defined retention") + .friendly(), ); c_println!("{table}"); c_tip!("{}", WORKFLOW_RETENTION); @@ -120,7 +121,7 @@ async fn view(env: &CliEnv, opts: &View) -> Result<()> { "Journal retention:", service .journal_retention - .map(DurationString::display) + .map(|d| d.friendly().to_string()) .unwrap_or_else(|| "".to_string()), ); c_println!("{table}"); @@ -128,19 +129,13 @@ async fn view(env: &CliEnv, opts: &View) -> Result<()> { c_println!(); let mut table = Table::new_styled(); - table.add_kv_row( - "Inactivity timeout:", - DurationString::display(service.inactivity_timeout), - ); + table.add_kv_row("Inactivity timeout:", 
service.inactivity_timeout.friendly()); c_println!("{table}"); c_tip!("{}", INACTIVITY_TIMEOUT); c_println!(); let mut table = Table::new_styled(); - table.add_kv_row( - "Abort timeout:", - DurationString::display(service.abort_timeout), - ); + table.add_kv_row("Abort timeout:", service.abort_timeout.friendly()); c_println!("{table}"); c_tip!("{}", ABORT_TIMEOUT); c_println!(); diff --git a/crates/admin/Cargo.toml b/crates/admin/Cargo.toml index 13ae8c63a5..a546ceb5e0 100644 --- a/crates/admin/Cargo.toml +++ b/crates/admin/Cargo.toml @@ -27,6 +27,7 @@ restate-serde-util = { workspace = true } restate-service-client = { workspace = true } restate-service-protocol = { workspace = true, features = ["discovery"] } restate-storage-query-datafusion = { workspace = true } +restate-time-util = { workspace = true } restate-types = { workspace = true } restate-wal-protocol = { workspace = true } restate-web-ui = { git = "https://github.com/restatedev/restate-web-ui-crate", optional = true, version = "0.1.3", tag = "v0.1.3" } diff --git a/crates/admin/src/service.rs b/crates/admin/src/service.rs index a1f7b96be7..d99f507ff6 100644 --- a/crates/admin/src/service.rs +++ b/crates/admin/src/service.rs @@ -16,9 +16,9 @@ use restate_admin_rest_model::version::AdminApiVersion; use restate_bifrost::Bifrost; use restate_core::MetadataWriter; use restate_core::network::net_util; -use restate_serde_util::DurationString; use restate_service_client::HttpClient; use restate_service_protocol::discovery::ServiceDiscovery; +use restate_time_util::DurationExt; use restate_types::config::AdminOptions; use restate_types::invocation::client::InvocationClient; use restate_types::live::LiveLoad; @@ -134,7 +134,7 @@ where name: "access-log", target: "restate_admin::api", parent: span, - { http.response.status_code = response.status().as_u16(), http.response.latency = DurationString::display(latency) }, + { http.response.status_code = response.status().as_u16(), http.response.latency = 
%latency.friendly().to_seconds_span() }, "Replied" ) }, @@ -150,7 +150,7 @@ where name: "access-log", target: "restate_admin::api", parent: span, - { error.type = error_string, http.response.latency = DurationString::display(latency) }, + { error.type = error_string, http.response.latency = %latency.friendly().to_seconds_span() }, "Failed processing" ) } diff --git a/crates/ingress-http/Cargo.toml b/crates/ingress-http/Cargo.toml index 54cd0e6169..466e4cdd1e 100644 --- a/crates/ingress-http/Cargo.toml +++ b/crates/ingress-http/Cargo.toml @@ -18,6 +18,7 @@ restate-workspace-hack = { workspace = true } restate-core = { workspace = true } restate-errors = { workspace = true } restate-serde-util = { workspace = true } +restate-time-util = { workspace = true } restate-tracing-instrumentation = { workspace = true } restate-types = { workspace = true } @@ -60,4 +61,4 @@ hyper = { workspace = true, features = ["full"] } hyper-util = { workspace = true, features = ["full"] } tracing-test = { workspace = true } tracing-subscriber = { workspace = true } -googletest = { workspace = true } \ No newline at end of file +googletest = { workspace = true } diff --git a/crates/ingress-http/src/server.rs b/crates/ingress-http/src/server.rs index 6835bd4891..2fe521273a 100644 --- a/crates/ingress-http/src/server.rs +++ b/crates/ingress-http/src/server.rs @@ -17,7 +17,7 @@ use hyper::body::Incoming; use hyper_util::rt::TokioIo; use hyper_util::server::conn::auto; use restate_core::{TaskCenter, TaskKind, cancellation_watcher}; -use restate_serde_util::DurationString; +use restate_time_util::DurationExt; use restate_types::config::IngressOptions; use restate_types::health::HealthStatus; use restate_types::live::Live; @@ -169,7 +169,7 @@ where name: "access-log", target: "restate_ingress_http::api", parent: span, - { http.response.status_code = response.status().as_u16(), http.response.latency = DurationString::display(latency) }, + { http.response.status_code = response.status().as_u16(), 
http.response.latency = %latency.friendly().to_seconds_span() }, "Replied" ) }, @@ -185,7 +185,7 @@ where name: "access-log", target: "restate_ingress_http::api", parent: span, - { error.type = error_string, http.response.latency = DurationString::display(latency) }, + { error.type = error_string, http.response.latency = %latency.friendly().to_seconds_span() }, "Failed processing" ) } diff --git a/crates/serde-util/Cargo.toml b/crates/serde-util/Cargo.toml index 027ab3714d..dd4420ffdc 100644 --- a/crates/serde-util/Cargo.toml +++ b/crates/serde-util/Cargo.toml @@ -14,6 +14,7 @@ proto = ["dep:prost", "dep:bytes"] [dependencies] restate-workspace-hack = { workspace = true } +restate-time-util = { workspace = true } bytes = { workspace = true, optional = true } bytesize = { workspace = true } @@ -24,7 +25,6 @@ schemars = { workspace = true, optional = true } serde = { workspace = true } serde_json = { workspace = true, optional = true } serde_with = { workspace = true } -jiff = { workspace = true } [dev-dependencies] -serde_json = { workspace = true } \ No newline at end of file +serde_json = { workspace = true } diff --git a/crates/serde-util/src/duration.rs b/crates/serde-util/src/duration.rs index 9092a31e20..b70cafc22b 100644 --- a/crates/serde-util/src/duration.rs +++ b/crates/serde-util/src/duration.rs @@ -8,73 +8,48 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::str::FromStr as _; use std::time::Duration; -use jiff::fmt::friendly::{Designator, SpanPrinter}; -use jiff::{Span, SpanRelativeTo}; -use serde::de::{Error, IntoDeserializer}; -use serde::{Deserialize, Deserializer, Serializer}; +use restate_time_util::{DurationExt, FriendlyDuration}; + +use serde::de::Error; +use serde::{Deserializer, Serializer}; use serde_with::{DeserializeAs, SerializeAs}; /// Serializable/Deserializable duration for use with serde_with. 
/// -/// Deserialization uses [`jiff::Span`]'s parsing to support both human-friendly and ISO8601 +/// Deserialization uses [`restate_time_util::FriendlyDuration`]'s parsing to support both human-friendly and ISO8601 /// inputs. Days are the largest supported unit of time, and are interpreted as 24 hours long when /// converting the parsed span into an actual duration. /// -/// Serialization is performed using [`jiff::fmt::friendly`] for output. +/// Serialization is performed using [`restate_time_util::FriendlyDuration::to_days_span`] for output. pub struct DurationString; -impl DurationString { - /// Parse a duration from a string using the [`jiff::fmt::friendly`] format, allowing units of - /// up to days to be used. - pub fn parse_duration(s: &str) -> Result { - serde_with::As::::deserialize(s.into_deserializer()) - } - - /// Prints a duration as a string using the [`jiff::fmt::friendly`] format, using units up to - /// days. - pub fn display(duration: Duration) -> String { - // jiff's SignedDuration pretty-print will add up units up to hours but not to days, - // so we use a Span to get finer control over the display output and use calendar units - // up to days (which we consider to be 24 hours in duration) - let mut span = Span::try_from(duration).expect("fits i64"); - if span.get_seconds() >= 60 { - let minutes = span.get_seconds() / 60; - let seconds = span.get_seconds() % 60; - span = span.minutes(minutes).seconds(seconds); - } - if span.get_minutes() >= 60 { - let hours = span.get_minutes() / 60; - let minutes = span.get_minutes() % 60; - span = span.hours(hours).minutes(minutes); - } - if span.get_hours() >= 24 { - let days = span.get_hours() / 24; - let hours = span.get_hours() % 24; - span = span.days(days).hours(hours); - }; - - let printer = SpanPrinter::new().designator(Designator::Compact); - printer.span_to_string(&span) - } -} - -impl<'de> DeserializeAs<'de, std::time::Duration> for DurationString { +impl<'de> DeserializeAs<'de, Duration> for 
DurationString { fn deserialize_as(deserializer: D) -> Result where D: Deserializer<'de>, { - let span: Span = String::deserialize(deserializer)? - .parse() - .map_err(Error::custom)?; - if span.get_years() > 0 || span.get_months() > 0 { - return Err(Error::custom("Please use units of days or smaller")); + struct Helper; + impl serde::de::Visitor<'_> for Helper { + type Value = Duration; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("a human-friendly duration string") + } + + fn visit_str(self, value: &str) -> Result + where + E: serde::de::Error, + { + FriendlyDuration::from_str(value) + .map_err(Error::custom) + .map(|d| d.to_std()) + } } - let signed_duration = span - .to_duration(SpanRelativeTo::days_are_24_hours()) - .map_err(Error::custom)?; - Duration::try_from(signed_duration).map_err(Error::custom) + + deserializer.deserialize_str(Helper) } } @@ -83,7 +58,7 @@ impl SerializeAs for DurationString { where S: Serializer, { - serializer.collect_str(&DurationString::display(*source)) + serializer.collect_str(&source.friendly().to_days_span()) } } @@ -103,6 +78,8 @@ impl schemars::JsonSchema for DurationString { metadata.examples = vec![ serde_json::Value::String("10 hours".to_owned()), serde_json::Value::String("5 days".to_owned()), + serde_json::Value::String("5d".to_owned()), + serde_json::Value::String("1h 4m".to_owned()), serde_json::Value::String("P40D".to_owned()), ]; schema.into() @@ -113,7 +90,6 @@ impl schemars::JsonSchema for DurationString { mod tests { use super::*; - use jiff::{SignedDuration, ToSpan}; use serde::{Deserialize, Serialize}; use serde_with::serde_as; @@ -123,53 +99,6 @@ mod tests { #[serde(transparent)] struct MyDuration(#[serde_as(as = "DurationString")] std::time::Duration); - #[test] - fn parse_duration_input_formats() { - let duration = DurationString::parse_duration("10 min"); - assert_eq!(Ok(Duration::from_secs(600)), duration); - - // we don't support "1 month" as 
months have variable length - let duration = DurationString::parse_duration("P1M"); - assert_eq!( - Err(serde::de::Error::custom( - "Please use units of days or smaller" - )), - duration - ); - - // we can, however, use "30 days" instead - we fix those to 24 hours - let duration = DurationString::parse_duration("P30D"); - assert_eq!(Ok(Duration::from_secs(30 * 24 * 3600)), duration); - - // we should not render larger units which are unsupported as inputs - let duration = DurationString::parse_duration("30 days 48 hours").unwrap(); - assert_eq!( - 30.days() - .hours(48) - .to_duration(SpanRelativeTo::days_are_24_hours()) - .unwrap(), - SignedDuration::try_from(duration).unwrap() - ); - let duration_str = DurationString::display(duration); - assert_eq!("32d", duration_str); - - // more complex inputs are also supported - but will be serialized as a more humane output - let duration = DurationString::parse_duration("P30DT10H30M15S"); - assert_eq!( - Ok(Duration::from_secs( - 30 * 24 * 3600 + 10 * 3600 + 30 * 60 + 15 - )), - duration - ); - assert_eq!( - serde_json::from_str::( - &serde_json::to_string(&MyDuration(duration.unwrap())).unwrap() - ) - .unwrap(), - "30d 10h 30m 15s" - ); - } - #[test] fn serialize_friendly() { let friendly_output = serde_json::from_str::( diff --git a/crates/time-util/Cargo.toml b/crates/time-util/Cargo.toml new file mode 100644 index 0000000000..47d8309833 --- /dev/null +++ b/crates/time-util/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "restate-time-util" +version.workspace = true +authors.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +publish = false + +[features] +default = [] + +[dependencies] +restate-workspace-hack = { workspace = true } + +jiff = { workspace = true } +thiserror = { workspace = true } diff --git a/crates/time-util/src/duration.rs b/crates/time-util/src/duration.rs new file mode 100644 index 0000000000..d3cbfdfc19 --- /dev/null +++ 
b/crates/time-util/src/duration.rs @@ -0,0 +1,441 @@ +// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::marker::PhantomData; +use std::str::FromStr; +use std::time::Duration; + +use jiff::fmt::{friendly, temporal}; +use jiff::{Span, SpanRelativeTo, SpanRound}; + +static VERBOSE_PRINTER: friendly::SpanPrinter = friendly::SpanPrinter::new() + .designator(friendly::Designator::Verbose) + .spacing(friendly::Spacing::BetweenUnitsAndDesignators); +static COMPACT_PRINTER: friendly::SpanPrinter = + friendly::SpanPrinter::new().designator(friendly::Designator::Compact); + +#[inline] +fn print_inner(span: &Span, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let printer = if f.alternate() { + &VERBOSE_PRINTER + } else { + &COMPACT_PRINTER + }; + + printer + .print_span(span, jiff::fmt::StdFmtWrite(f)) + .map_err(|_| std::fmt::Error) +} + +/// Extension trait for [`std::time::Duration`] that provides abstractions for human-friendly +/// printing. +pub trait DurationExt { + /// Zero-cost conversion to [`FriendlyDuration`]. + /// + /// [`FriendlyDuration`] provides human-friendly options for formatting a duration. + fn friendly(&self) -> FriendlyDuration; +} + +/// Displays a time span with 'days' as the maximum unit. +pub struct Days; +/// Displays a time span with 'seconds' as the maximum unit. +pub struct Seconds; +/// Displays a time span in 'HH:MM:SS[.fff]' format. +pub struct Hms; +/// Displays a time span in ISO 8601 format. 
+pub struct Iso8601; + +mod private { + pub trait Sealed {} + impl Sealed for super::Days {} + impl Sealed for super::Seconds {} + impl Sealed for super::Hms {} + impl Sealed for super::Iso8601 {} +} + +/// A sealed trait for the different displayable time-span styles. +pub trait Style: private::Sealed { + fn print_span(span: &Span, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result; +} + +impl Style for Days { + fn print_span(span: &Span, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + print_inner(span, f) + } +} + +impl Style for Seconds { + fn print_span(span: &Span, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + print_inner(span, f) + } +} + +impl Style for Hms { + fn print_span(span: &Span, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + static HMS_PRINTER: friendly::SpanPrinter = + friendly::SpanPrinter::new().hours_minutes_seconds(true); + + HMS_PRINTER + .print_span(span, jiff::fmt::StdFmtWrite(f)) + .map_err(|_| std::fmt::Error) + } +} + +impl Style for Iso8601 { + fn print_span(span: &Span, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + static ISO_PRINTER: temporal::SpanPrinter = temporal::SpanPrinter::new(); + + ISO_PRINTER + .print_span(span, jiff::fmt::StdFmtWrite(f)) + .map_err(|_| std::fmt::Error) + } +} + +impl DurationExt for Duration { + /// Infallible; `to_*_span()` on the result panics if the duration does not fit a `jiff::Span`. + fn friendly(&self) -> FriendlyDuration { + FriendlyDuration::from(*self) + } +} + +/// A wrapper around [`std::time::Duration`] that provides human-friendly display options. +/// +/// # Display +/// +/// The default display behaves the same as `to_days_span()`. But you can also use the following +/// conversions to customize the display behaviour: +/// +/// - Use [`FriendlyDuration::to_days_span`] to display a duration as a span with its maximum unit +/// set to days. +/// - Use [`FriendlyDuration::to_seconds_span`] to display a duration as a span with its maximum +/// unit set to seconds. 
+/// - Use [`FriendlyDuration::to_hms_span`] to display a duration as a span that's displayed as +/// `HH:MM:SS`. +/// - Use [`FriendlyDuration::to_iso8601_span`] to display a duration as a span that's displayed in +/// ISO 8601 format (e.g. `PT3H4M2S`). +/// +/// # Parsing +/// +/// This uses [`jiff::Span`]'s parsing to support both human-friendly and ISO8601 +/// inputs. Days are the largest supported unit of time, and are interpreted as 24 hours long when +/// converting the parsed span into an actual duration. +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Default)] +#[repr(transparent)] +pub struct FriendlyDuration(Duration); + +impl PartialEq for FriendlyDuration { + fn eq(&self, other: &Duration) -> bool { + &self.0 == other + } +} + +impl PartialEq for Duration { + fn eq(&self, other: &FriendlyDuration) -> bool { + self == &other.0 + } +} + +impl PartialOrd for FriendlyDuration { + fn partial_cmp(&self, other: &Duration) -> Option { + self.0.partial_cmp(other) + } +} + +impl PartialOrd for Duration { + fn partial_cmp(&self, other: &FriendlyDuration) -> Option { + self.partial_cmp(&other.0) + } +} + +impl From for FriendlyDuration { + fn from(d: Duration) -> FriendlyDuration { + FriendlyDuration(d) + } +} + +impl From for Duration { + fn from(d: FriendlyDuration) -> Duration { + d.0 + } +} + +impl std::fmt::Display for FriendlyDuration { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.to_days_span().print(f) + } +} + +impl FriendlyDuration { + /// Returns a span with its maximum unit set to days. + pub fn to_days_span(&self) -> TimeSpan { + TimeSpan::::new(Span::try_from(self.0).unwrap()) + } + + /// Returns a span with its maximum unit set to seconds. 
+ pub fn to_seconds_span(&self) -> TimeSpan { + TimeSpan::::new(Span::try_from(self.0).unwrap()) + } + + /// Returns a span that's displayed as `HH:MM:SS`. + pub fn to_hms_span(&self) -> TimeSpan { + TimeSpan::::new(Span::try_from(self.0).unwrap()) + } + + /// Returns a span that's displayed in ISO 8601 format (e.g. `PT3H4M2S`). + pub fn to_iso8601_span(&self) -> TimeSpan { + TimeSpan::::new(Span::try_from(self.0).unwrap()) + } + + pub fn as_std(&self) -> &Duration { + &self.0 + } + + pub fn to_std(self) -> Duration { + self.0 + } +} + +#[derive(Debug, Clone, thiserror::Error)] +#[error(transparent)] +pub struct DurationParseError(#[from] jiff::Error); + +impl FromStr for FriendlyDuration { + type Err = DurationParseError; + + fn from_str(s: &str) -> Result { + let span: Span = s.parse()?; + if span.get_years() > 0 || span.get_months() > 0 { + return Err(DurationParseError(jiff::Error::from_args(format_args!( + "Please use units of days or smaller" + )))); + } + + span.to_duration(SpanRelativeTo::days_are_24_hours()) + .and_then(Duration::try_from) + .map(FriendlyDuration) + .map_err(DurationParseError) + } +} + +/// A zero-cost abstraction of a time span with a maximum unit of time. +#[derive(Clone, Copy)] +pub struct TimeSpan(Span, PhantomData); + +impl TimeSpan { + /// Makes this span represent a duration in the opposite direction. + /// By default, the span is positive. + /// + /// When a span is negative, its display representation will reflect that. For instance, + /// a duration of `5s` will be displayed as `5s ago` when formatted with [`TimeSpan`] + /// or [`TimeSpan`]. + /// + pub fn negated(self) -> TimeSpan { + TimeSpan(self.0.negate(), PhantomData) + } + + #[inline] + pub fn print(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + T::print_span(&self.0, f) + } + + /// Returns `true` if the duration is negative. 
+ pub fn is_negative(&self) -> bool { + self.0.is_negative() + } + + pub fn is_zero(&self) -> bool { + self.0.is_zero() + } + + pub fn days(&self) -> i32 { + self.0.get_days() + } + + pub fn hours(&self) -> i32 { + self.0.get_hours() + } + + pub fn minutes(&self) -> i64 { + self.0.get_minutes() + } + + pub fn seconds(&self) -> i64 { + self.0.get_seconds() + } + + pub fn milliseconds(&self) -> i64 { + self.0.get_milliseconds() + } + + pub fn microseconds(&self) -> i64 { + self.0.get_microseconds() + } + + pub fn nanoseconds(&self) -> i64 { + self.0.get_nanoseconds() + } +} + +impl TimeSpan { + fn new(span: Span) -> TimeSpan { + let span = span + .round( + SpanRound::new() + .largest(jiff::Unit::Second) + .days_are_24_hours(), + ) + .unwrap(); + TimeSpan(span, PhantomData) + } +} + +impl TimeSpan { + fn new(span: Span) -> TimeSpan { + let span = span + .round( + SpanRound::new() + .largest(jiff::Unit::Day) + .days_are_24_hours(), + ) + .unwrap(); + TimeSpan(span, PhantomData) + } +} + +impl TimeSpan { + fn new(span: Span) -> TimeSpan { + let span = span + .round( + SpanRound::new() + .largest(jiff::Unit::Hour) + .days_are_24_hours(), + ) + .unwrap(); + TimeSpan(span, PhantomData) + } +} + +impl TimeSpan { + fn new(span: Span) -> TimeSpan { + let span = span + .round( + SpanRound::new() + .largest(jiff::Unit::Day) + .days_are_24_hours(), + ) + .unwrap(); + TimeSpan(span, PhantomData) + } +} + +impl std::fmt::Display for TimeSpan { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.print(f) + } +} + +impl From> for Duration { + fn from(value: TimeSpan) -> Self { + let inner = if value.0.is_negative() { + value.0.negate() + } else { + value.0 + }; + + inner + .to_duration(SpanRelativeTo::days_are_24_hours()) + .and_then(Duration::try_from) + .expect("duration is positive and fits in std::time::Duration") + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn friendly_conversion() { + let friendly = 
Duration::from_nanos(22).friendly(); + assert_eq!("22ns", friendly.to_days_span().to_string()); + } + + #[test] + fn duration_display() { + let dur = FriendlyDuration::from_str("36h 4m 2s").unwrap(); + assert_eq!(dur, Duration::from_secs(129842)); + assert_eq!("129842s", dur.to_seconds_span().to_string()); + // alternate format is more verbose + assert_eq!("129842 seconds", format!("{:#}", dur.to_seconds_span())); + + // days + assert_eq!("1d 12h 4m 2s", dur.to_days_span().to_string()); + // alternate format is more verbose + assert_eq!( + "1 day 12 hours 4 minutes 2 seconds", + format!("{:#}", dur.to_days_span()) + ); + + // negated + assert_eq!("1d 12h 4m 2s ago", dur.to_days_span().negated().to_string()); + + assert_eq!( + "1 day 12 hours 4 minutes 2 seconds ago", + format!("{:#}", dur.to_days_span().negated()) + ); + } + + #[test] + fn hms_duration() { + let dur = FriendlyDuration::from_str("3h 4m 2s").unwrap(); + assert_eq!(11042, dur.as_std().as_secs()); + assert_eq!("03:04:02", dur.to_hms_span().to_string()); + + // with microseconds + let dur = FriendlyDuration::from_str("1h 24us").unwrap(); + assert_eq!(3600000024, dur.as_std().as_micros()); + assert_eq!("01:00:00.000024", dur.to_hms_span().to_string()); + } + + #[test] + fn iso8601_duration() { + let dur = FriendlyDuration::from_str("3h 4m 2s").unwrap(); + assert_eq!(11042, dur.as_std().as_secs()); + assert_eq!("PT3H4M2S", dur.to_iso8601_span().to_string()); + } + + #[test] + fn parse_friendly_duration_input_formats() { + let duration = FriendlyDuration::from_str("10 min").unwrap(); + assert_eq!(Duration::from_secs(600), duration); + + // we don't support "1 month" as months have variable length + let duration = FriendlyDuration::from_str("P1M"); + assert_eq!( + "Please use units of days or smaller", + duration.unwrap_err().to_string() + ); + + // we can, however, use "30 days" instead - we fix those to 24 hours + let duration = FriendlyDuration::from_str("P30D"); + assert_eq!(Duration::from_secs(30 * 24 
* 3600), duration.unwrap()); + + // we should not render larger units which are unsupported as inputs + let duration = FriendlyDuration::from_str("30 days 48 hours").unwrap(); + assert_eq!(Duration::from_secs(30 * 24 * 3600 + 48 * 3600), duration); + assert_eq!("32d", duration.to_days_span().to_string()); + + // more complex inputs are also supported - but will be serialized as a more humane output + let duration = FriendlyDuration::from_str("P30DT10H30M15S"); + assert_eq!( + Duration::from_secs(30 * 24 * 3600 + 10 * 3600 + 30 * 60 + 15), + duration.unwrap() + ); + } +} diff --git a/crates/time-util/src/lib.rs b/crates/time-util/src/lib.rs new file mode 100644 index 0000000000..26f6328775 --- /dev/null +++ b/crates/time-util/src/lib.rs @@ -0,0 +1,13 @@ +// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
+ +mod duration; + +pub use duration::*; From 71958cd437118b8d2db79c7b351482750bee5f0d Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Thu, 4 Sep 2025 17:35:39 +0100 Subject: [PATCH 2/3] [minor] Reduce allocations when decoding InvocationUuid --- crates/partition-store/src/keys.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/crates/partition-store/src/keys.rs b/crates/partition-store/src/keys.rs index 460438cb5c..f92f955c09 100644 --- a/crates/partition-store/src/keys.rs +++ b/crates/partition-store/src/keys.rs @@ -468,8 +468,11 @@ impl KeyCodec for InvocationUuid { if source.remaining() < InvocationUuid::RAW_BYTES_LEN { return Err(StorageError::DataIntegrityError); } - let bytes = source.copy_to_bytes(InvocationUuid::RAW_BYTES_LEN); - InvocationUuid::from_slice(&bytes).map_err(|err| StorageError::Generic(err.into())) + let mut buf = [0u8; InvocationUuid::RAW_BYTES_LEN]; + + debug_assert!(source.remaining() >= InvocationUuid::RAW_BYTES_LEN); + source.copy_to_slice(&mut buf); + Ok(InvocationUuid::from_bytes(buf)) } fn serialized_length(&self) -> usize { From dbfeebc3846fccbf20eacca11152500b71c00bbe Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Thu, 4 Sep 2025 17:35:18 +0100 Subject: [PATCH 3/3] WIP, a full non-working implementation for delete-range-cf, blocked on WBWI --- Cargo.lock | 6 +- Cargo.toml | 4 +- .../src/invocation_status_table/mod.rs | 2 +- .../partition-store/src/journal_table/mod.rs | 38 ++++----- .../src/journal_table_v2/mod.rs | 82 +++++-------------- crates/partition-store/src/keys.rs | 60 +++++++++++++- .../partition-store/src/outbox_table/mod.rs | 21 +++-- crates/partition-store/src/partition_store.rs | 30 +++++++ .../partition-store/src/promise_table/mod.rs | 26 ++---- crates/partition-store/src/state_table/mod.rs | 18 ++-- .../src/tests/journal_table_test/mod.rs | 2 +- .../src/tests/journal_table_v2_test/mod.rs | 8 +- crates/storage-api/src/journal_table/mod.rs | 1 - 
.../storage-api/src/journal_table_v2/mod.rs | 1 - .../lifecycle/migrate_journal_table.rs | 2 +- .../worker/src/partition/state_machine/mod.rs | 12 +-- 16 files changed, 171 insertions(+), 142 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2fc06f98c4..a7d35c91d7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8166,7 +8166,7 @@ dependencies = [ [[package]] name = "rust-librocksdb-sys" version = "0.39.0+10.5.1" -source = "git+https://github.com/restatedev/rust-rocksdb?rev=8873ba40ae687862dbe177e645e7b03c5ce025e3#8873ba40ae687862dbe177e645e7b03c5ce025e3" +source = "git+https://github.com/restatedev/rust-rocksdb?rev=041bd0dee97c913dc0df2e1e78d5f5715b58e9cf#041bd0dee97c913dc0df2e1e78d5f5715b58e9cf" dependencies = [ "bindgen 0.72.0", "bzip2-sys", @@ -8181,8 +8181,8 @@ dependencies = [ [[package]] name = "rust-rocksdb" -version = "0.43.0" -source = "git+https://github.com/restatedev/rust-rocksdb?rev=8873ba40ae687862dbe177e645e7b03c5ce025e3#8873ba40ae687862dbe177e645e7b03c5ce025e3" +version = "0.43.1" +source = "git+https://github.com/restatedev/rust-rocksdb?rev=041bd0dee97c913dc0df2e1e78d5f5715b58e9cf#041bd0dee97c913dc0df2e1e78d5f5715b58e9cf" dependencies = [ "libc", "parking_lot", diff --git a/Cargo.toml b/Cargo.toml index b62612a138..2d5182dae9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -195,10 +195,10 @@ reqwest = { version = "0.12.5", default-features = false, features = [ "stream", ] } rlimit = { version = "0.10.1" } -rocksdb = { version = "0.43.0", package = "rust-rocksdb", features = [ +rocksdb = { version = "0.43.1", package = "rust-rocksdb", features = [ "multi-threaded-cf", "jemalloc", -], git = "https://github.com/restatedev/rust-rocksdb", rev = "8873ba40ae687862dbe177e645e7b03c5ce025e3" } +], git = "https://github.com/restatedev/rust-rocksdb", rev = "041bd0dee97c913dc0df2e1e78d5f5715b58e9cf" } rstest = "0.24.0" rustls = { version = "0.23.26", default-features = false, features = ["ring"] } schemars = { version = "0.8", features = ["bytes", "enumset"] } 
diff --git a/crates/partition-store/src/invocation_status_table/mod.rs b/crates/partition-store/src/invocation_status_table/mod.rs index a0b40c1907..fef43027f1 100644 --- a/crates/partition-store/src/invocation_status_table/mod.rs +++ b/crates/partition-store/src/invocation_status_table/mod.rs @@ -74,7 +74,7 @@ fn put_invocation_status( status: &InvocationStatus, ) -> Result<()> { match status { - InvocationStatus::Free => storage.delete_key(&create_invocation_status_key(invocation_id)), + InvocationStatus::Free => delete_invocation_status(storage, invocation_id), _ => storage.put_kv(create_invocation_status_key(invocation_id), status), } } diff --git a/crates/partition-store/src/journal_table/mod.rs b/crates/partition-store/src/journal_table/mod.rs index ee1ca3ea1d..c059d6c011 100644 --- a/crates/partition-store/src/journal_table/mod.rs +++ b/crates/partition-store/src/journal_table/mod.rs @@ -38,7 +38,8 @@ define_table_key!( partition_key: PartitionKey, invocation_uuid: InvocationUuid, journal_index: u32 - ) + ), + invocation_prefix = [partition_key, invocation_uuid], ); fn write_journal_entry_key(invocation_id: &InvocationId, journal_index: u32) -> JournalKey { @@ -101,18 +102,20 @@ fn get_journal( ) } -fn delete_journal( - storage: &mut S, - invocation_id: &InvocationId, - journal_length: EntryIndex, -) -> Result<()> { - let mut key = write_journal_entry_key(invocation_id, 0); - let k = &mut key; - for journal_index in 0..journal_length { - k.journal_index = Some(journal_index); - storage.delete_key(k)?; - } - Ok(()) +fn delete_journal(storage: &mut S, invocation_id: &InvocationId) -> Result<()> { + let prefix = JournalKey::default() + .partition_key(invocation_id.partition_key()) + .invocation_uuid(invocation_id.invocation_uuid()) + .invocation_prefix(); + + storage.delete_prefix(Journal, prefix) + // let mut key = write_journal_entry_key(invocation_id, 0); + // let k = &mut key; + // for journal_index in 0..journal_length { + // k.journal_index = 
Some(journal_index); + // storage.delete_key(k)?; + // } + // Ok(()) } impl ReadOnlyJournalTable for PartitionStore { @@ -207,14 +210,9 @@ impl JournalTable for PartitionStoreTransaction<'_> { put_journal_entry(self, invocation_id, journal_index, journal_entry) } - async fn delete_journal( - &mut self, - invocation_id: &InvocationId, - journal_length: EntryIndex, - ) -> Result<()> { + async fn delete_journal(&mut self, invocation_id: &InvocationId) -> Result<()> { self.assert_partition_key(invocation_id)?; - let _x = RocksDbPerfGuard::new("delete-journal"); - delete_journal(self, invocation_id, journal_length) + delete_journal(self, invocation_id) } } diff --git a/crates/partition-store/src/journal_table_v2/mod.rs b/crates/partition-store/src/journal_table_v2/mod.rs index 613d588d22..c0f3bd8690 100644 --- a/crates/partition-store/src/journal_table_v2/mod.rs +++ b/crates/partition-store/src/journal_table_v2/mod.rs @@ -43,7 +43,8 @@ define_table_key!( partition_key: PartitionKey, invocation_uuid: InvocationUuid, journal_index: u32 - ) + ), + invocation_prefix = [partition_key, invocation_uuid], ); define_table_key!( @@ -53,7 +54,8 @@ define_table_key!( partition_key: PartitionKey, invocation_uuid: InvocationUuid, completion_id: CompletionId - ) + ), + invocation_prefix = [partition_key, invocation_uuid], ); define_table_key!( @@ -63,7 +65,8 @@ define_table_key!( partition_key: PartitionKey, invocation_uuid: InvocationUuid, notification_id: NotificationId - ) + ), + invocation_prefix = [partition_key, invocation_uuid], ); fn write_journal_entry_key(invocation_id: &InvocationId, journal_index: u32) -> JournalKey { @@ -150,66 +153,28 @@ fn get_journal( ) } -fn delete_journal( - storage: &mut S, - invocation_id: &InvocationId, - journal_length: EntryIndex, -) -> Result<()> { - let _x = RocksDbPerfGuard::new("delete-journal"); +fn delete_journal(storage: &mut S, invocation_id: &InvocationId) -> Result<()> { + let prefix = JournalKey::default() + 
.partition_key(invocation_id.partition_key()) + .invocation_uuid(invocation_id.invocation_uuid()) + .invocation_prefix(); - let mut key = write_journal_entry_key(invocation_id, 0); - let k = &mut key; - for journal_index in 0..journal_length { - k.journal_index = Some(journal_index); - storage.delete_key(k)?; - } + storage.delete_prefix(Journal, prefix)?; // Delete the indexes let notification_id_to_notification_index = JournalNotificationIdToNotificationIndexKey::default() .partition_key(invocation_id.partition_key()) - .invocation_uuid(invocation_id.invocation_uuid()); - let notification_id_index = OwnedIterator::new(storage.iterator_from( - TableScan::SinglePartitionKeyPrefix( - invocation_id.partition_key(), - notification_id_to_notification_index.clone(), - ), - )?) - .map(|(mut key, _)| { - let journal_key = JournalNotificationIdToNotificationIndexKey::deserialize_from(&mut key)?; - let (_, _, notification_id) = journal_key.into_inner_ok_or()?; - Ok(notification_id) - }) - .collect::>>()?; - for notification_id in notification_id_index { - storage.delete_key( - ¬ification_id_to_notification_index - .clone() - .notification_id(notification_id), - )?; - } + .invocation_uuid(invocation_id.invocation_uuid()) + .invocation_prefix(); + storage.delete_prefix(Journal, notification_id_to_notification_index)?; let completion_id_to_command_index = JournalCompletionIdToCommandIndexKey::default() .partition_key(invocation_id.partition_key()) - .invocation_uuid(invocation_id.invocation_uuid()); - let notification_id_index = - OwnedIterator::new(storage.iterator_from(TableScan::SinglePartitionKeyPrefix( - invocation_id.partition_key(), - notification_id_to_notification_index.clone(), - ))?) 
- .map(|(mut key, _)| { - let journal_key = JournalCompletionIdToCommandIndexKey::deserialize_from(&mut key)?; - let (_, _, completion_id) = journal_key.into_inner_ok_or()?; - Ok(completion_id) - }) - .collect::>>()?; - for notification_id in notification_id_index { - storage.delete_key( - &completion_id_to_command_index - .clone() - .completion_id(notification_id), - )?; - } + .invocation_uuid(invocation_id.invocation_uuid()) + .invocation_prefix(); + + storage.delete_prefix(Journal, completion_id_to_command_index)?; Ok(()) } @@ -406,14 +371,9 @@ impl JournalTable for PartitionStoreTransaction<'_> { put_journal_entry(self, &invocation_id, index, entry, related_completion_ids) } - async fn delete_journal( - &mut self, - invocation_id: InvocationId, - journal_length: EntryIndex, - ) -> Result<()> { + async fn delete_journal(&mut self, invocation_id: InvocationId) -> Result<()> { self.assert_partition_key(&invocation_id)?; - let _x = RocksDbPerfGuard::new("delete-journal"); - delete_journal(self, &invocation_id, journal_length) + delete_journal(self, &invocation_id) } } diff --git a/crates/partition-store/src/keys.rs b/crates/partition-store/src/keys.rs index f92f955c09..29258b4033 100644 --- a/crates/partition-store/src/keys.rs +++ b/crates/partition-store/src/keys.rs @@ -44,6 +44,41 @@ pub enum KeyKind { Promise, } +/// A prefix of a table key that can be used to traverse the table +pub struct KeyPrefix(pub(crate) Vec); + +impl AsRef<[u8]> for KeyPrefix { + fn as_ref(&self) -> &[u8] { + &self.0 + } +} + +impl KeyPrefix { + /// Returns the exclusive upper bound of this key prefix in its shortest form. + /// + /// Use this to compute the upper bound of an iterator over keys with this prefix + /// or to delete the range of keys that share this prefix from partition store. 
+ pub fn next_prefix(&self) -> Self { + let mut out = self.0.clone(); + let len = Self::next_prefix_into(&self.0, &mut out); + out.truncate(len); + Self(out) + } + + #[inline] + fn next_prefix_into(prefix: &[u8], out: &mut [u8]) -> usize { + debug_assert!(out.len() >= prefix.len()); + out[..prefix.len()].copy_from_slice(prefix); + for i in (0..prefix.len()).rev() { + if out[i] != 0xFF { + out[i] = out[i].wrapping_add(1); + return i + 1; // logical truncation point + } + } + unreachable!("Key prefix end overflowed, start prefix {:x?}", prefix); + } +} + impl KeyKind { pub const SERIALIZED_LENGTH: usize = 2; @@ -242,7 +277,13 @@ pub trait TableKey: Sized + std::fmt::Debug + Send + 'static { /// macro_rules! define_table_key { - ($table_kind:expr, $key_kind:path, $key_name:ident ( $($element: ident: $ty: ty),+ $(,)? ) ) => (paste::paste! { + ( + $table_kind:expr, + $key_kind:path, + $key_name:ident ( + $($element: ident: $ty: ty),+ $(,)? + )$(,$($prefix_name: ident = [$($field: ident),+]),* $(,)?)? + ) => (paste::paste! { // main key holder #[derive(Default, Debug, Eq, PartialEq, Clone)] pub struct $key_name { $(pub $element: Option<$ty>),+ } @@ -260,6 +301,23 @@ macro_rules! define_table_key { self.$element.as_ref().ok_or_else(|| restate_storage_api::StorageError::DataIntegrityError) })+ + $( + $(pub fn [< $prefix_name>](&self) -> crate::keys::KeyPrefix { + // we always need space for the key kind + let mut serialized_length = $crate::keys::KeyKind::SERIALIZED_LENGTH; + $( + serialized_length += $crate::keys::KeyCodec::serialized_length(&self.$field); + )+ + let mut buf: Vec = Vec::with_capacity(serialized_length); + Self::serialize_key_kind(&mut buf); + $( + $crate::keys::serialize(&self.$field, &mut buf); + )+ + return crate::keys::KeyPrefix(buf); + + })* + )? 
+ pub fn into_inner(self) -> ($(Option<$ty>,)+) { return ( $(self.$element,)+ ) } diff --git a/crates/partition-store/src/outbox_table/mod.rs b/crates/partition-store/src/outbox_table/mod.rs index 23b20102ff..e2084c8442 100644 --- a/crates/partition-store/src/outbox_table/mod.rs +++ b/crates/partition-store/src/outbox_table/mod.rs @@ -26,7 +26,8 @@ use crate::{ define_table_key!( Outbox, KeyKind::Outbox, - OutboxKey(partition_id: PaddedPartitionId, message_index: u64) + OutboxKey(partition_id: PaddedPartitionId, message_index: u64), + outbox_prefix = [partition_id, message_index], ); fn add_message( @@ -112,12 +113,18 @@ fn truncate_outbox( range: RangeInclusive, ) -> Result<()> { let _x = RocksDbPerfGuard::new("truncate-outbox"); - let mut key = OutboxKey::default().partition_id(partition_id.into()); - for seq in range { - key.message_index = Some(seq); - storage.delete_key(&key)?; - } - Ok(()) + let from = OutboxKey::default() + .partition_id(partition_id.into()) + .message_index(*range.start()) + .outbox_prefix(); + + let to = OutboxKey::default() + .partition_id(partition_id.into()) + .message_index(*range.end()) + .outbox_prefix() + // we want the exclusive upper bound + .next_prefix(); + storage.delete_range_cf(Outbox, from, to) } impl ReadOnlyOutboxTable for PartitionStore { diff --git a/crates/partition-store/src/partition_store.rs b/crates/partition-store/src/partition_store.rs index ad32119668..03709aa85f 100644 --- a/crates/partition-store/src/partition_store.rs +++ b/crates/partition-store/src/partition_store.rs @@ -44,6 +44,7 @@ use restate_types::storage::StorageCodec; use crate::durable_lsn_tracking::AppliedLsnCollectorFactory; use crate::fsm_table::{get_locally_durable_lsn, get_schema_version, put_schema_version}; use crate::keys::KeyKind; +use crate::keys::KeyPrefix; use crate::keys::TableKey; use crate::migrations::{LATEST_VERSION, SchemaVersion}; use crate::partition_db::PartitionDb; @@ -849,6 +850,19 @@ impl StorageAccess for PartitionStore { 
.delete_cf(table, key) .map_err(|error| StorageError::Generic(error.into())) } + + #[inline] + fn delete_range_cf>(&mut self, table: TableKind, from: K, to: K) -> Result<()> { + let table = self.table_handle(table); + let opts = rocksdb::WriteOptions::default(); + println!("partition store delete"); + self.db + .rocksdb() + .inner() + .as_raw_db() + .delete_range_cf_opt(table, from, to, &opts) + .map_err(|error| StorageError::Generic(error.into())) + } } #[derive(Debug, Copy, Clone, Eq, PartialEq)] @@ -1087,6 +1101,13 @@ impl StorageAccess for PartitionStoreTransaction<'_> { .delete_cf(self.data_cf_handle, key); Ok(()) } + + #[inline] + fn delete_range_cf>(&mut self, _table: TableKind, from: K, to: K) -> Result<()> { + self.write_batch_with_index + .delete_range_cf(self.data_cf_handle, from, to); + Ok(()) + } } pub(crate) trait StorageAccess { @@ -1124,6 +1145,8 @@ pub(crate) trait StorageAccess { fn delete_cf(&mut self, table: TableKind, key: impl AsRef<[u8]>) -> Result<()>; + fn delete_range_cf>(&mut self, table: TableKind, from: K, to: K) -> Result<()>; + #[inline] fn put_kv_raw>(&mut self, key: K, value: V) -> Result<()> { let key_buffer = self.cleared_key_buffer_mut(key.serialized_length()); @@ -1163,6 +1186,13 @@ pub(crate) trait StorageAccess { self.delete_cf(K::TABLE, buffer) } + #[inline] + fn delete_prefix(&mut self, table: TableKind, prefix: KeyPrefix) -> Result<()> { + let to = prefix.next_prefix(); + println!("delete_range_cf {:?} {:?}", prefix.0, to.0); + self.delete_range_cf(table, prefix, to) + } + #[inline] fn get_value(&mut self, key: K) -> Result> where diff --git a/crates/partition-store/src/promise_table/mod.rs b/crates/partition-store/src/promise_table/mod.rs index b31f9c8088..0638b29d2b 100644 --- a/crates/partition-store/src/promise_table/mod.rs +++ b/crates/partition-store/src/promise_table/mod.rs @@ -23,10 +23,7 @@ use restate_types::identifiers::{PartitionKey, ServiceId, WithPartitionKey}; use crate::keys::{KeyKind, TableKey, 
define_table_key}; use crate::scan::TableScan; -use crate::{ - PartitionStore, PartitionStoreTransaction, StorageAccess, TableKind, - TableScanIterationDecision, break_on_err, -}; +use crate::{PartitionStore, PartitionStoreTransaction, StorageAccess, TableKind, break_on_err}; define_table_key!( TableKind::Promise, @@ -36,7 +33,8 @@ define_table_key!( service_name: ByteString, service_key: Bytes, key: ByteString - ) + ), + service_key_prefix = [partition_key, service_name, service_key], ); fn create_key(service_id: &ServiceId, key: &ByteString) -> PromiseKey { @@ -53,7 +51,8 @@ fn get_promise( key: &ByteString, ) -> Result> { let _x = RocksDbPerfGuard::new("get-promise"); - storage.get_value(create_key(service_id, key)) + let key = create_key(service_id, key); + storage.get_value(key) } fn put_promise( @@ -69,18 +68,9 @@ fn delete_all_promises(storage: &mut S, service_id: &ServiceId let prefix_key = PromiseKey::default() .partition_key(service_id.partition_key()) .service_name(service_id.service_name.clone()) - .service_key(service_id.key.as_bytes().clone()); - - let keys = storage.for_each_key_value_in_place( - TableScan::SinglePartitionKeyPrefix(service_id.partition_key(), prefix_key), - |k, _| TableScanIterationDecision::Emit(Ok(Bytes::copy_from_slice(k))), - )?; - - for k in keys { - let key = k?; - storage.delete_cf(TableKind::Promise, key)?; - } - Ok(()) + .service_key(service_id.key.as_bytes().clone()) + .service_key_prefix(); + storage.delete_prefix(TableKind::Promise, prefix_key) } impl ReadOnlyPromiseTable for PartitionStore { diff --git a/crates/partition-store/src/state_table/mod.rs b/crates/partition-store/src/state_table/mod.rs index 55dca481b1..9c1ef5baac 100644 --- a/crates/partition-store/src/state_table/mod.rs +++ b/crates/partition-store/src/state_table/mod.rs @@ -33,7 +33,8 @@ define_table_key!( service_name: ByteString, service_key: ByteString, state_key: Bytes - ) + ), + service_key_prefix = [partition_key, service_name, service_key], ); 
#[inline] @@ -46,6 +47,9 @@ fn write_state_entry_key(service_id: &ServiceId, state_key: impl AsRef<[u8]>) -> } fn user_state_key_from_slice(key: &[u8]) -> Result { + // Note: the key is copied so that a single allocation is made, which is later decomposed into + // a few ByteStrings that index into that allocation. This avoids smaller allocations for + // every portion of the key. let mut key = Bytes::copy_from_slice(key); let key = StateKey::deserialize_from(&mut key)?; let key = key @@ -80,17 +84,7 @@ fn delete_all_user_state(storage: &mut S, service_id: &Service .service_name(service_id.service_name.clone()) .service_key(service_id.key.clone()); - let keys = storage.for_each_key_value_in_place( - TableScan::SinglePartitionKeyPrefix(service_id.partition_key(), prefix_key), - |k, _| TableScanIterationDecision::Emit(Ok(Bytes::copy_from_slice(k))), - )?; - - for k in keys { - let key = k?; - storage.delete_cf(State, &key)?; - } - - Ok(()) + storage.delete_prefix(State, prefix_key.service_key_prefix()) } fn get_user_state( diff --git a/crates/partition-store/src/tests/journal_table_test/mod.rs b/crates/partition-store/src/tests/journal_table_test/mod.rs index fa612a905c..6782cb1c83 100644 --- a/crates/partition-store/src/tests/journal_table_test/mod.rs +++ b/crates/partition-store/src/tests/journal_table_test/mod.rs @@ -122,7 +122,7 @@ async fn point_lookups(txn: &mut T) { } async fn delete_journal(txn: &mut T) { - txn.delete_journal(&MOCK_INVOCATION_ID_1, 5).await.unwrap(); + txn.delete_journal(&MOCK_INVOCATION_ID_1).await.unwrap(); } async fn verify_journal_deleted(txn: &mut T) { diff --git a/crates/partition-store/src/tests/journal_table_v2_test/mod.rs b/crates/partition-store/src/tests/journal_table_v2_test/mod.rs index 22bc1d82ca..beefdb8c01 100644 --- a/crates/partition-store/src/tests/journal_table_v2_test/mod.rs +++ b/crates/partition-store/src/tests/journal_table_v2_test/mod.rs @@ -190,10 +190,8 @@ async fn sleep_point_lookups(txn: &mut T) {
assert!(result.is_none()); } -async fn delete_journal(txn: &mut T, length: usize) { - txn.delete_journal(MOCK_INVOCATION_ID_1, length as u32) - .await - .unwrap(); +async fn delete_journal(txn: &mut T) { + txn.delete_journal(MOCK_INVOCATION_ID_1).await.unwrap(); } async fn verify_journal_deleted(txn: &mut T, length: usize) { @@ -234,7 +232,7 @@ async fn test_sleep_journal() { check_sleep_notification_index(&mut txn).await; sleep_point_lookups(&mut txn).await; - delete_journal(&mut txn, 10).await; + delete_journal(&mut txn).await; txn.commit().await.expect("should not fail"); diff --git a/crates/storage-api/src/journal_table/mod.rs b/crates/storage-api/src/journal_table/mod.rs index c137cd8bce..29f52fafec 100644 --- a/crates/storage-api/src/journal_table/mod.rs +++ b/crates/storage-api/src/journal_table/mod.rs @@ -89,6 +89,5 @@ pub trait JournalTable: ReadOnlyJournalTable { fn delete_journal( &mut self, invocation_id: &InvocationId, - journal_length: EntryIndex, ) -> impl Future> + Send; } diff --git a/crates/storage-api/src/journal_table_v2/mod.rs b/crates/storage-api/src/journal_table_v2/mod.rs index 5f2a9b583c..0ffb7146aa 100644 --- a/crates/storage-api/src/journal_table_v2/mod.rs +++ b/crates/storage-api/src/journal_table_v2/mod.rs @@ -74,7 +74,6 @@ pub trait JournalTable: ReadOnlyJournalTable { fn delete_journal( &mut self, invocation_id: InvocationId, - length: EntryIndex, ) -> impl Future> + Send; } diff --git a/crates/worker/src/partition/state_machine/lifecycle/migrate_journal_table.rs b/crates/worker/src/partition/state_machine/lifecycle/migrate_journal_table.rs index f8a5afbc5a..b9b3274103 100644 --- a/crates/worker/src/partition/state_machine/lifecycle/migrate_journal_table.rs +++ b/crates/worker/src/partition/state_machine/lifecycle/migrate_journal_table.rs @@ -70,7 +70,7 @@ where &[], ) .await?; - journal_table_v1::JournalTable::delete_journal(ctx.storage, &self.invocation_id, 1) + journal_table_v1::JournalTable::delete_journal(ctx.storage, 
&self.invocation_id) .await .map_err(Error::Storage)?; diff --git a/crates/worker/src/partition/state_machine/mod.rs b/crates/worker/src/partition/state_machine/mod.rs index 962553c838..f66a81a46d 100644 --- a/crates/worker/src/partition/state_machine/mod.rs +++ b/crates/worker/src/partition/state_machine/mod.rs @@ -4043,15 +4043,11 @@ impl StateMachineApplyContext<'_, S> { ); if should_remove_journal_table_v2 { - journal_table_v2::JournalTable::delete_journal( - self.storage, - invocation_id, - journal_length, - ) - .await - .map_err(Error::Storage)? + journal_table_v2::JournalTable::delete_journal(self.storage, invocation_id) + .await + .map_err(Error::Storage)? } else { - JournalTable::delete_journal(self.storage, &invocation_id, journal_length) + JournalTable::delete_journal(self.storage, &invocation_id) .await .map_err(Error::Storage)?; }