Skip to content

Commit

Permalink
drop value in rx
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed May 14, 2024
1 parent f5acc3c commit 76f4b49
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 31 deletions.
4 changes: 1 addition & 3 deletions tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ time = []
tokio-macros = { version = "~2.2.0", path = "../tokio-macros", optional = true }

pin-project-lite = "0.2.11"
rustversion = { git = "https://github.com/wenym1/rustversion", rev = "0b11410" }

# Everything else is optional...
bytes = { version = "1.0.0", optional = true }
Expand Down Expand Up @@ -153,8 +152,7 @@ wasm-bindgen-test = "0.3.0"
mio-aio = { version = "0.8.0", features = ["tokio"] }

[target.'cfg(loom)'.dev-dependencies]
loom = { git = "https://github.com/wenym1/loom", rev = "4edf9435", features = ["futures", "checkpoint"] }
#loom = { path = "/Users/william/repo/loom", features = ["futures", "checkpoint"] }
loom = { version = "0.7", features = ["futures", "checkpoint"] }

[package.metadata.docs.rs]
all-features = true
Expand Down
52 changes: 24 additions & 28 deletions tokio/src/sync/oneshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -627,32 +627,7 @@ impl<T> Sender<T> {
)
});

#[rustversion::since(1.70)]
fn consume_inner<T>(inner: Arc<Inner<T>>) -> Result<(), T> {
if let Some(inner) = Arc::into_inner(inner) {
if let Some(t) = inner.value.with_mut(|ptr| unsafe {
// SAFETY: we have successfully returned with `Some`, which means we are the
// only accessor to `ptr`.
//
// Note: value can be `None` even though we have previously set it as `Some`,
// because the value may have been consumed by receiver before we reach here.
(*ptr).take()
}) {
Err(t)
} else {
Ok(())
}
} else {
Ok(())
}
}

#[rustversion::before(1.70)]
fn consume_inner<T>(_inner: Arc<Inner<T>>) -> Result<(), T> {
Ok(())
}

consume_inner(inner)
Ok(())
}

/// Waits for the associated [`Receiver`] handle to close.
Expand Down Expand Up @@ -1097,7 +1072,16 @@ impl<T> Receiver<T> {
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
if let Some(inner) = self.inner.as_ref() {
inner.close();
let state = inner.close();

if state.is_complete() {
drop(unsafe {
// SAFETY: we have ensured that the `VALUE_SENT` bit has been set,
// so only the receiver can access the value.
inner.consume_value()
});
}

#[cfg(all(tokio_unstable, feature = "tracing"))]
self.resource_span.in_scope(|| {
tracing::trace!(
Expand Down Expand Up @@ -1227,14 +1211,16 @@ impl<T> Inner<T> {
}

/// Called by `Receiver` to indicate that the value will never be received.
fn close(&self) {
fn close(&self) -> State {
let prev = State::set_closed(&self.state);

if prev.is_tx_task_set() && !prev.is_complete() {
unsafe {
self.tx_task.with_task(Waker::wake_by_ref);
}
}

prev
}

/// Consumes the value. This function does not check `state`.
Expand Down Expand Up @@ -1273,6 +1259,16 @@ impl<T> Drop for Inner<T> {
self.tx_task.drop_task();
}
}

unsafe {
// SAFETY: we have `&mut self`, and therefore we have
// exclusive access to the value.
//
// Note: the assertion holds because if the value has been sent by sender,
// we must ensure that the value must have been consumed by the receiver before
// dropping the `Inner`.
debug_assert!(self.consume_value().is_none());
}
}
}

Expand Down

0 comments on commit 76f4b49

Please sign in to comment.