Skip to content

Commit

Permalink
Fix leak and add loom test
Browse files Browse the repository at this point in the history
  • Loading branch information
rcoh committed May 8, 2023
1 parent a2e82a2 commit 54fadf8
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 21 deletions.
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ tracing-subscriber = { version = "0.3.5", features = ["env-filter"] }
tokio = { version = "1", features = ["full"]}
proptest = "1.1.0"
env_logger = "0.10.0"
loom = "0.5"

[target.'cfg(loom)'.dependencies]
loom = "0.5"

[profile.release]
debug = 1
30 changes: 20 additions & 10 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use tracing::span::{Attributes, Record};
use tracing::subscriber::set_global_default;
use tracing::{Event as TracingEvent, Id, Span, Subscriber};

use crate::tracker::Action;
use tracing_subscriber::layer::Context;
use tracing_subscriber::registry::LookupSpan;
use tracing_subscriber::{Layer, Registry};
Expand Down Expand Up @@ -412,15 +413,18 @@ impl TeXRayLayer {
&self,
span: &Id,
ctx: &Context<'a, S>,
f: impl Fn(&mut InterestTracker, Vec<Id>),
) where
f: impl Fn(&mut InterestTracker, Vec<Id>) -> Action,
) -> Option<InterestTracker>
where
S: Subscriber + for<'span> LookupSpan<'span> + Send + Sync,
{
if let Some(path) = ctx.span_scope(span) {
self.tracker
.if_interested(path.from_root().map(|s| s.id()), |tracker, path| {
f(tracker, path.collect::<Vec<_>>())
});
})
} else {
None
}
}

Expand Down Expand Up @@ -468,14 +472,16 @@ where
check_initialized!(self);
self.for_tracker(id, &ctx, |tracker, path| {
tracker.new_span(path).record_metadata(attrs);
Action::DoNothing
});
}

fn on_record(&self, id: &Id, values: &Record<'_>, ctx: Context<'_, S>) {
check_initialized!(self);
self.for_tracker(id, &ctx, |tracker, path| {
tracker.record_metadata(&path, values)
})
tracker.record_metadata(&path, values);
Action::DoNothing
});
}

fn on_event(&self, event: &TracingEvent<'_>, ctx: Context<'_, S>) {
Expand All @@ -487,6 +493,7 @@ where
event.record(&mut tracker.field_recorder(&mut metadata));
let tracked_event = EventInfo::now(metadata);
tracker.add_event(path, tracked_event);
Action::DoNothing
});
}
}
Expand All @@ -495,18 +502,21 @@ where
check_initialized!(self);
self.for_tracker(id, &ctx, |tracker, path| {
tracker.open(path, SpanInfo::for_span(id, &ctx));
Action::DoNothing
});
}

fn on_close(&self, id: Id, ctx: Context<'_, S>) {
check_initialized!(self);
self.for_tracker(&id, &ctx, |tracker, path| {
if let Some(removed_tracker) = self.for_tracker(&id, &ctx, |tracker, path| {
tracker.exit(path, SystemTime::now());
if self.tracker.end_tracking(id.clone()) {
let _ = tracker
.dump()
.map_err(|err| eprintln!("failed to dump output: {}", err));
Action::ForgetSpan
} else {
Action::DoNothing
}
});
}) {
let _ = removed_tracker.dump();
}
}
}
65 changes: 61 additions & 4 deletions src/tracked_spans.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
use std::fmt::{Debug, Formatter};
use std::num::NonZeroU64;
use sync::{AtomicU64, Ordering};

use std::sync::atomic::{AtomicU64, Ordering};
mod sync {
#[cfg(loom)]
pub(crate) use loom::sync::atomic::{AtomicU64, Ordering};

#[cfg(not(loom))]
pub(crate) use std::sync::atomic::{AtomicU64, Ordering};
}

