Skip to content

Commit

Permalink
Merge pull request #7 from rcoh/fix-leak
Browse files Browse the repository at this point in the history
Fix leak
  • Loading branch information
rcoh authored May 8, 2023
2 parents a2e82a2 + 95a8603 commit d3115fa
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 28 deletions.
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ tracing-subscriber = { version = "0.3.5", features = ["env-filter"] }
tokio = { version = "1", features = ["full"]}
proptest = "1.1.0"
env_logger = "0.10.0"
loom = "0.5"

[target.'cfg(loom)'.dependencies]
loom = "0.5"

[profile.release]
debug = 1
30 changes: 20 additions & 10 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use tracing::span::{Attributes, Record};
use tracing::subscriber::set_global_default;
use tracing::{Event as TracingEvent, Id, Span, Subscriber};

use crate::tracker::Action;
use tracing_subscriber::layer::Context;
use tracing_subscriber::registry::LookupSpan;
use tracing_subscriber::{Layer, Registry};
Expand Down Expand Up @@ -412,15 +413,18 @@ impl TeXRayLayer {
&self,
span: &Id,
ctx: &Context<'a, S>,
f: impl Fn(&mut InterestTracker, Vec<Id>),
) where
f: impl Fn(&mut InterestTracker, Vec<Id>) -> Action,
) -> Option<InterestTracker>
where
S: Subscriber + for<'span> LookupSpan<'span> + Send + Sync,
{
if let Some(path) = ctx.span_scope(span) {
self.tracker
.if_interested(path.from_root().map(|s| s.id()), |tracker, path| {
f(tracker, path.collect::<Vec<_>>())
});
})
} else {
None
}
}

Expand Down Expand Up @@ -468,14 +472,16 @@ where
check_initialized!(self);
self.for_tracker(id, &ctx, |tracker, path| {
tracker.new_span(path).record_metadata(attrs);
Action::DoNothing
});
}

fn on_record(&self, id: &Id, values: &Record<'_>, ctx: Context<'_, S>) {
check_initialized!(self);
self.for_tracker(id, &ctx, |tracker, path| {
tracker.record_metadata(&path, values)
})
tracker.record_metadata(&path, values);
Action::DoNothing
});
}

fn on_event(&self, event: &TracingEvent<'_>, ctx: Context<'_, S>) {
Expand All @@ -487,6 +493,7 @@ where
event.record(&mut tracker.field_recorder(&mut metadata));
let tracked_event = EventInfo::now(metadata);
tracker.add_event(path, tracked_event);
Action::DoNothing
});
}
}
Expand All @@ -495,18 +502,21 @@ where
check_initialized!(self);
self.for_tracker(id, &ctx, |tracker, path| {
tracker.open(path, SpanInfo::for_span(id, &ctx));
Action::DoNothing
});
}

fn on_close(&self, id: Id, ctx: Context<'_, S>) {
check_initialized!(self);
self.for_tracker(&id, &ctx, |tracker, path| {
if let Some(removed_tracker) = self.for_tracker(&id, &ctx, |tracker, path| {
tracker.exit(path, SystemTime::now());
if self.tracker.end_tracking(id.clone()) {
let _ = tracker
.dump()
.map_err(|err| eprintln!("failed to dump output: {}", err));
Action::ForgetSpan
} else {
Action::DoNothing
}
});
}) {
let _ = removed_tracker.dump();
}
}
}
90 changes: 79 additions & 11 deletions src/tracked_spans.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
use std::fmt::{Debug, Formatter};
use std::num::NonZeroU64;
use sync::{AtomicU64, Ordering};

use std::sync::atomic::{AtomicU64, Ordering};
mod sync {
#[cfg(loom)]
pub(crate) use loom::sync::atomic::{AtomicU64, Ordering};

#[cfg(not(loom))]
pub(crate) use std::sync::atomic::{AtomicU64, Ordering};
}

/// Lock-free hashset that can hold a fixed number of U64s
///
Expand Down Expand Up @@ -40,16 +47,20 @@ impl TrackedSpans {
Self { els: storage }
}

fn tombstone_nel(&self) -> usize {
self.els.len() - 1
}

fn hash(&self, value: u64, attempt: usize) -> usize {
// to store the TOMBSTONE value, we reserve the final slot in the array to hold it,
// if present
if value == TOMBSTONE {
if attempt != 0 {
unreachable!("tombstone will never fail if missing")
}
self.els.len() - 1
self.tombstone_nel()
} else {
((value + attempt as u64) % (self.els.len() as u64 - 1)) as usize
((value + attempt as u64) % (self.size() as u64)) as usize
}
}

