diff --git a/.spelling b/.spelling
index 82bf70418..0a4bc784e 100644
--- a/.spelling
+++ b/.spelling
@@ -358,6 +358,7 @@ mockable
mockall
modularity
moka
+moka's
monomorphization
monomorphize
monomorphized
diff --git a/Cargo.lock b/Cargo.lock
index c1258435f..347ab4e70 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -509,6 +509,7 @@ dependencies = [
"cachet_service",
"cachet_tier",
"criterion",
+ "dashmap",
"dynosaur",
"futures",
"layered",
@@ -522,7 +523,6 @@ dependencies = [
"seatbelt",
"serde",
"testing_aids",
- "thread_aware",
"tick",
"tokio",
"tracing",
diff --git a/crates/cachet/Cargo.toml b/crates/cachet/Cargo.toml
index c5cd5210d..da626a34b 100644
--- a/crates/cachet/Cargo.toml
+++ b/crates/cachet/Cargo.toml
@@ -56,7 +56,6 @@ parking_lot = { workspace = true }
pin-project-lite = { workspace = true }
postcard = { workspace = true, optional = true }
serde = { workspace = true, optional = true, features = ["derive"] }
-thread_aware = { workspace = true }
tick = { workspace = true, features = [] }
tracing = { workspace = true, optional = true }
uniflight = { workspace = true }
@@ -67,6 +66,7 @@ bytesbuf = { path = "../bytesbuf" }
cachet_memory = { path = "../cachet_memory" }
cachet_tier = { path = "../cachet_tier", features = ["test-util"] }
criterion = { workspace = true }
+dashmap = { workspace = true }
dynosaur = { workspace = true }
opentelemetry = { workspace = true, features = [
"metrics",
@@ -81,7 +81,7 @@ testing_aids = { path = "../testing_aids" }
tick = { path = "../tick", features = ["test-util", "tokio"] }
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
tracing = { workspace = true, features = ["std"] }
-tracing-subscriber = { workspace = true }
+tracing-subscriber = { workspace = true, features = ["fmt", "registry"] }
tracing-test = { workspace = true, features = ["no-env-filter"] }
[[bench]]
@@ -159,6 +159,10 @@ required-features = ["memory", "test-util"]
name = "telemetry_subscriber"
required-features = ["memory", "logs"]
+[[example]]
+name = "telemetry_accumulator"
+required-features = ["memory", "logs"]
+
[[example]]
name = "serialization"
required-features = ["memory", "serialize"]
diff --git a/crates/cachet/README.md b/crates/cachet/README.md
index 9931e8069..e81dfc1d2 100644
--- a/crates/cachet/README.md
+++ b/crates/cachet/README.md
@@ -14,7 +14,7 @@
A composable, multi-tier caching library with stampede protection, background
-refresh, and built-in OpenTelemetry telemetry.
+refresh, and structured telemetry.
## Why Multi-Tier Caching?
@@ -89,7 +89,7 @@ builds on top of them and adds:
|Stampede protection|❌|✅|
|Background refresh|❌|✅|
|Service middleware integration|❌|✅|
-|Structured telemetry (tracing)|❌|✅|
+|Structured telemetry|❌|✅|
|Pluggable storage backends|❌|✅|
|Clock injection for testing|❌|✅|
@@ -228,12 +228,16 @@ cache.insert("key".to_string(), "value".to_string()).await?;
## Telemetry
-Enable with the `logs` feature and `.enable_logs()` on the cache builder.
+Cachet provides two complementary telemetry channels:
+
+### Tracing events
-Each cache operation emits a structured [`tracing`][__link20] event with fields
-`cache.name`, `cache.event`, and `cache.duration_ns`.
+Enable with the `logs` feature and `.enable_logs()` on the cache builder.
+Each tier outcome and operation completion emits a structured [`tracing`][__link20] event.
-### Subscribing to events
+**Tier events** carry `cache.name`, `cache.event`, and `cache.duration_ns`.
+**Operation-complete events** carry `cache.name`, `cache.operation`,
+`cache.duration_ns`, and `cache.coalesced`.
Use [`telemetry::attributes`][__link21] constants to filter and match events in a
custom `tracing_subscriber::Layer`:
@@ -251,21 +255,32 @@ if event_value == attributes::EVENT_HIT { /* cache hit */ }
See the `telemetry_subscriber` example for a complete demonstration.
-### Event types
+#### Event types
|Level|Events|
|-----|------|
|ERROR|`cache.get_error`, `cache.insert_error`, `cache.invalidate_error`, `cache.clear_error`|
-|INFO|`cache.expired`, `cache.refresh_miss`, `cache.inserted`, `cache.insert_rejected`, `cache.invalidated`, `cache.fallback`, `cache.eviction`|
+|INFO|`cache.expired`, `cache.refresh_miss`, `cache.inserted`, `cache.insert_rejected`, `cache.invalidated`, `cache.eviction`|
|DEBUG|`cache.hit`, `cache.miss`, `cache.refresh_hit`, `cache.cleared`|
+### Event handler callback API
+
+Register a [`CacheEventHandler`][__link22] via
+`.event_handler(handler)` on the cache builder to receive typed
+[`CacheTierEvent`][__link23] and
+[`CacheOperationEvent`][__link24] callbacks.
+Events carry a `request_id` for correlating tier outcomes with their parent
+operation. Works independently of the `logs` feature.
+
+See the `telemetry_accumulator` example for a DashMap-based accumulation pattern.
+
This crate was developed as part of The Oxidizer Project. Browse this crate's source code.
- [__cargo_doc2readme_dependencies_info]: ggGmYW0CYXZlMC43LjJhdIQbLiTyV0MU86EbZU15e0PmecoboQ9jo59bnAEbyDXw04U13GlhYvRhcoQbg_hDqE88LP4bMh0J5Y4y4Osb0zDJ1kwqOsoblCGrm49Rx2thZIiCaGJ5dGVzYnVmZTAuNS4zgmZjYWNoZXRlMC42LjSCbWNhY2hldF9tZW1vcnllMC4zLjOCbmNhY2hldF9zZXJ2aWNlZTAuMi4zgmtjYWNoZXRfdGllcmUwLjIuMoJkdGlja2UwLjMuM4JndHJhY2luZ2YwLjEuNDSCaXVuaWZsaWdodGUwLjIuMw
+ [__cargo_doc2readme_dependencies_info]: ggGkYW0CYXSEGy4k8ldDFPOhG2VNeXtD5nnKG6EPY6OfW5wBG8g18NOFNdxpYXKEG_8ZSA792uloG6CGM3YZObWMG5vDWjb2V8K3G4SF7NHmnsnBYWSIgmhieXRlc2J1ZmUwLjUuM4JmY2FjaGV0ZTAuNi40gm1jYWNoZXRfbWVtb3J5ZTAuMy4zgm5jYWNoZXRfc2VydmljZWUwLjIuM4JrY2FjaGV0X3RpZXJlMC4yLjKCZHRpY2tlMC4zLjOCZ3RyYWNpbmdmMC4xLjQ0gml1bmlmbGlnaHRlMC4yLjM
[__link0]: https://docs.rs/cachet/0.6.4/cachet/?search=TimeToRefresh
[__link1]: https://crates.io/crates/uniflight/0.2.3
[__link10]: https://docs.rs/cachet_tier/0.2.2/cachet_tier/?search=CacheTier
@@ -281,6 +296,9 @@ This crate was developed as part of The Oxidizer Project. Br
[__link2]: https://docs.rs/cachet/0.6.4/cachet/?search=CacheBuilder::stampede_protection
[__link20]: https://crates.io/crates/tracing/0.1.44
[__link21]: https://docs.rs/cachet/0.6.4/cachet/?search=telemetry::attributes
+ [__link22]: https://docs.rs/cachet/0.6.4/cachet/?search=telemetry::handler::CacheEventHandler
+ [__link23]: https://docs.rs/cachet/0.6.4/cachet/?search=telemetry::handler::CacheTierEvent
+ [__link24]: https://docs.rs/cachet/0.6.4/cachet/?search=telemetry::handler::CacheOperationEvent
[__link3]: https://docs.rs/cachet_tier/0.2.2/cachet_tier/?search=CacheTier
[__link4]: https://docs.rs/cachet_tier/0.2.2/cachet_tier/?search=DynamicCache
[__link5]: https://docs.rs/cachet/0.6.4/cachet/?search=InsertPolicy
diff --git a/crates/cachet/benches/operations.rs b/crates/cachet/benches/operations.rs
index c3598580f..a4db39446 100644
--- a/crates/cachet/benches/operations.rs
+++ b/crates/cachet/benches/operations.rs
@@ -15,6 +15,7 @@ use cachet_tier::MockCache;
use criterion::{Criterion, criterion_group, criterion_main};
use tick::Clock;
use tokio::runtime::Runtime;
+use tracing_subscriber::layer::SubscriberExt;
fn rt() -> Runtime {
Runtime::new().expect("failed to create runtime")
@@ -87,6 +88,7 @@ fn bench_cache_operations(c: &mut Criterion) {
// Wrapper Overhead (direct vs wrapped vs features)
// =============================================================================
+#[expect(clippy::too_many_lines, reason = "benchmark function with multiple related groups")]
fn bench_wrapper_overhead(c: &mut Criterion) {
let rt = rt();
let mut group = c.benchmark_group("wrapper_overhead");
@@ -148,6 +150,31 @@ fn bench_wrapper_overhead(c: &mut Criterion) {
});
});
+ // With telemetry + active subscriber (measures event formatting/dispatch overhead)
+ group.bench_function("with_telemetry_subscriber", |b| {
+ let subscriber = tracing_subscriber::registry().with(tracing_subscriber::fmt::layer().with_writer(std::io::sink).with_ansi(false));
+ let _guard = tracing::subscriber::set_default(subscriber);
+
+ let cache = rt.block_on(async {
+ let clock = Clock::new_tokio();
+ Cache::builder(clock)
+ .storage(MockCache::::new())
+ .enable_logs()
+ .build()
+ });
+ let key = "key".to_string();
+
+ b.iter_custom(|iters| {
+ rt.block_on(async {
+ let start = Instant::now();
+ for _ in 0..iters {
+ let _ = black_box(cache.get(black_box(&key)).await);
+ }
+ start.elapsed()
+ })
+ });
+ });
+
// With fallback tier
group.bench_function("with_fallback", |b| {
let cache = rt.block_on(async {
diff --git a/crates/cachet/examples/telemetry_accumulator.rs b/crates/cachet/examples/telemetry_accumulator.rs
new file mode 100644
index 000000000..45b4e81e4
--- /dev/null
+++ b/crates/cachet/examples/telemetry_accumulator.rs
@@ -0,0 +1,173 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+//! Demonstrates accumulating cachet telemetry into a single summary
+//! per cache operation, correlated by `request_id`.
+//!
+//! This pattern mirrors how a TVS-style consumer would collect tier
+//! outcomes, latencies, and flags into one log row per request.
+//!
+//! Uses `DashMap` for concurrent, low-contention accumulation — safe across
+//! all async runtimes, including work-stealing (tokio) and thread-per-core
+//! (oxidizer), even if a task migrates between cores mid-operation.
+//!
+//! Run with: `cargo run --example telemetry_accumulator --features "memory,logs"`
+
+use std::time::Duration;
+
+use cachet::telemetry::handler::{CacheEventHandler, CacheOperationEvent, CacheTierEvent, RequestId};
+use cachet::{Cache, CacheEntry};
+use dashmap::DashMap;
+use tick::Clock;
+
+// ---------------------------------------------------------------------------
+// Accumulated state — one entry per in-flight operation, keyed by request_id
+// ---------------------------------------------------------------------------
+
+#[derive(Debug)]
+struct TierRecord {
+ tier_name: String,
+ outcome: String,
+ duration_us: u64,
+ fallback: bool,
+}
+
+/// Handler that accumulates tier events per `request_id` and prints a
+/// one-line summary when the operation completes.
+///
+/// `DashMap` shards the map internally so concurrent operations on
+/// different cores rarely contend.
+struct AccumulatingHandler {
+ pending: DashMap>,
+}
+
+impl AccumulatingHandler {
+ fn new() -> Self {
+ Self { pending: DashMap::new() }
+ }
+}
+
+impl CacheEventHandler for AccumulatingHandler {
+ fn on_tier_event(&self, event: &CacheTierEvent<'_>) {
+ // Eviction events have request_id > 0 when triggered synchronously
+ // during an insert (capacity overflow). Background maintenance
+ // evictions have request_id == 0.
+ self.pending.entry(event.request_id).or_default().push(TierRecord {
+ tier_name: event.tier_name.to_owned(),
+ outcome: event.outcome.to_owned(),
+ duration_us: u64::try_from(event.duration.as_micros()).unwrap_or(u64::MAX),
+ fallback: event.fallback,
+ });
+ }
+
+ fn on_operation_complete(&self, event: &CacheOperationEvent<'_>) {
+ let tiers = self.pending.remove(&event.request_id).map(|(_, v)| v).unwrap_or_default();
+
+ // --- Build the summary line ---
+ // A TVS consumer would pack these into a bitfield here.
+
+ let mut flags = Vec::new();
+ if event.coalesced {
+ flags.push("COALESCED");
+ }
+ if tiers.iter().any(|t| t.fallback) {
+ flags.push("FALLBACK");
+ }
+ let flags_str = if flags.is_empty() {
+ String::new()
+ } else {
+ format!(" [{}]", flags.join(", "))
+ };
+
+ // Final outcome = last tier's outcome
+ let outcome = tiers.last().map_or("?", |t| t.outcome.as_str());
+
+ print!(
+ "[{}] {} -> {} ({}us total){flags_str}",
+ event.cache_name,
+ event.operation,
+ outcome,
+ event.duration.as_micros(),
+ );
+
+ // Per-tier breakdown for multi-tier caches
+ if tiers.len() > 1 {
+ print!(" | ");
+ for (i, tier) in tiers.iter().enumerate() {
+ if i > 0 {
+ print!(", ");
+ }
+ print!("{}={} ({}us)", tier.tier_name, tier.outcome, tier.duration_us);
+ }
+ }
+
+ println!();
+ }
+}
+
+// ---------------------------------------------------------------------------
+// Main
+// ---------------------------------------------------------------------------
+
+#[tokio::main]
+async fn main() {
+ let clock = Clock::new_tokio();
+
+ // Single-tier cache
+ println!("=== Single-tier cache ===");
+ let cache: Cache = Cache::builder(clock.clone())
+ .memory()
+ .name("single")
+ .event_handler(AccumulatingHandler::new())
+ .build();
+
+ cache
+ .insert("key".to_string(), CacheEntry::new("value".to_string()))
+ .await
+ .expect("insert should succeed");
+ let _ = cache.get(&"key".to_string()).await;
+ let _ = cache.get(&"missing".to_string()).await;
+
+ // Two-tier cache with fallback
+ println!("\n=== Two-tier cache (L1 -> L2) ===");
+ let l2 = Cache::builder::(clock.clone()).memory().name("l2");
+ let cache2: Cache = Cache::builder(clock)
+ .memory()
+ .name("l1")
+ .ttl(Duration::from_secs(30))
+ .event_handler(AccumulatingHandler::new())
+ .fallback(l2)
+ .build();
+
+ cache2
+ .insert("user:1".to_string(), CacheEntry::new("Alice".to_string()))
+ .await
+ .expect("insert should succeed");
+ let _ = cache2.get(&"user:1".to_string()).await;
+ let _ = cache2.get(&"nobody".to_string()).await;
+
+ // Capacity-limited cache — evictions correlated with inserts
+ println!("\n=== Capacity-limited cache (max 2 entries) ===");
+ let cache3: Cache = Cache::builder(Clock::new_tokio())
+ .memory_with(|b| b.max_capacity(2).with_eviction_telemetry())
+ .name("tiny")
+ .event_handler(AccumulatingHandler::new())
+ .build();
+
+ // Fill to capacity
+ cache3
+ .insert("a".to_string(), CacheEntry::new("1".to_string()))
+ .await
+ .expect("insert should succeed");
+ cache3
+ .insert("b".to_string(), CacheEntry::new("2".to_string()))
+ .await
+ .expect("insert should succeed");
+ // This insert may trigger an eviction — the eviction event will carry
+ // the same request_id as the insert, so the accumulator sees both the
+ // insert and the eviction in one summary.
+ cache3
+ .insert("c".to_string(), CacheEntry::new("3".to_string()))
+ .await
+ .expect("insert should succeed");
+}
diff --git a/crates/cachet/examples/telemetry_subscriber.rs b/crates/cachet/examples/telemetry_subscriber.rs
index 9ce1d914b..27476f86c 100644
--- a/crates/cachet/examples/telemetry_subscriber.rs
+++ b/crates/cachet/examples/telemetry_subscriber.rs
@@ -1,96 +1,34 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
-//! Demonstrates subscribing to cachet telemetry events using a custom tracing Layer.
+//! Demonstrates cachet telemetry as structured tracing events.
//!
-//! This example shows how to use the public constants in `cachet::telemetry::attributes`
-//! to build a Layer that reacts to specific cache events.
+//! Run with: `cargo run --example telemetry_subscriber --features "memory,logs"`
use std::time::Duration;
-use cachet::telemetry::attributes;
use cachet::{Cache, CacheEntry};
use tick::Clock;
-use tracing::field::{Field, Visit};
-use tracing_subscriber::Layer;
-use tracing_subscriber::layer::{Context, SubscriberExt};
-
-/// A simple Layer that prints cache events to stdout.
-struct CacheEventPrinter;
-
-/// Visitor that extracts cache telemetry fields from a tracing event.
-#[derive(Default)]
-struct CacheFieldVisitor {
- cache_name: Option,
- event: Option,
- duration_ns: Option,
-}
-
-impl Visit for CacheFieldVisitor {
- fn record_str(&mut self, field: &Field, value: &str) {
- match field.name() {
- attributes::FIELD_NAME => self.cache_name = Some(value.to_owned()),
- attributes::FIELD_EVENT => self.event = Some(value.to_owned()),
- _ => {}
- }
- }
-
- fn record_u128(&mut self, field: &Field, value: u128) {
- if field.name() == attributes::FIELD_DURATION_NS {
- self.duration_ns = Some(value);
- }
- }
-
- fn record_debug(&mut self, _field: &Field, _value: &dyn std::fmt::Debug) {}
-}
-
-impl Layer for CacheEventPrinter {
- fn on_event(&self, event: &tracing::Event<'_>, _ctx: Context<'_, S>) {
- // Only process cachet events
- if !event.metadata().target().starts_with(attributes::TARGET) {
- return;
- }
-
- let mut visitor = CacheFieldVisitor::default();
- event.record(&mut visitor);
-
- let cache_name = visitor.cache_name.as_deref().unwrap_or("unknown");
- let event_type = visitor.event.as_deref().unwrap_or("unknown");
- let duration_us = visitor.duration_ns.unwrap_or(0) / 1000;
-
- // React to specific events using the public constants
- match event_type {
- attributes::EVENT_HIT => println!("HIT on {cache_name} ({duration_us}µs)"),
- attributes::EVENT_MISS => println!("MISS on {cache_name} ({duration_us}µs)"),
- attributes::EVENT_INSERTED => println!("INSERT on {cache_name} ({duration_us}µs)"),
- attributes::EVENT_EXPIRED => println!("EXPIRED on {cache_name} ({duration_us}µs)"),
- other => println!("{other} on {cache_name} ({duration_us}µs)"),
- }
- }
-}
+use tracing_subscriber::layer::SubscriberExt;
#[tokio::main]
async fn main() {
- // Set up the subscriber with our custom cache event layer
- let subscriber = tracing_subscriber::registry().with(CacheEventPrinter);
- let _guard = tracing::subscriber::set_default(subscriber);
+ // Set up a subscriber that shows cachet's structured events.
+ let subscriber = tracing_subscriber::registry().with(tracing_subscriber::fmt::layer().with_ansi(true).with_target(false));
+ tracing::subscriber::set_global_default(subscriber).expect("subscriber already set");
let clock = Clock::new_tokio();
- let cache = Cache::builder::(clock)
- .memory()
- .enable_logs()
- .ttl(Duration::from_secs(30))
- .build();
+ let cache: Cache = Cache::builder(clock).memory().enable_logs().ttl(Duration::from_secs(30)).build();
- println!("Inserting entry...");
+ println!("--- Insert ---");
cache
.insert("user:1".to_string(), CacheEntry::new("Alice".to_string()))
.await
.expect("insert failed");
- println!("Getting existing key...");
+ println!("\n--- Get (hit) ---");
let _ = cache.get(&"user:1".to_string()).await;
- println!("Getting missing key...");
+ println!("\n--- Get (miss) ---");
let _ = cache.get(&"user:999".to_string()).await;
}
diff --git a/crates/cachet/src/builder/buildable.rs b/crates/cachet/src/builder/buildable.rs
index befc93b83..95c4d25d5 100644
--- a/crates/cachet/src/builder/buildable.rs
+++ b/crates/cachet/src/builder/buildable.rs
@@ -19,7 +19,7 @@ pub(crate) trait Buildable {
fn build(self) -> Cache;
- fn build_tier(self, clock: Clock, telemetry: CacheTelemetry) -> Self::TierOutput;
+ fn build_tier(self, clock: Clock, telemetry: CacheTelemetry, fallback: bool) -> Self::TierOutput;
}
impl Buildable for CacheBuilder
@@ -36,18 +36,18 @@ where
let telemetry = self.telemetry.clone();
let stampede_protection = self.stampede_protection;
- let tier = DynamicCache::new(self.build_tier(clock.clone(), telemetry));
+ let tier = DynamicCache::new(self.build_tier(clock.clone(), telemetry.clone(), false));
- Cache::new(type_name::(name), tier, clock, stampede_protection)
+ Cache::new(type_name::(name), tier, clock, telemetry, stampede_protection)
}
- fn build_tier(self, clock: Clock, telemetry: CacheTelemetry) -> Self::TierOutput {
+ fn build_tier(self, clock: Clock, telemetry: CacheTelemetry, fallback: bool) -> Self::TierOutput {
let name = type_name::(self.name);
#[cfg(feature = "memory")]
if let Some(hook) = &self.eviction_hook {
hook.init(telemetry.clone(), name);
}
- CacheWrapper::new(name, self.storage, clock, self.ttl, telemetry, self.policy)
+ CacheWrapper::new(name, self.storage, clock, self.ttl, telemetry, self.policy, fallback)
}
}
@@ -66,14 +66,14 @@ where
let telemetry = self.telemetry.clone();
let stampede_protection = self.stampede_protection;
- let tier = DynamicCache::new(self.build_tier(clock.clone(), telemetry));
+ let tier = DynamicCache::new(self.build_tier(clock.clone(), telemetry.clone(), false));
- Cache::new(type_name::(name), tier, clock, stampede_protection)
+ Cache::new(type_name::(name), tier, clock, telemetry, stampede_protection)
}
- fn build_tier(self, clock: Clock, telemetry: CacheTelemetry) -> Self::TierOutput {
- let primary = self.primary_builder.build_tier(clock.clone(), telemetry.clone());
- let fallback = self.fallback_builder.build_tier(clock.clone(), telemetry.clone());
+ fn build_tier(self, clock: Clock, telemetry: CacheTelemetry, fallback: bool) -> Self::TierOutput {
+ let primary = self.primary_builder.build_tier(clock.clone(), telemetry.clone(), fallback);
+ let fallback = self.fallback_builder.build_tier(clock.clone(), telemetry.clone(), true);
FallbackCache::new(
type_name::(self.name),
diff --git a/crates/cachet/src/builder/cache.rs b/crates/cachet/src/builder/cache.rs
index 2a56c6fde..9eea7946d 100644
--- a/crates/cachet/src/builder/cache.rs
+++ b/crates/cachet/src/builder/cache.rs
@@ -3,7 +3,6 @@
use std::hash::Hash;
use std::marker::PhantomData;
-#[cfg(feature = "memory")]
use std::sync::Arc;
use std::time::Duration;
@@ -18,6 +17,7 @@ use super::sealed::{CacheTierBuilder, Sealed};
use crate::eviction::EvictionHook;
use crate::policy::InsertPolicy;
use crate::telemetry::CacheTelemetry;
+use crate::telemetry::handler::CacheEventHandler;
use crate::{Cache, CacheTier};
/// Builder for constructing a cache with a single tier.
@@ -206,7 +206,7 @@ impl CacheBuilder {
/// If not set, a name is derived from the storage type.
///
/// Requires `&'static str` because the name is embedded in every telemetry
- /// event (metric labels, log fields). A static reference avoids cloning the
+ /// event (tracing fields, handler callbacks). A static reference avoids cloning the
/// name into a new allocation on each cache operation, which matters at high
/// throughput. In practice, cache names are always string literals.
#[must_use]
@@ -221,7 +221,7 @@ impl CacheBuilder {
#[cfg(any(feature = "logs", test))]
#[must_use]
pub fn enable_logs(mut self) -> Self {
- self.telemetry = CacheTelemetry::with_logging();
+ self.telemetry = self.telemetry.enable_logging();
self
}
@@ -251,6 +251,13 @@ impl CacheBuilder {
self
}
+ /// Registers a callback for structured cache events.
+ #[must_use]
+ pub fn event_handler(mut self, handler: impl CacheEventHandler + 'static) -> Self {
+ self.telemetry = self.telemetry.with_handler(Arc::new(handler));
+ self
+ }
+
/// Sets the time-to-live (TTL) for entries in this cache tier.
///
/// Entries older than the TTL will be considered expired and won't be
@@ -284,7 +291,7 @@ impl CacheBuilder {
/// [`Cache::get_or_insert`](crate::Cache::get_or_insert), and promotion from a fallback tier.
///
/// If the policy rejects an insert, the operation is skipped and a
- /// `cache.rejected` telemetry event is recorded with `cache.operation = cache.insert`.
+ /// `cache.insert_rejected` telemetry event is recorded.
///
/// # Examples
///
diff --git a/crates/cachet/src/builder/transform.rs b/crates/cachet/src/builder/transform.rs
index 990b1e9d2..97a48e6f7 100644
--- a/crates/cachet/src/builder/transform.rs
+++ b/crates/cachet/src/builder/transform.rs
@@ -245,17 +245,17 @@ where
let clock = self.clock.clone();
let telemetry = self.telemetry.clone();
let stampede_protection = self.stampede_protection;
- let tier = self.build_tier(clock.clone(), telemetry);
+ let tier = self.build_tier(clock.clone(), telemetry.clone(), false);
- crate::Cache::new(type_name::(None), tier, clock, stampede_protection)
+ crate::Cache::new(type_name::(None), tier, clock, telemetry, stampede_protection)
}
- fn build_tier(self, clock: Clock, telemetry: CacheTelemetry) -> Self::TierOutput {
+ fn build_tier(self, clock: Clock, telemetry: CacheTelemetry, fallback: bool) -> Self::TierOutput {
// Build pre-transform tier
- let pre_tier = self.pre.build_tier(clock.clone(), telemetry.clone());
+ let pre_tier = self.pre.build_tier(clock.clone(), telemetry.clone(), fallback);
// Build post-transform tier, wrap in TransformAdapter
- let post_tier = self.post.build_tier(clock.clone(), telemetry.clone());
+ let post_tier = self.post.build_tier(clock.clone(), telemetry.clone(), true);
let adapted = TransformAdapter::from_boxed(post_tier, self.key_encoder, self.value_codec);
// Combine: pre is primary, adapted is fallback
diff --git a/crates/cachet/src/cache.rs b/crates/cachet/src/cache.rs
index 9b3c653d3..6c15856a3 100644
--- a/crates/cachet/src/cache.rs
+++ b/crates/cachet/src/cache.rs
@@ -13,11 +13,13 @@ use uniflight::Merger;
use crate::Error;
use crate::builder::CacheBuilder;
+use crate::telemetry::CacheTelemetry;
+use crate::telemetry::cache::{WithRequestIdExt, next_request_id};
/// Type alias for cache names used in telemetry.
///
/// A static reference is used so that names can be embedded in telemetry
-/// attributes (metric labels, log fields) without allocating on every
+/// fields (tracing events, handler callbacks) without allocating on every
/// cache operation.
pub type CacheName = &'static str;
@@ -113,6 +115,7 @@ pub struct Cache {
pub(crate) name: CacheName,
pub(crate) storage: DynamicCache,
pub(crate) clock: Clock,
+ pub(crate) telemetry: CacheTelemetry,
/// Mergers for stampede protection on all operations.
/// Only present when `stampede_protection` is enabled.
mergers: Option>,
@@ -149,11 +152,18 @@ where
K: Clone + Eq + Hash + Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
{
- pub(crate) fn new(name: CacheName, storage: DynamicCache, clock: Clock, stampede_protection: bool) -> Self {
+ pub(crate) fn new(
+ name: CacheName,
+ storage: DynamicCache,
+ clock: Clock,
+ telemetry: CacheTelemetry,
+ stampede_protection: bool,
+ ) -> Self {
Self {
name,
storage,
clock,
+ telemetry,
mergers: stampede_protection.then(Mergers::new),
}
}
@@ -220,18 +230,28 @@ where
K: Borrow,
Q: Hash + Eq + ToOwned + ?Sized + Send + Sync,
{
- if let Some(mergers) = &self.mergers {
- let owned = key.to_owned();
- let storage = &self.storage;
- mergers
- .get
- .execute(key, move || async move { storage.get(&owned).await })
- .await
- .unwrap_or_else(|panicked| Err(Error::from_source(panicked)))
- } else {
- let owned = key.to_owned();
- self.storage.get(&owned).await
+ let request_id = next_request_id();
+ let watch = self.clock.stopwatch();
+ async {
+ let (result, coalesced) = if let Some(mergers) = &self.mergers {
+ let owned = key.to_owned();
+ let storage = &self.storage;
+ let result = mergers
+ .get
+ .execute(key, move || async move { storage.get(&owned).await })
+ .await
+ .unwrap_or_else(|panicked| Err(Error::from_source(panicked)));
+ (result, true)
+ } else {
+ let owned = key.to_owned();
+ (self.storage.get(&owned).await, false)
+ };
+ self.telemetry
+ .complete_operation(request_id, self.name, "cache.get", watch.elapsed(), coalesced);
+ result
}
+ .with_request_id(request_id)
+ .await
}
/// Inserts a value into the cache.
@@ -260,7 +280,16 @@ where
/// # };
/// ```
pub async fn insert(&self, key: K, entry: impl Into>) -> Result<(), Error> {
- self.storage.insert(key, entry.into()).await
+ let request_id = next_request_id();
+ let watch = self.clock.stopwatch();
+ async {
+ let result = self.storage.insert(key, entry.into()).await;
+ self.telemetry
+ .complete_operation(request_id, self.name, "cache.insert", watch.elapsed(), false);
+ result
+ }
+ .with_request_id(request_id)
+ .await
}
/// Invalidates (removes) a value from the cache.
@@ -282,18 +311,28 @@ where
K: Borrow,
Q: Hash + Eq + ToOwned + ?Sized + Send + Sync,
{
- if let Some(mergers) = &self.mergers {
- let owned = key.to_owned();
- let storage = &self.storage;
- mergers
- .invalidate
- .execute(key, move || async move { storage.invalidate(&owned).await })
- .await
- .unwrap_or_else(|panicked| Err(Error::from_source(panicked)))
- } else {
- let owned = key.to_owned();
- self.storage.invalidate(&owned).await
+ let request_id = next_request_id();
+ let watch = self.clock.stopwatch();
+ async {
+ let (result, coalesced) = if let Some(mergers) = &self.mergers {
+ let owned = key.to_owned();
+ let storage = &self.storage;
+ let result = mergers
+ .invalidate
+ .execute(key, move || async move { storage.invalidate(&owned).await })
+ .await
+ .unwrap_or_else(|panicked| Err(Error::from_source(panicked)));
+ (result, true)
+ } else {
+ let owned = key.to_owned();
+ (self.storage.invalidate(&owned).await, false)
+ };
+ self.telemetry
+ .complete_operation(request_id, self.name, "cache.invalidate", watch.elapsed(), coalesced);
+ result
}
+ .with_request_id(request_id)
+ .await
}
/// Returns true if the cache contains a value for the given key.
@@ -315,7 +354,16 @@ where
///
/// Returns an error if the underlying cache tier operation fails.
pub async fn clear(&self) -> Result<(), Error> {
- self.storage.clear().await
+ let request_id = next_request_id();
+ let watch = self.clock.stopwatch();
+ async {
+ let result = self.storage.clear().await;
+ self.telemetry
+ .complete_operation(request_id, self.name, "cache.clear", watch.elapsed(), false);
+ result
+ }
+ .with_request_id(request_id)
+ .await
}
/// Returns an **approximate** count of entries, if supported by the underlying storage.
@@ -412,16 +460,26 @@ where
Q: Hash + Eq + ToOwned + ?Sized + Send + Sync,
Fut: Future