Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .spelling
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ mockable
mockall
modularity
moka
moka's
monomorphization
monomorphize
monomorphized
Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions crates/cachet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ parking_lot = { workspace = true }
pin-project-lite = { workspace = true }
postcard = { workspace = true, optional = true }
serde = { workspace = true, optional = true, features = ["derive"] }
thread_aware = { workspace = true }
tick = { workspace = true, features = [] }
tracing = { workspace = true, optional = true }
uniflight = { workspace = true }
Expand All @@ -67,6 +66,7 @@ bytesbuf = { path = "../bytesbuf" }
cachet_memory = { path = "../cachet_memory" }
cachet_tier = { path = "../cachet_tier", features = ["test-util"] }
criterion = { workspace = true }
dashmap = { workspace = true }
dynosaur = { workspace = true }
opentelemetry = { workspace = true, features = [
"metrics",
Expand All @@ -81,7 +81,7 @@ testing_aids = { path = "../testing_aids" }
tick = { path = "../tick", features = ["test-util", "tokio"] }
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
tracing = { workspace = true, features = ["std"] }
tracing-subscriber = { workspace = true }
tracing-subscriber = { workspace = true, features = ["fmt", "registry"] }
tracing-test = { workspace = true, features = ["no-env-filter"] }
Comment thread
schgoo marked this conversation as resolved.

[[bench]]
Expand Down Expand Up @@ -159,6 +159,10 @@ required-features = ["memory", "test-util"]
name = "telemetry_subscriber"
required-features = ["memory", "logs"]

[[example]]
name = "telemetry_accumulator"
required-features = ["memory", "logs"]

[[example]]
name = "serialization"
required-features = ["memory", "serialize"]
Expand Down
36 changes: 27 additions & 9 deletions crates/cachet/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
</div>

A composable, multi-tier caching library with stampede protection, background
refresh, and built-in OpenTelemetry telemetry.
refresh, and structured telemetry.

## Why Multi-Tier Caching?

Expand Down Expand Up @@ -89,7 +89,7 @@ builds on top of them and adds:
|Stampede protection|❌|✅|
|Background refresh|❌|✅|
|Service middleware integration|❌|✅|
|Structured telemetry (tracing)|❌|✅|
|Structured telemetry|❌|✅|
|Pluggable storage backends|❌|✅|
|Clock injection for testing|❌|✅|

Expand Down Expand Up @@ -228,12 +228,16 @@ cache.insert("key".to_string(), "value".to_string()).await?;

## Telemetry

Enable with the `logs` feature and `.enable_logs()` on the cache builder.
Cachet provides two complementary telemetry channels:

### Tracing events

Each cache operation emits a structured [`tracing`][__link20] event with fields
`cache.name`, `cache.event`, and `cache.duration_ns`.
Enable with the `logs` feature and `.enable_logs()` on the cache builder.
Each tier outcome and operation completion emits a structured [`tracing`][__link20] event.

### Subscribing to events
**Tier events** carry `cache.name`, `cache.event`, and `cache.duration_ns`.
**Operation-complete events** carry `cache.name`, `cache.operation`,
`cache.duration_ns`, and `cache.coalesced`.

Use [`telemetry::attributes`][__link21] constants to filter and match events in a
custom `tracing_subscriber::Layer`:
Expand All @@ -251,21 +255,32 @@ if event_value == attributes::EVENT_HIT { /* cache hit */ }

See the `telemetry_subscriber` example for a complete demonstration.

### Event types
#### Event types

|Level|Events|
|-----|------|
|ERROR|`cache.get_error`, `cache.insert_error`, `cache.invalidate_error`, `cache.clear_error`|
|INFO|`cache.expired`, `cache.refresh_miss`, `cache.inserted`, `cache.insert_rejected`, `cache.invalidated`, `cache.fallback`, `cache.eviction`|
|INFO|`cache.expired`, `cache.refresh_miss`, `cache.inserted`, `cache.insert_rejected`, `cache.invalidated`, `cache.eviction`|
|DEBUG|`cache.hit`, `cache.miss`, `cache.refresh_hit`, `cache.cleared`|

### Event handler callback API

Register a [`CacheEventHandler`][__link22] via
`.event_handler(handler)` on the cache builder to receive typed
[`CacheTierEvent`][__link23] and
[`CacheOperationEvent`][__link24] callbacks.
Events carry a `request_id` for correlating tier outcomes with their parent
operation. Works independently of the `logs` feature.

See the `telemetry_accumulator` example for a DashMap-based accumulation pattern.


<hr/>
<sub>
This crate was developed as part of <a href="../..">The Oxidizer Project</a>. Browse this crate's <a href="https://github.com/microsoft/oxidizer/tree/main/crates/cachet">source code</a>.
</sub>

[__cargo_doc2readme_dependencies_info]: ggGmYW0CYXZlMC43LjJhdIQbLiTyV0MU86EbZU15e0PmecoboQ9jo59bnAEbyDXw04U13GlhYvRhcoQbg_hDqE88LP4bMh0J5Y4y4Osb0zDJ1kwqOsoblCGrm49Rx2thZIiCaGJ5dGVzYnVmZTAuNS4zgmZjYWNoZXRlMC42LjSCbWNhY2hldF9tZW1vcnllMC4zLjOCbmNhY2hldF9zZXJ2aWNlZTAuMi4zgmtjYWNoZXRfdGllcmUwLjIuMoJkdGlja2UwLjMuM4JndHJhY2luZ2YwLjEuNDSCaXVuaWZsaWdodGUwLjIuMw
[__cargo_doc2readme_dependencies_info]: ggGkYW0CYXSEGy4k8ldDFPOhG2VNeXtD5nnKG6EPY6OfW5wBG8g18NOFNdxpYXKEG_8ZSA792uloG6CGM3YZObWMG5vDWjb2V8K3G4SF7NHmnsnBYWSIgmhieXRlc2J1ZmUwLjUuM4JmY2FjaGV0ZTAuNi40gm1jYWNoZXRfbWVtb3J5ZTAuMy4zgm5jYWNoZXRfc2VydmljZWUwLjIuM4JrY2FjaGV0X3RpZXJlMC4yLjKCZHRpY2tlMC4zLjOCZ3RyYWNpbmdmMC4xLjQ0gml1bmlmbGlnaHRlMC4yLjM
[__link0]: https://docs.rs/cachet/0.6.4/cachet/?search=TimeToRefresh
[__link1]: https://crates.io/crates/uniflight/0.2.3
[__link10]: https://docs.rs/cachet_tier/0.2.2/cachet_tier/?search=CacheTier
Expand All @@ -281,6 +296,9 @@ This crate was developed as part of <a href="../..">The Oxidizer Project</a>. Br
[__link2]: https://docs.rs/cachet/0.6.4/cachet/?search=CacheBuilder::stampede_protection
[__link20]: https://crates.io/crates/tracing/0.1.44
[__link21]: https://docs.rs/cachet/0.6.4/cachet/?search=telemetry::attributes
[__link22]: https://docs.rs/cachet/0.6.4/cachet/?search=telemetry::handler::CacheEventHandler
[__link23]: https://docs.rs/cachet/0.6.4/cachet/?search=telemetry::handler::CacheTierEvent
[__link24]: https://docs.rs/cachet/0.6.4/cachet/?search=telemetry::handler::CacheOperationEvent
[__link3]: https://docs.rs/cachet_tier/0.2.2/cachet_tier/?search=CacheTier
[__link4]: https://docs.rs/cachet_tier/0.2.2/cachet_tier/?search=DynamicCache
[__link5]: https://docs.rs/cachet/0.6.4/cachet/?search=InsertPolicy
Expand Down
27 changes: 27 additions & 0 deletions crates/cachet/benches/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use cachet_tier::MockCache;
use criterion::{Criterion, criterion_group, criterion_main};
use tick::Clock;
use tokio::runtime::Runtime;
use tracing_subscriber::layer::SubscriberExt;

fn rt() -> Runtime {
Runtime::new().expect("failed to create runtime")
Expand Down Expand Up @@ -87,6 +88,7 @@ fn bench_cache_operations(c: &mut Criterion) {
// Wrapper Overhead (direct vs wrapped vs features)
// =============================================================================

#[expect(clippy::too_many_lines, reason = "benchmark function with multiple related groups")]
fn bench_wrapper_overhead(c: &mut Criterion) {
let rt = rt();
let mut group = c.benchmark_group("wrapper_overhead");
Expand Down Expand Up @@ -148,6 +150,31 @@ fn bench_wrapper_overhead(c: &mut Criterion) {
});
});

// With telemetry + active subscriber (measures event formatting/dispatch overhead)
group.bench_function("with_telemetry_subscriber", |b| {
let subscriber = tracing_subscriber::registry().with(tracing_subscriber::fmt::layer().with_writer(std::io::sink).with_ansi(false));
let _guard = tracing::subscriber::set_default(subscriber);

let cache = rt.block_on(async {
let clock = Clock::new_tokio();
Cache::builder(clock)
.storage(MockCache::<String, String>::new())
.enable_logs()
.build()
});
let key = "key".to_string();

b.iter_custom(|iters| {
rt.block_on(async {
let start = Instant::now();
for _ in 0..iters {
let _ = black_box(cache.get(black_box(&key)).await);
}
start.elapsed()
})
});
});

// With fallback tier
group.bench_function("with_fallback", |b| {
let cache = rt.block_on(async {
Expand Down
173 changes: 173 additions & 0 deletions crates/cachet/examples/telemetry_accumulator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

//! Demonstrates accumulating cachet telemetry into a single summary
//! per cache operation, correlated by `request_id`.
//!
//! This pattern mirrors how a TVS-style consumer would collect tier
//! outcomes, latencies, and flags into one log row per request.
//!
//! Uses `DashMap` for concurrent, low-contention accumulation — safe across
//! all async runtimes, including work-stealing (tokio) and thread-per-core
//! (oxidizer), even if a task migrates between cores mid-operation.
//!
//! Run with: `cargo run --example telemetry_accumulator --features "memory,logs"`

use std::time::Duration;

use cachet::telemetry::handler::{CacheEventHandler, CacheOperationEvent, CacheTierEvent, RequestId};
use cachet::{Cache, CacheEntry};
use dashmap::DashMap;
use tick::Clock;

// ---------------------------------------------------------------------------
// Accumulated state — one entry per in-flight operation, keyed by request_id
// ---------------------------------------------------------------------------

#[derive(Debug)]
struct TierRecord {
tier_name: String,
outcome: String,
duration_us: u64,
fallback: bool,
}

/// Handler that accumulates tier events per `request_id` and prints a
/// one-line summary when the operation completes.
///
/// `DashMap` shards the map internally so concurrent operations on
/// different cores rarely contend.
struct AccumulatingHandler {
pending: DashMap<RequestId, Vec<TierRecord>>,
}

impl AccumulatingHandler {
fn new() -> Self {
Self { pending: DashMap::new() }
}
}

impl CacheEventHandler for AccumulatingHandler {
fn on_tier_event(&self, event: &CacheTierEvent<'_>) {
// Eviction events have request_id > 0 when triggered synchronously
// during an insert (capacity overflow). Background maintenance
// evictions have request_id == 0.
self.pending.entry(event.request_id).or_default().push(TierRecord {
tier_name: event.tier_name.to_owned(),
outcome: event.outcome.to_owned(),
duration_us: u64::try_from(event.duration.as_micros()).unwrap_or(u64::MAX),
fallback: event.fallback,
});
}

fn on_operation_complete(&self, event: &CacheOperationEvent<'_>) {
let tiers = self.pending.remove(&event.request_id).map(|(_, v)| v).unwrap_or_default();

// --- Build the summary line ---
// A TVS consumer would pack these into a bitfield here.

let mut flags = Vec::new();
if event.coalesced {
flags.push("COALESCED");
}
if tiers.iter().any(|t| t.fallback) {
flags.push("FALLBACK");
}
let flags_str = if flags.is_empty() {
String::new()
} else {
format!(" [{}]", flags.join(", "))
};

// Final outcome = last tier's outcome
let outcome = tiers.last().map_or("?", |t| t.outcome.as_str());

print!(
"[{}] {} -> {} ({}us total){flags_str}",
event.cache_name,
event.operation,
outcome,
event.duration.as_micros(),
);

// Per-tier breakdown for multi-tier caches
if tiers.len() > 1 {
print!(" | ");
for (i, tier) in tiers.iter().enumerate() {
if i > 0 {
print!(", ");
}
print!("{}={} ({}us)", tier.tier_name, tier.outcome, tier.duration_us);
}
}

println!();
}
}

// ---------------------------------------------------------------------------
// Main
// ---------------------------------------------------------------------------

#[tokio::main]
async fn main() {
let clock = Clock::new_tokio();

// Single-tier cache
println!("=== Single-tier cache ===");
let cache: Cache<String, String> = Cache::builder(clock.clone())
.memory()
.name("single")
.event_handler(AccumulatingHandler::new())
.build();

cache
.insert("key".to_string(), CacheEntry::new("value".to_string()))
.await
.expect("insert should succeed");
let _ = cache.get(&"key".to_string()).await;
let _ = cache.get(&"missing".to_string()).await;

// Two-tier cache with fallback
println!("\n=== Two-tier cache (L1 -> L2) ===");
let l2 = Cache::builder::<String, String>(clock.clone()).memory().name("l2");
let cache2: Cache<String, String> = Cache::builder(clock)
.memory()
.name("l1")
.ttl(Duration::from_secs(30))
.event_handler(AccumulatingHandler::new())
.fallback(l2)
.build();

cache2
.insert("user:1".to_string(), CacheEntry::new("Alice".to_string()))
.await
.expect("insert should succeed");
let _ = cache2.get(&"user:1".to_string()).await;
let _ = cache2.get(&"nobody".to_string()).await;

// Capacity-limited cache — evictions correlated with inserts
println!("\n=== Capacity-limited cache (max 2 entries) ===");
let cache3: Cache<String, String> = Cache::builder(Clock::new_tokio())
.memory_with(|b| b.max_capacity(2).with_eviction_telemetry())
.name("tiny")
.event_handler(AccumulatingHandler::new())
.build();

// Fill to capacity
cache3
.insert("a".to_string(), CacheEntry::new("1".to_string()))
.await
.expect("insert should succeed");
cache3
.insert("b".to_string(), CacheEntry::new("2".to_string()))
.await
.expect("insert should succeed");
// This insert may trigger an eviction — the eviction event will carry
// the same request_id as the insert, so the accumulator sees both the
// insert and the eviction in one summary.
cache3
.insert("c".to_string(), CacheEntry::new("3".to_string()))
.await
.expect("insert should succeed");
}
Loading
Loading