diff --git a/eyeball-im-util/src/vector/traits.rs b/eyeball-im-util/src/vector/traits.rs index e24c705..5d217e0 100644 --- a/eyeball-im-util/src/vector/traits.rs +++ b/eyeball-im-util/src/vector/traits.rs @@ -22,18 +22,18 @@ pub trait VectorDiffContainer: { /// The element type of the [`Vector`][imbl::Vector] that diffs are being /// handled for. - type Element: Clone + Send + Sync + 'static; + type Element: Clone + 'static; #[doc(hidden)] type Family: VectorDiffContainerFamily = Self>; } -impl VectorDiffContainer for VectorDiff { +impl VectorDiffContainer for VectorDiff { type Element = T; type Family = VectorDiffFamily; } -impl VectorDiffContainer for Vec> { +impl VectorDiffContainer for Vec> { type Element = T; type Family = VecVectorDiffFamily; } @@ -69,7 +69,7 @@ pub trait VectorObserver: Sized { fn into_parts(self) -> (Vector, Self::Stream); } -impl VectorObserver for VectorSubscriber { +impl VectorObserver for VectorSubscriber { type Stream = VectorSubscriberStream; fn into_parts(self) -> (Vector, Self::Stream) { @@ -77,7 +77,7 @@ impl VectorObserver for VectorSubscriber } } -impl VectorObserver for BatchedVectorSubscriber { +impl VectorObserver for BatchedVectorSubscriber { type Stream = VectorSubscriberBatchedStream; fn into_parts(self) -> (Vector, Self::Stream) { @@ -102,7 +102,7 @@ where /// See that trait for which types implement this. pub trait VectorObserverExt: VectorObserver where - T: Clone + Send + Sync + 'static, + T: Clone + 'static, ::Item: VectorDiffContainer, { /// Filter the vector's values with the given function. @@ -197,7 +197,7 @@ where impl VectorObserverExt for O where - T: Clone + Send + Sync + 'static, + T: Clone + 'static, O: VectorObserver, ::Item: VectorDiffContainer, { diff --git a/eyeball-im/Cargo.toml b/eyeball-im/Cargo.toml index 292c789..d031f27 100644 --- a/eyeball-im/Cargo.toml +++ b/eyeball-im/Cargo.toml @@ -17,7 +17,6 @@ futures-core.workspace = true imbl = "3.0.0" serde = { version = "1.0", optional = true } tokio.workspace = true -tokio-util.workspace = true tracing = { workspace = true, optional = true } [dev-dependencies] diff --git a/eyeball-im/src/lib.rs b/eyeball-im/src/lib.rs index 628e47a..526a592 100644 --- a/eyeball-im/src/lib.rs +++ b/eyeball-im/src/lib.rs @@ -4,6 +4,7 @@ //! //! - `tracing`: Emit [tracing] events when updates are sent out +mod reusable_box; mod vector; pub use vector::{ diff --git a/eyeball-im/src/reusable_box.rs b/eyeball-im/src/reusable_box.rs new file mode 100644 index 0000000..81a82ce --- /dev/null +++ b/eyeball-im/src/reusable_box.rs @@ -0,0 +1,161 @@ +// Copy-pasted from https://docs.rs/tokio-util/latest/src/tokio_util/sync/reusable_box.rs.html +// Removed all `+ Send`s. + +use std::{ + alloc::Layout, + fmt, + future::{self, Future}, + mem::{self, ManuallyDrop}, + pin::Pin, + ptr, + task::{Context, Poll}, +}; + +/// A reusable `Pin + 'a>>`. +/// +/// This type lets you replace the future stored in the box without +/// reallocating when the size and alignment permits this. +pub(crate) struct ReusableBoxFuture<'a, T> { + boxed: Pin + 'a>>, +} + +impl<'a, T> ReusableBoxFuture<'a, T> { + /// Create a new `ReusableBoxFuture` containing the provided future. + pub(crate) fn new(future: F) -> Self + where + F: Future + 'a, + { + Self { boxed: Box::pin(future) } + } + + /// Replace the future currently stored in this box. + /// + /// This reallocates if and only if the layout of the provided future is + /// different from the layout of the currently stored future. + pub(crate) fn set(&mut self, future: F) + where + F: Future + 'a, + { + if let Err(future) = self.try_set(future) { + *self = Self::new(future); + } + } + + /// Replace the future currently stored in this box. + /// + /// This function never reallocates, but returns an error if the provided + /// future has a different size or alignment from the currently stored + /// future. + pub(crate) fn try_set(&mut self, future: F) -> Result<(), F> + where + F: Future + 'a, + { + // If we try to inline the contents of this function, the type checker complains + // because the bound `T: 'a` is not satisfied in the call to + // `pending()`. But by putting it in an inner function that doesn't have + // `T` as a generic parameter, we implicitly get the bound `F::Output: + // 'a` transitively through `F: 'a`, allowing us to call `pending()`. + #[inline(always)] + fn real_try_set<'a, F>( + this: &mut ReusableBoxFuture<'a, F::Output>, + future: F, + ) -> Result<(), F> + where + F: Future + 'a, + { + // future::Pending is a ZST so this never allocates. + let boxed = mem::replace(&mut this.boxed, Box::pin(future::pending())); + reuse_pin_box(boxed, future, |boxed| this.boxed = Pin::from(boxed)) + } + + real_try_set(self, future) + } + + /// Get a pinned reference to the underlying future. + pub(crate) fn get_pin(&mut self) -> Pin<&mut (dyn Future)> { + self.boxed.as_mut() + } + + /// Poll the future stored inside this box. + pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll { + self.get_pin().poll(cx) + } +} + +impl Future for ReusableBoxFuture<'_, T> { + type Output = T; + + /// Poll the future stored inside this box. + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::into_inner(self).get_pin().poll(cx) + } +} + +// The only method called on self.boxed is poll, which takes &mut self, so this +// struct being Sync does not permit any invalid access to the Future, even if +// the future is not Sync. +unsafe impl Sync for ReusableBoxFuture<'_, T> {} + +impl fmt::Debug for ReusableBoxFuture<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ReusableBoxFuture").finish() + } +} + +fn reuse_pin_box(boxed: Pin>, new_value: U, callback: F) -> Result +where + F: FnOnce(Box) -> O, +{ + let layout = Layout::for_value::(&*boxed); + if layout != Layout::new::() { + return Err(new_value); + } + + // SAFETY: We don't ever construct a non-pinned reference to the old `T` from + // now on, and we always drop the `T`. + let raw: *mut T = Box::into_raw(unsafe { Pin::into_inner_unchecked(boxed) }); + + // When dropping the old value panics, we still want to call `callback` — so + // move the rest of the code into a guard type. + let guard = CallOnDrop::new(|| { + let raw: *mut U = raw.cast::(); + unsafe { raw.write(new_value) }; + + // SAFETY: + // - `T` and `U` have the same layout. + // - `raw` comes from a `Box` that uses the same allocator as this one. + // - `raw` points to a valid instance of `U` (we just wrote it in). + let boxed = unsafe { Box::from_raw(raw) }; + + callback(boxed) + }); + + // Drop the old value. + unsafe { ptr::drop_in_place(raw) }; + + // Run the rest of the code. + Ok(guard.call()) +} + +struct CallOnDrop O> { + f: ManuallyDrop, +} + +impl O> CallOnDrop { + fn new(f: F) -> Self { + let f = ManuallyDrop::new(f); + Self { f } + } + fn call(self) -> O { + let mut this = ManuallyDrop::new(self); + let f = unsafe { ManuallyDrop::take(&mut this.f) }; + f() + } +} + +impl O> Drop for CallOnDrop { + fn drop(&mut self) { + let f = unsafe { ManuallyDrop::take(&mut self.f) }; + f(); + } +} diff --git a/eyeball-im/src/vector.rs b/eyeball-im/src/vector.rs index 27f3598..66778c6 100644 --- a/eyeball-im/src/vector.rs +++ b/eyeball-im/src/vector.rs @@ -22,7 +22,7 @@ pub struct ObservableVector { sender: Sender>, } -impl ObservableVector { +impl ObservableVector { /// Create a new `ObservableVector`. /// /// As of the time of writing, this is equivalent to @@ -290,7 +290,7 @@ impl ObservableVector { } } -impl Default for ObservableVector { +impl Default for ObservableVector { fn default() -> Self { Self::new() } @@ -315,7 +315,7 @@ impl ops::Deref for ObservableVector { } } -impl From> for ObservableVector { +impl From> for ObservableVector { fn from(values: Vector) -> Self { let mut this = Self::new(); this.append(values); diff --git a/eyeball-im/src/vector/entry.rs b/eyeball-im/src/vector/entry.rs index cf6b117..2cae284 100644 --- a/eyeball-im/src/vector/entry.rs +++ b/eyeball-im/src/vector/entry.rs @@ -10,7 +10,7 @@ pub struct ObservableVectorEntry<'a, T> { impl<'a, T> ObservableVectorEntry<'a, T> where - T: Clone + Send + Sync + 'static, + T: Clone + 'static, { pub(super) fn new(inner: &'a mut ObservableVector, index: usize) -> Self { Self { inner, index: EntryIndex::Owned(index) } @@ -115,7 +115,7 @@ pub struct ObservableVectorEntries<'a, T> { impl<'a, T> ObservableVectorEntries<'a, T> where - T: Clone + Send + Sync + 'static, + T: Clone + 'static, { pub(super) fn new(inner: &'a mut ObservableVector) -> Self { Self { inner, index: 0 } diff --git a/eyeball-im/src/vector/subscriber.rs b/eyeball-im/src/vector/subscriber.rs index a2d44e2..b2d69cf 100644 --- a/eyeball-im/src/vector/subscriber.rs +++ b/eyeball-im/src/vector/subscriber.rs @@ -1,4 +1,5 @@ use std::{ + fmt, hint::unreachable_unchecked, mem, pin::Pin, @@ -6,13 +7,14 @@ use std::{ vec, }; +use crate::reusable_box::ReusableBoxFuture; use futures_core::Stream; use imbl::Vector; use tokio::sync::broadcast::{ + self, error::{RecvError, TryRecvError}, Receiver, }; -use tokio_util::sync::ReusableBoxFuture; #[cfg(feature = "tracing")] use tracing::info; @@ -25,7 +27,7 @@ pub struct VectorSubscriber { rx: Receiver>, } -impl VectorSubscriber { +impl VectorSubscriber { pub(super) fn new(items: Vector, rx: Receiver>) -> Self { Self { values: items, rx } } @@ -38,12 +40,12 @@ impl VectorSubscriber { /// Turn this `VectorSubcriber` into a stream of `VectorDiff`s. pub fn into_stream(self) -> VectorSubscriberStream { - VectorSubscriberStream::new(ReusableBoxFuture::new(make_future(self.rx))) + VectorSubscriberStream::new(ReusableBoxRecvFuture::new(self.rx)) } /// Turn this `VectorSubcriber` into a stream of `Vec`s. pub fn into_batched_stream(self) -> VectorSubscriberBatchedStream { - VectorSubscriberBatchedStream::new(ReusableBoxFuture::new(make_future(self.rx))) + VectorSubscriberBatchedStream::new(ReusableBoxRecvFuture::new(self.rx)) } /// Destructure this `VectorSubscriber` into the initial values and a stream @@ -53,7 +55,7 @@ impl VectorSubscriber { /// separately, but guarantees that the values are not unnecessarily cloned. pub fn into_values_and_stream(self) -> (Vector, VectorSubscriberStream) { let Self { values, rx } = self; - (values, VectorSubscriberStream::new(ReusableBoxFuture::new(make_future(rx)))) + (values, VectorSubscriberStream::new(ReusableBoxRecvFuture::new(rx))) } /// Destructure this `VectorSubscriber` into the initial values and a stream @@ -64,13 +66,10 @@ impl VectorSubscriber { /// are not unnecessarily cloned. pub fn into_values_and_batched_stream(self) -> (Vector, VectorSubscriberBatchedStream) { let Self { values, rx } = self; - (values, VectorSubscriberBatchedStream::new(ReusableBoxFuture::new(make_future(rx)))) + (values, VectorSubscriberBatchedStream::new(ReusableBoxRecvFuture::new(rx))) } } -type ReusableBoxRecvFuture = - ReusableBoxFuture<'static, SubscriberFutureReturn>>; - /// A stream of `VectorDiff`s created from a [`VectorSubscriber`]. /// /// Use its [`Stream`] implementation to interact with it (futures-util and @@ -100,7 +99,7 @@ enum VectorSubscriberStreamState { // Not clear why this explicit impl is needed, but it's not unsafe so it is fine impl Unpin for VectorSubscriberStreamState {} -impl Stream for VectorSubscriberStream { +impl Stream for VectorSubscriberStream { type Item = VectorDiff; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -130,7 +129,7 @@ impl Stream for VectorSubscriberStream { } }; - self.inner.set(make_future(rx)); + self.inner.set(rx); poll } VectorSubscriberStreamState::YieldBatch { iter, .. } => { @@ -146,7 +145,7 @@ impl Stream for VectorSubscriberStream { _ => unsafe { unreachable_unchecked() }, }; - self.inner.set(make_future(rx)); + self.inner.set(rx); } Poll::Ready(Some(diff)) @@ -171,7 +170,7 @@ impl VectorSubscriberBatchedStream { } } -impl Stream for VectorSubscriberBatchedStream { +impl Stream for VectorSubscriberBatchedStream { type Item = Vec>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -208,14 +207,12 @@ impl Stream for VectorSubscriberBatchedStream< } }; - self.inner.set(make_future(rx)); + self.inner.set(rx); poll } } -fn handle_lag( - rx: &mut Receiver>, -) -> Option> { +fn handle_lag(rx: &mut Receiver>) -> Option> { let mut msg = None; loop { match rx.try_recv() { @@ -247,7 +244,53 @@ fn handle_lag( type SubscriberFutureReturn = (Result, Receiver); -async fn make_future(mut rx: Receiver) -> SubscriberFutureReturn { +struct ReusableBoxRecvFuture { + inner: ReusableBoxFuture<'static, SubscriberFutureReturn>>, +} + +async fn make_recv_future(mut rx: Receiver) -> SubscriberFutureReturn { let result = rx.recv().await; (result, rx) } + +impl ReusableBoxRecvFuture +where + T: Clone + 'static, +{ + fn set(&mut self, rx: Receiver>) { + self.inner.set(make_recv_future(rx)); + } + + fn poll(&mut self, cx: &mut Context<'_>) -> Poll>> { + self.inner.poll(cx) + } +} + +impl ReusableBoxRecvFuture +where + T: Clone + 'static, +{ + fn new(rx: Receiver>) -> Self { + Self { inner: ReusableBoxFuture::new(make_recv_future(rx)) } + } +} + +fn assert_send(_val: T) {} +#[allow(unused)] +fn assert_make_future_send() { + #[derive(Clone)] + struct IsSend(*mut ()); + unsafe impl Send for IsSend {} + + let (_sender, receiver): (_, Receiver) = broadcast::channel(1); + + assert_send(make_recv_future(receiver)); +} +// SAFETY: make_future is Send if T is, as proven by assert_make_future_send. +unsafe impl Send for ReusableBoxRecvFuture {} + +impl fmt::Debug for ReusableBoxRecvFuture { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ReusableBoxRecvFuture").finish() + } +} diff --git a/eyeball-im/src/vector/transaction.rs b/eyeball-im/src/vector/transaction.rs index adc8e9d..7178721 100644 --- a/eyeball-im/src/vector/transaction.rs +++ b/eyeball-im/src/vector/transaction.rs @@ -21,7 +21,7 @@ pub struct ObservableVectorTransaction<'o, T: Clone> { batch: Vec>, } -impl<'o, T: Clone + Send + Sync + 'static> ObservableVectorTransaction<'o, T> { +impl<'o, T: Clone + 'static> ObservableVectorTransaction<'o, T> { pub(super) fn new(inner: &'o mut ObservableVector) -> Self { let values = inner.values.clone(); Self { inner, values, batch: Vec::new() } @@ -316,7 +316,7 @@ pub struct ObservableVectorTransactionEntry<'a, 'o, T: Clone> { impl<'a, 'o, T> ObservableVectorTransactionEntry<'a, 'o, T> where - T: Clone + Send + Sync + 'static, + T: Clone + 'static, { pub(super) fn new(inner: &'a mut ObservableVectorTransaction<'o, T>, index: usize) -> Self { Self { inner, index: EntryIndex::Owned(index) } @@ -397,7 +397,7 @@ pub struct ObservableVectorTransactionEntries<'a, 'o, T: Clone> { impl<'a, 'o, T> ObservableVectorTransactionEntries<'a, 'o, T> where - T: Clone + Send + Sync + 'static, + T: Clone + 'static, { pub(super) fn new(inner: &'a mut ObservableVectorTransaction<'o, T>) -> Self { Self { inner, index: 0 }