Expand All @@ -61,20 +72,20 @@ impl TrackedSpans {
///
/// If the value was able to be inserted:
/// - Some(false) will be returned if the value was already present
/// - Some(true) will be returne
/// - Some(true) will be returned
pub(crate) fn insert(&self, value: NonZeroU64) -> Result<InsertResult, MapFull> {
let value = value.get();
let mut attempt = 0_usize;
while attempt < self.size() {
let idx = self.hash(value, attempt);
let atomic = self.els.get(idx).expect("idx guaranteed to be less");
let old_val = atomic.load(Ordering::SeqCst);
let old_val = atomic.load(Ordering::Relaxed);
if old_val == value {
return Ok(InsertResult::AlreadyPresent);
}
if (old_val == 0 || old_val == TOMBSTONE)
&& atomic
.compare_exchange(old_val, value, Ordering::SeqCst, Ordering::SeqCst)
.compare_exchange(old_val, value, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
{
return Ok(InsertResult::NotPresent);
Expand All @@ -85,16 +96,16 @@ impl TrackedSpans {
}

pub(crate) fn contains(&self, value: NonZeroU64) -> bool {
self.idx(value).is_some()
self.index_of(value).is_some()
}

fn idx(&self, value: NonZeroU64) -> Option<usize> {
fn index_of(&self, value: NonZeroU64) -> Option<usize> {
let value = value.get();
let mut attempt = 0;
while attempt < self.size() {
let idx = self.hash(value, attempt);
let atomic = self.els.get(idx).expect("idx guaranteed to be less");
let stored_value = atomic.load(Ordering::Acquire);
let stored_value = atomic.load(Ordering::Relaxed);
match stored_value {
0 => return None,
v if v == value => return Some(idx),
Expand All @@ -105,10 +116,10 @@ impl TrackedSpans {
}

pub(crate) fn remove(&self, value: NonZeroU64) -> bool {
if let Some(idx) = self.idx(value) {
if let Some(idx) = self.index_of(value) {
// if we've already removed that value, no worries
let new_value = match value.get() {
TOMBSTONE => 0,
TOMBSTONE if idx == self.tombstone_nel() => 0,
_ => TOMBSTONE,
};
self.els[idx]
Expand Down Expand Up @@ -155,6 +166,12 @@ mod test {
set.remove(nz(1));
set.insert(nz(1000)).expect("space now");
assert!(set.contains(nz(1000)));

for _ in 0..1000 {
set.remove(nz(1000));
set.insert(nz(1000)).expect("space now");
}
assert!(set.contains(nz(1000)));
}

#[test]
Expand All @@ -167,10 +184,61 @@ mod test {
set.insert(nz(TOMBSTONE)).unwrap();
assert!(set.contains(nz(TOMBSTONE)));
assert!(set.remove(nz(TOMBSTONE)));
assert!(!set.remove(nz(TOMBSTONE)));
assert!(!set.contains(nz(TOMBSTONE)));
}

#[test]
#[cfg(loom)]
fn test_concurrent_usage() {
let collection_size = 3;
tracing_subscriber::fmt::init();
loom::model(move || {
let tracked_spans = loom::sync::Arc::new(TrackedSpans::new(collection_size));
let second_structure = loom::sync::Arc::new(loom::sync::RwLock::new(HashSet::new()));
let mut threads = vec![];
for t in 0..2 {
let thread_copy = tracked_spans.clone();
let map_copy = second_structure.clone();
threads.push(loom::thread::spawn(move || {
let mut range: Box<dyn Iterator<Item = u64>> = Box::new(1..10);
if t % 2 == 0 {
range = Box::new((1..10).rev());
}
for i in range.take(2) {
if thread_copy.contains(nz(i)) {
assert!(map_copy.read().unwrap().contains(&i));
}
let mut guard = map_copy.write().unwrap();
guard.insert(i);
drop(guard);
if thread_copy.insert(nz(i)).is_ok() {
assert!(thread_copy.contains(nz(i)));
}
}
}));
}
let thread_copy = tracked_spans.clone();
let map_copy = second_structure.clone();
threads.push(loom::thread::spawn(move || {
for i in 1..5 {
if thread_copy.contains(nz(i)) {
assert!(map_copy.read().unwrap().contains(&i));
}
}
}));
for handle in threads {
handle.join().unwrap();
}
assert_eq!(
(1..10).filter(|i| tracked_spans.contains(nz(*i))).count(),
collection_size - 1
);
})
}

use proptest::prelude::*;

proptest! {
#[test]
fn test_insertion(values in prop::collection::vec(1..u64::MAX, 1..100), checks in prop::collection::vec(1..u64::MAX, 1..1000)) {
Expand Down
21 changes: 14 additions & 7 deletions src/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ use tracing_subscriber::registry::LookupSpan;
const NESTED_EVENT_OFFSET: usize = 2;
const DURATION_WIDTH: usize = 6;

#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub(crate) enum Action {
ForgetSpan,
DoNothing,
}

#[derive(Debug, Clone)]
pub(crate) struct EventInfo {
timestamp: SystemTime,
Expand Down Expand Up @@ -430,6 +436,7 @@ impl RootTracker {
}
}

/// Returns true if this span was tracked
pub(crate) fn end_tracking(&self, id: Id) -> bool {
self.examined_spans.remove(id.into_non_zero_u64())
}
Expand All @@ -447,25 +454,25 @@ impl RootTracker {
);
}

pub(crate) fn if_interested<T>(
pub(crate) fn if_interested(
&self,
ids: impl Iterator<Item = Id>,
f: impl Fn(&mut InterestTracker, &mut dyn Iterator<Item = Id>) -> T,
) -> Option<T> {
f: impl Fn(&mut InterestTracker, &mut dyn Iterator<Item = Id>) -> Action,
) -> Option<InterestTracker> {
let mut iter = ids.skip_while(|id| !self.examined_spans.contains(id.into_non_zero_u64()));
if let Some(root) = iter.next() {
assert!(self.examined_spans.contains(root.into_non_zero_u64()));
let mut tracker = self.span_metadata.write();
let mut with_root = iter::once(root.clone()).chain(iter);
if let Some(span_tracker) = tracker.get_mut(&root) {
Some(f(span_tracker, &mut with_root))
if f(span_tracker, &mut with_root) == Action::ForgetSpan {
return tracker.remove(&root);
}
} else {
eprintln!("This is a bug–span tracker could not be found");
None
}
} else {
None
}
None
}
}

Expand Down

0 comments on commit d3115fa

Please sign in to comment.