-
Notifications
You must be signed in to change notification settings - Fork 18
feat(cachet): structured telemetry with spans, events, and handler API #460
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
schgoo
wants to merge
27
commits into
main
Choose a base branch
from
u/schgoo/coalescemetrics
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
27 commits
Select commit
Hold shift + click to select a range
0c4cd31
Fix fallback stampede protection tests
schgoo 3b4c240
Revert "Fix fallback stampede protection tests"
schgoo 43a3881
Request coalescing telemetry
schgoo cd35c72
CI fixes
schgoo cd8d893
CI fixes
schgoo fbe2a1e
Merge with main
schgoo 1b89e36
Fix broken cachet_memory tests
schgoo b02b890
CI fixes
schgoo aee1c56
Increase code coverage
schgoo d008140
Merge with main
schgoo 31ded78
Review fixes
schgoo 012ddbf
Fix miri issue
schgoo 5794cb7
Comment fixes
schgoo b9aa175
More comment fixes
schgoo 80433b7
Merge with main
schgoo 0111148
PR comment fixes
schgoo 25a2e3a
Mutant coverage
schgoo 7b05348
Comment fixeus
schgoo 2cb02ae
Fix mutants
schgoo bf3586e
Remove spans, they are not really necessary, especially with request …
schgoo 356017f
Comment fixes
schgoo bbd6591
Merge with main
schgoo 9100bfc
Comment fixes
schgoo 18c80c8
Fix broken docs
schgoo d0410d9
Fix PR comments
schgoo 7f2d29e
Make tracing optional on logs
schgoo b9c44e8
PR comment fixes
schgoo File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Some comments aren't visible on the classic Files Changed page.
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -358,6 +358,7 @@ mockable | |
| mockall | ||
| modularity | ||
| moka | ||
| moka's | ||
| monomorphization | ||
| monomorphize | ||
| monomorphized | ||
|
|
||
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,173 @@ | ||
| // Copyright (c) Microsoft Corporation. | ||
| // Licensed under the MIT License. | ||
|
|
||
| //! Demonstrates accumulating cachet telemetry into a single summary | ||
| //! per cache operation, correlated by `request_id`. | ||
| //! | ||
| //! This pattern mirrors how a TVS-style consumer would collect tier | ||
| //! outcomes, latencies, and flags into one log row per request. | ||
| //! | ||
| //! Uses `DashMap` for concurrent, low-contention accumulation — safe across | ||
| //! all async runtimes, including work-stealing (tokio) and thread-per-core | ||
| //! (oxidizer), even if a task migrates between cores mid-operation. | ||
| //! | ||
| //! Run with: `cargo run --example telemetry_accumulator --features "memory,logs"` | ||
|
|
||
| use std::time::Duration; | ||
|
|
||
| use cachet::telemetry::handler::{CacheEventHandler, CacheOperationEvent, CacheTierEvent, RequestId}; | ||
| use cachet::{Cache, CacheEntry}; | ||
| use dashmap::DashMap; | ||
| use tick::Clock; | ||
|
|
||
| // --------------------------------------------------------------------------- | ||
| // Accumulated state — one entry per in-flight operation, keyed by request_id | ||
| // --------------------------------------------------------------------------- | ||
|
|
||
| #[derive(Debug)] | ||
| struct TierRecord { | ||
| tier_name: String, | ||
| outcome: String, | ||
| duration_us: u64, | ||
| fallback: bool, | ||
| } | ||
|
|
||
| /// Handler that accumulates tier events per `request_id` and prints a | ||
| /// one-line summary when the operation completes. | ||
| /// | ||
| /// `DashMap` shards the map internally so concurrent operations on | ||
| /// different cores rarely contend. | ||
| struct AccumulatingHandler { | ||
| pending: DashMap<RequestId, Vec<TierRecord>>, | ||
| } | ||
|
|
||
| impl AccumulatingHandler { | ||
| fn new() -> Self { | ||
| Self { pending: DashMap::new() } | ||
| } | ||
| } | ||
|
|
||
| impl CacheEventHandler for AccumulatingHandler { | ||
| fn on_tier_event(&self, event: &CacheTierEvent<'_>) { | ||
| // Eviction events have request_id > 0 when triggered synchronously | ||
| // during an insert (capacity overflow). Background maintenance | ||
| // evictions have request_id == 0. | ||
| self.pending.entry(event.request_id).or_default().push(TierRecord { | ||
| tier_name: event.tier_name.to_owned(), | ||
| outcome: event.outcome.to_owned(), | ||
| duration_us: u64::try_from(event.duration.as_micros()).unwrap_or(u64::MAX), | ||
| fallback: event.fallback, | ||
| }); | ||
| } | ||
|
|
||
| fn on_operation_complete(&self, event: &CacheOperationEvent<'_>) { | ||
| let tiers = self.pending.remove(&event.request_id).map(|(_, v)| v).unwrap_or_default(); | ||
|
|
||
| // --- Build the summary line --- | ||
| // A TVS consumer would pack these into a bitfield here. | ||
|
|
||
| let mut flags = Vec::new(); | ||
| if event.coalesced { | ||
| flags.push("COALESCED"); | ||
| } | ||
| if tiers.iter().any(|t| t.fallback) { | ||
| flags.push("FALLBACK"); | ||
| } | ||
| let flags_str = if flags.is_empty() { | ||
| String::new() | ||
| } else { | ||
| format!(" [{}]", flags.join(", ")) | ||
| }; | ||
|
|
||
| // Final outcome = last tier's outcome | ||
| let outcome = tiers.last().map_or("?", |t| t.outcome.as_str()); | ||
|
|
||
| print!( | ||
| "[{}] {} -> {} ({}us total){flags_str}", | ||
| event.cache_name, | ||
| event.operation, | ||
| outcome, | ||
| event.duration.as_micros(), | ||
| ); | ||
|
|
||
| // Per-tier breakdown for multi-tier caches | ||
| if tiers.len() > 1 { | ||
| print!(" | "); | ||
| for (i, tier) in tiers.iter().enumerate() { | ||
| if i > 0 { | ||
| print!(", "); | ||
| } | ||
| print!("{}={} ({}us)", tier.tier_name, tier.outcome, tier.duration_us); | ||
| } | ||
| } | ||
|
|
||
| println!(); | ||
| } | ||
| } | ||
|
|
||
| // --------------------------------------------------------------------------- | ||
| // Main | ||
| // --------------------------------------------------------------------------- | ||
|
|
||
| #[tokio::main] | ||
| async fn main() { | ||
| let clock = Clock::new_tokio(); | ||
|
|
||
| // Single-tier cache | ||
| println!("=== Single-tier cache ==="); | ||
| let cache: Cache<String, String> = Cache::builder(clock.clone()) | ||
| .memory() | ||
| .name("single") | ||
| .event_handler(AccumulatingHandler::new()) | ||
| .build(); | ||
|
|
||
| cache | ||
| .insert("key".to_string(), CacheEntry::new("value".to_string())) | ||
| .await | ||
| .expect("insert should succeed"); | ||
| let _ = cache.get(&"key".to_string()).await; | ||
| let _ = cache.get(&"missing".to_string()).await; | ||
|
|
||
| // Two-tier cache with fallback | ||
| println!("\n=== Two-tier cache (L1 -> L2) ==="); | ||
| let l2 = Cache::builder::<String, String>(clock.clone()).memory().name("l2"); | ||
| let cache2: Cache<String, String> = Cache::builder(clock) | ||
| .memory() | ||
| .name("l1") | ||
| .ttl(Duration::from_secs(30)) | ||
| .event_handler(AccumulatingHandler::new()) | ||
| .fallback(l2) | ||
| .build(); | ||
|
|
||
| cache2 | ||
| .insert("user:1".to_string(), CacheEntry::new("Alice".to_string())) | ||
| .await | ||
| .expect("insert should succeed"); | ||
| let _ = cache2.get(&"user:1".to_string()).await; | ||
| let _ = cache2.get(&"nobody".to_string()).await; | ||
|
|
||
| // Capacity-limited cache — evictions correlated with inserts | ||
| println!("\n=== Capacity-limited cache (max 2 entries) ==="); | ||
| let cache3: Cache<String, String> = Cache::builder(Clock::new_tokio()) | ||
| .memory_with(|b| b.max_capacity(2).with_eviction_telemetry()) | ||
| .name("tiny") | ||
| .event_handler(AccumulatingHandler::new()) | ||
| .build(); | ||
|
|
||
| // Fill to capacity | ||
| cache3 | ||
| .insert("a".to_string(), CacheEntry::new("1".to_string())) | ||
| .await | ||
| .expect("insert should succeed"); | ||
| cache3 | ||
| .insert("b".to_string(), CacheEntry::new("2".to_string())) | ||
| .await | ||
| .expect("insert should succeed"); | ||
| // This insert may trigger an eviction — the eviction event will carry | ||
| // the same request_id as the insert, so the accumulator sees both the | ||
| // insert and the eviction in one summary. | ||
| cache3 | ||
| .insert("c".to_string(), CacheEntry::new("3".to_string())) | ||
| .await | ||
| .expect("insert should succeed"); | ||
| } |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.