feat(cachet): structured telemetry with spans, events, and handler API#460
feat(cachet): structured telemetry with spans, events, and handler API#460schgoo wants to merge 20 commits into
Conversation
|
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #460 +/- ##
========================================
Coverage 100.0% 100.0%
========================================
Files 313 312 -1
Lines 24277 24915 +638
========================================
+ Hits 24277 24915 +638 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Pull request overview
This PR upgrades cachet telemetry from standalone events to a correlated span + event model, and introduces a callback-based handler API so consumers can build structured telemetry pipelines without tracing visitor boilerplate.
Changes:
- Adds per-operation and per-tier spans/events, including timing fields and flags (coalesced/fallback).
- Introduces
CacheEventHandlerwith typedCacheTierEvent/CacheOperationEventand request correlation viarequest_id. - Removes the old timing extension (
telemetry/ext.rs) and updates wrappers/builders/examples accordingly.
Reviewed changes
Copilot reviewed 21 out of 22 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| crates/cachet/src/wrapper.rs | Wraps tier operations in tier spans; records hit/miss/expired/etc with new telemetry API; propagates fallback flag. |
| crates/cachet/src/telemetry/mod.rs | Exposes new handler module and updates module docs. |
| crates/cachet/src/telemetry/handler.rs | Adds public callback API types (CacheEventHandler, event structs, RequestId). |
| crates/cachet/src/telemetry/ext.rs | Removes old ClockExt/Timed timing utilities. |
| crates/cachet/src/telemetry/cache.rs | Implements request correlation, span factories, event recording, and handler emission; adds extensive tests. |
| crates/cachet/src/telemetry/attributes.rs | Updates public telemetry field constants and removes EVENT_FALLBACK in favor of FIELD_FALLBACK. |
| crates/cachet/src/refresh.rs | Updates background refresh flow to use new spans/recording helpers. |
| crates/cachet/src/lib.rs | Updates crate docs for span-based telemetry and re-exports handler types. |
| crates/cachet/src/fallback.rs | Updates fallback get/promotion path to set fallback flag and rely on wrapper tier telemetry. |
| crates/cachet/src/eviction.rs | Routes eviction/expiration causes to new telemetry helpers and adds a no-logging behavior test. |
| crates/cachet/src/cache.rs | Adds operation-level spans/events with request_id correlation and stampede-protection coalesced flag recording. |
| crates/cachet/src/builder/transform.rs | Threads fallback flag through transform tier construction and passes telemetry into Cache::new. |
| crates/cachet/src/builder/cache.rs | Updates enable_logs behavior and adds event_handler(...) registration API. |
| crates/cachet/src/builder/buildable.rs | Extends tier-building to propagate fallback metadata and passes telemetry into Cache::new. |
| crates/cachet/README.md | Updates generated README telemetry docs to match the new span model. |
| crates/cachet/examples/telemetry_subscriber.rs | Simplifies example to show default span+event output using fmt subscriber. |
| crates/cachet/examples/telemetry_accumulator.rs | Adds example demonstrating request-correlated accumulation using CacheEventHandler + DashMap. |
| crates/cachet/Cargo.toml | Adjusts logs feature and dependencies; adds dashmap dev-dependency; registers new example. |
| crates/cachet/benches/operations.rs | Adds a benchmark configuration to measure overhead with an active tracing subscriber. |
| crates/cachet_memory/README.md | Updates generated README dependency info. |
| Cargo.lock | Adds dashmap and removes thread_aware from cachet crate dependency set. |
| .spelling | Adds “moka’s” to dictionary. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
Have you had a look at |
@ralfbiedert that's the idea (it's actually in the PR description)! I'm not sure when it will be available, though, but I'm hoping this provides a good middle ground. |
| /// RAII guard that resets the thread-local request ID to 0 on drop, | ||
| /// ensuring cleanup even if the inner future panics during poll. | ||
| struct ResetRequestId; | ||
|
|
||
| impl Drop for ResetRequestId { | ||
| fn drop(&mut self) { | ||
| CURRENT_REQUEST_ID.with(|cell| cell.set(0)); | ||
| } | ||
| } | ||
|
|
||
| /// Internal state for cache telemetry when no features are enabled (no-op). | ||
| #[cfg(not(any(feature = "logs", test)))] | ||
| #[derive(Clone, Debug, Default)] | ||
| pub(crate) struct CacheTelemetryInner; | ||
| impl<F: Future> Future for WithRequestId<F> { | ||
| type Output = F::Output; | ||
|
|
||
| #[cfg(not(any(feature = "logs", test)))] | ||
| #[expect(clippy::unused_self, reason = "Methods must match the logs-enabled impl signature")] | ||
| impl CacheTelemetryInner { | ||
| #[inline] | ||
| fn debug(&self, _: CacheName, _: &'static str, _: Duration) {} | ||
| fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
| let this = self.project(); | ||
| CURRENT_REQUEST_ID.with(|cell| cell.set(*this.request_id)); | ||
| let _guard = ResetRequestId; | ||
| this.inner.poll(cx) | ||
| } |
| if self.logging_enabled { | ||
| let span = Span::current(); | ||
| if !span.is_disabled() { | ||
| let duration_ns = saturating_nanos(duration); | ||
| span.record(attributes::FIELD_EVENT, event); |
| fn record_info_with_duration(&self, event: &'static str, duration: Duration) { | ||
| #[cfg(any(feature = "logs", test))] | ||
| if self.logging_enabled { | ||
| let span = Span::current(); | ||
| if !span.is_disabled() { | ||
| let duration_ns = saturating_nanos(duration); | ||
| span.record(attributes::FIELD_EVENT, event); | ||
| span.record(attributes::FIELD_DURATION_NS, duration_ns); | ||
| tracing::info!(cache.event = event, cache.duration_ns = duration_ns); | ||
| } | ||
| } | ||
| #[cfg(not(any(feature = "logs", test)))] | ||
| { | ||
| let _ = (event, duration); | ||
| } | ||
| } |
| fn record_error_with_duration(&self, event: &'static str, duration: Duration) { | ||
| #[cfg(any(feature = "logs", test))] | ||
| if self.logging_enabled { | ||
| let span = Span::current(); | ||
| if !span.is_disabled() { | ||
| let duration_ns = saturating_nanos(duration); | ||
| span.record(attributes::FIELD_EVENT, event); | ||
| span.record(attributes::FIELD_DURATION_NS, duration_ns); | ||
| tracing::error!(cache.event = event, cache.duration_ns = duration_ns); | ||
| } | ||
| } | ||
| #[cfg(not(any(feature = "logs", test)))] | ||
| { | ||
| let _ = (event, duration); | ||
| } | ||
| } |
| .with_request_id(inner_id) | ||
| ); | ||
| let mut inner_cx = Context::from_waker(waker); | ||
| assert!(matches!(inner.as_mut().poll(&mut inner_cx), Poll::Ready(()))); |
| .with_request_id(outer_id) | ||
| ); | ||
| let mut outer_cx = Context::from_waker(waker); | ||
| assert!(matches!(outer.as_mut().poll(&mut outer_cx), Poll::Ready(()))); |
| pub(crate) fn record_fallback(&self) { | ||
| #[cfg(any(feature = "logs", test))] | ||
| if self.logging_enabled { | ||
| tracing::info!(cache.event = "cache.fallback", cache.fallback = true); | ||
| } |
| fn record_debug_with_duration(&self, event: &'static str, duration: Duration) { | ||
| #[cfg(any(feature = "logs", test))] | ||
| if self.logging_enabled { | ||
| let duration_ns = saturating_nanos(duration); | ||
| tracing::debug!(cache.event = event, cache.duration_ns = duration_ns); | ||
| } |
| //! Use these constants to filter or match cachet telemetry events in a custom | ||
| //! `tracing_subscriber::Layer`. Cache operations emit structured events with | ||
| //! `FIELD_NAME`, `FIELD_EVENT`, `FIELD_DURATION_NS`, and additional fields when applicable. | ||
| //! |
| fn field_constants_match_tracing_field_names() { | ||
| // These constants must match the field names used in tracing macros in cache.rs. | ||
| assert_eq!(FIELD_NAME, "cache.name"); |
| let request_id = next_request_id(); | ||
| let watch = self.clock.stopwatch(); | ||
| async { | ||
| let (result, coalesced) = if let Some(mergers) = &self.mergers { | ||
| let owned = key.to_owned(); | ||
| let storage = &self.storage; | ||
| let result = mergers | ||
| .get | ||
| .execute(key, move || async move { storage.get(&owned).await }) | ||
| .await | ||
| .unwrap_or_else(|panicked| Err(Error::from_source(panicked))); | ||
| (result, true) | ||
| } else { | ||
| let owned = key.to_owned(); | ||
| (self.storage.get(&owned).await, false) | ||
| }; | ||
| self.telemetry | ||
| .complete_operation(request_id, self.name, "cache.get", watch.elapsed(), coalesced); | ||
| result | ||
| } | ||
| .with_request_id(request_id) | ||
| .await |
Summary
Replaces cachet's event-only telemetry with a span + event model that provides per-tier timing, request correlation, and a callback API for consumers to build custom telemetry pipelines.
Motivation
The previous telemetry emitted standalone tracing events with no correlation between tiers. Consumers couldn't distinguish which tier events belonged to which cache operation, and there was no way to subscribe to structured telemetry without parsing tracing fields via the visitor pattern.
Changes
Tracing spans + events
Cachemethod (get,insert,invalidate,clear,get_or_insert, etc.) creates a parent span viaCacheTelemetryCacheWrappertier operation creates a child span, producing a nested trace:cache.get → cache.tierCacheEventHandlercallback APICacheEventHandlertrait withon_tier_eventandon_operation_completecallbacksCacheBuilder::event_handler(handler)CacheTierEventandCacheOperationEventstructs — no tracing visitor boilerplatelogsfeature flagemitRequest correlation
request_id: u64from a process-wide atomic counterWithRequestId<F>future wrapper restores the request ID into a thread-local on everypoll(), surviving task migration across threads/cores (same pattern astracing::Instrument)CacheTierEventandCacheOperationEventcarry therequest_idfor groupingFallback as a flag
cache.fallbackis now a boolean flag on tier events, not a separate event typeRemoved
telemetry/ext.rs(ClockExt,Timed,TimedResult) — replaced byclock.stopwatch()directlyEVENT_FALLBACKandEVENT_REQUEST_MERGEDattribute constantsCacheTelemetryInner— span creation moved intoCacheTelemetrydirectlyNew attributes
FIELD_COALESCED— boolean flag for stampede protectionFIELD_FALLBACK— boolean flag for fallback tier consultationExamples
telemetry_subscriber— shows span + event output withtracing_subscriber::fmttelemetry_accumulator— demonstrates accumulating tier events into a single summary per operation usingCacheEventHandler+DashMap, mirroring a TVS-style consumer patternPerformance
Benchmarked in release mode (MockCache get, single tier):
Telemetry with no active subscriber adds ~10ns overhead. The
WithRequestIdwrapper adds ~300ns on the stampede protection path.