/// Lock-free hashset that can hold a fixed number of U64s
///
Expand Down Expand Up @@ -68,13 +75,13 @@ impl TrackedSpans {
while attempt < self.size() {
let idx = self.hash(value, attempt);
let atomic = self.els.get(idx).expect("idx guaranteed to be less");
let old_val = atomic.load(Ordering::SeqCst);
let old_val = atomic.load(Ordering::Relaxed);
if old_val == value {
return Ok(InsertResult::AlreadyPresent);
}
if (old_val == 0 || old_val == TOMBSTONE)
&& atomic
.compare_exchange(old_val, value, Ordering::SeqCst, Ordering::SeqCst)
.compare_exchange(old_val, value, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
{
return Ok(InsertResult::NotPresent);
Expand All @@ -94,7 +101,7 @@ impl TrackedSpans {
while attempt < self.size() {
let idx = self.hash(value, attempt);
let atomic = self.els.get(idx).expect("idx guaranteed to be less");
let stored_value = atomic.load(Ordering::Acquire);
let stored_value = atomic.load(Ordering::Relaxed);
match stored_value {
0 => return None,
v if v == value => return Some(idx),
Expand Down Expand Up @@ -170,7 +177,57 @@ mod test {
assert!(!set.contains(nz(TOMBSTONE)));
}

#[test]
#[cfg(loom)]
fn test_concurrent_usage() {
let collection_size = 3;
tracing_subscriber::fmt::init();
loom::model(move || {
let tracked_spans = loom::sync::Arc::new(TrackedSpans::new(collection_size));
let second_structure = loom::sync::Arc::new(loom::sync::RwLock::new(HashSet::new()));
let mut threads = vec![];
for t in 0..2 {
let thread_copy = tracked_spans.clone();
let map_copy = second_structure.clone();
threads.push(loom::thread::spawn(move || {
let mut range: Box<dyn Iterator<Item = u64>> = Box::new(1..10);
if t % 2 == 0 {
range = Box::new((1..10).rev());
}
for i in range.take(2) {
if thread_copy.contains(nz(i)) {
assert!(map_copy.read().unwrap().contains(&i));
}
let mut guard = map_copy.write().unwrap();
guard.insert(i);
drop(guard);
if thread_copy.insert(nz(i)).is_ok() {
assert!(thread_copy.contains(nz(i)));
}
}
}));
}
let thread_copy = tracked_spans.clone();
let map_copy = second_structure.clone();
threads.push(loom::thread::spawn(move || {
for i in 1..5 {
if thread_copy.contains(nz(i)) {
assert!(map_copy.read().unwrap().contains(&i));
}
}
}));
for handle in threads {
handle.join().unwrap();
}
assert_eq!(
(1..10).filter(|i| tracked_spans.contains(nz(*i))).count(),
collection_size - 1
);
})
}

use proptest::prelude::*;

proptest! {
#[test]
fn test_insertion(values in prop::collection::vec(1..u64::MAX, 1..100), checks in prop::collection::vec(1..u64::MAX, 1..1000)) {
Expand Down
21 changes: 14 additions & 7 deletions src/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ use tracing_subscriber::registry::LookupSpan;
const NESTED_EVENT_OFFSET: usize = 2;
const DURATION_WIDTH: usize = 6;

#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub(crate) enum Action {
ForgetSpan,
DoNothing,
}

#[derive(Debug, Clone)]
pub(crate) struct EventInfo {
timestamp: SystemTime,
Expand Down Expand Up @@ -430,6 +436,7 @@ impl RootTracker {
}
}

/// Returns true if this span was tracked
pub(crate) fn end_tracking(&self, id: Id) -> bool {
self.examined_spans.remove(id.into_non_zero_u64())
}
Expand All @@ -447,25 +454,25 @@ impl RootTracker {
);
}

pub(crate) fn if_interested<T>(
pub(crate) fn if_interested(
&self,
ids: impl Iterator<Item = Id>,
f: impl Fn(&mut InterestTracker, &mut dyn Iterator<Item = Id>) -> T,
) -> Option<T> {
f: impl Fn(&mut InterestTracker, &mut dyn Iterator<Item = Id>) -> Action,
) -> Option<InterestTracker> {
let mut iter = ids.skip_while(|id| !self.examined_spans.contains(id.into_non_zero_u64()));
if let Some(root) = iter.next() {
assert!(self.examined_spans.contains(root.into_non_zero_u64()));
let mut tracker = self.span_metadata.write();
let mut with_root = iter::once(root.clone()).chain(iter);
if let Some(span_tracker) = tracker.get_mut(&root) {
Some(f(span_tracker, &mut with_root))
if f(span_tracker, &mut with_root) == Action::ForgetSpan {
return tracker.remove(&root);
}
} else {
eprintln!("This is a bug–span tracker could not be found");
None
}
} else {
None
}
None
}
}

Expand Down

0 comments on commit 54fadf8

Please sign in to comment.