diff --git a/source/tricky-pipe/src/bidi.rs b/source/tricky-pipe/src/bidi.rs index 576e9b0..d55da35 100644 --- a/source/tricky-pipe/src/bidi.rs +++ b/source/tricky-pipe/src/bidi.rs @@ -10,6 +10,7 @@ use crate::mpsc::{ }; use core::fmt; use futures::FutureExt; +use serde::{de::DeserializeOwned, Serialize}; /// A bidirectional typed channel. /// @@ -78,6 +79,20 @@ where } } + /// Erase the message types of this `BiDi`, returning a [`SerBiDi`]. + pub fn erase(self) -> SerBiDi + where + In: Serialize + Send + Sync + 'static, + Out: DeserializeOwned + Send + Sync + 'static, + { + SerBiDi { + tx: self.tx.erased(), + rx: self.rx.erased(), + seen_rx_error: self.seen_rx_error, + seen_tx_error: self.seen_tx_error, + } + } + /// Consumes `self`, extracting the inner [`Sender`] and [`Receiver`]. #[must_use] pub fn split(self) -> (Sender, Receiver) { @@ -86,7 +101,6 @@ where /// Wait until the channel is either ready to send a message *or* a new /// incoming message is received, whichever occurs first. - #[must_use] pub async fn wait(&mut self) -> WaitResult>, E> { futures::select_biased! { reserve = self.tx.reserve().fuse() => { diff --git a/source/tricky-pipe/src/mpsc/arc_impl.rs b/source/tricky-pipe/src/mpsc/arc_impl.rs index 159cc9f..4d1a911 100644 --- a/source/tricky-pipe/src/mpsc/arc_impl.rs +++ b/source/tricky-pipe/src/mpsc/arc_impl.rs @@ -63,14 +63,11 @@ impl TrickyPipe { type_name: core::any::type_name::, }; - fn erased(&self) -> ErasedPipe { + fn pipe(&self) -> TypedPipe { let ptr = Arc::into_raw(self.0.clone()) as *const _; - unsafe { ErasedPipe::new(ptr, Self::CORE_VTABLE) } + TypedPipe::new(ptr, Self::CORE_VTABLE) } - fn typed(&self) -> TypedPipe { - unsafe { self.erased().typed() } - } /// Try to obtain a [`Receiver`] capable of receiving `T`-typed data /// /// This method will only return [`Some`] on the first call. All subsequent calls @@ -78,7 +75,7 @@ impl TrickyPipe { pub fn receiver(&self) -> Option> { self.0.core.try_claim_rx()?; - Some(Receiver { pipe: self.typed() }) + Some(Receiver { pipe: self.pipe() }) } /// Obtain a [`Sender`] capable of sending `T`-typed data @@ -86,7 +83,7 @@ impl TrickyPipe { /// This function may be called multiple times. pub fn sender(&self) -> Sender { self.0.core.add_tx(); - Sender { pipe: self.typed() } + Sender { pipe: self.pipe() } } unsafe fn get_core(ptr: *const ()) -> *const Core { @@ -111,57 +108,33 @@ impl TrickyPipe { test_println!(refs = Arc::strong_count(&arc), "erased_drop({ptr:p})"); drop(arc) } -} -impl TrickyPipe -where - T: Serialize + Send + 'static, - E: Clone + Send + Sync, -{ /// Try to obtain a [`SerReceiver`] capable of receiving bytes containing /// a serialized instance of `T`. /// /// This method will only return [`Some`] on the first call. All subsequent calls /// will return [`None`]. - pub fn ser_receiver(&self) -> Option> { + pub fn ser_receiver(&self) -> Option> + where + T: Serialize + Send + 'static, + { self.0.core.try_claim_rx()?; - Some(SerReceiver { - pipe: self.erased(), - vtable: Self::SER_VTABLE, - }) + Some(SerReceiver::new(self.pipe())) } - const SER_VTABLE: &'static SerVtable = &SerVtable { - #[cfg(any(test, feature = "alloc"))] - to_vec: SerVtable::to_vec::, - #[cfg(any(test, feature = "alloc"))] - to_vec_framed: SerVtable::to_vec_framed::, - to_slice: SerVtable::to_slice::, - to_slice_framed: SerVtable::to_slice_framed::, - drop_elem: SerVtable::drop_elem::, - }; -} - -impl TrickyPipe -where - T: DeserializeOwned + Send + 'static, - E: Clone + Send + Sync, -{ /// Try to obtain a [`DeserSender`] capable of sending bytes containing /// a serialized instance of `T`. /// /// This method will only return [`Some`] on the first call. All subsequent calls /// will return [`None`]. - pub fn deser_sender(&self) -> DeserSender { + pub fn deser_sender(&self) -> DeserSender + where + T: DeserializeOwned + Send + 'static, + { self.0.core.add_tx(); - DeserSender { - pipe: self.erased(), - vtable: Self::DESER_VTABLE, - } + DeserSender::new(self.pipe()) } - - const DESER_VTABLE: &'static DeserVtable = &DeserVtable::new::(); } impl Clone for TrickyPipe { diff --git a/source/tricky-pipe/src/mpsc/channel_core.rs b/source/tricky-pipe/src/mpsc/channel_core.rs index a57bbeb..2b62907 100644 --- a/source/tricky-pipe/src/mpsc/channel_core.rs +++ b/source/tricky-pipe/src/mpsc/channel_core.rs @@ -113,6 +113,8 @@ pub(super) struct CoreVtable { pub(super) type_name: fn() -> &'static str, } +pub(super) struct Vtables(PhantomData); + pub(super) struct SerVtable { #[cfg(any(test, feature = "alloc"))] pub(super) to_vec: SerVecFn, @@ -600,21 +602,6 @@ impl ErasedSlice { // == impl ErasedPipe === impl ErasedPipe { - pub(super) unsafe fn new(ptr: *const (), vtable: &'static CoreVtable) -> Self { - Self { ptr, vtable } - } - - /// # Safety - /// - /// This `ErasedPipe` must have been type-erased from a tricky-pipe with - /// elements of type `T`! - pub(super) unsafe fn typed(self) -> TypedPipe { - TypedPipe { - pipe: self, - _t: PhantomData, - } - } - pub(super) fn core(&self) -> &Core { unsafe { &*(self.vtable.get_core)(self.ptr) } } @@ -658,6 +645,31 @@ unsafe impl Sync for ErasedPipe {} // === impl TypedPipe === impl TypedPipe { + pub(super) fn new(ptr: *const (), vtable: &'static CoreVtable) -> Self { + Self { + pipe: ErasedPipe { ptr, vtable }, + _t: PhantomData, + } + } + + pub(super) unsafe fn erased(self) -> ErasedPipe { + self.pipe + } + + /// Clone this `TypedPipe` *without* incrementing the reference count. This + /// is intended to be used only when converting to a different reference + /// type, when the original `TypedPipe` will not have its destructor run. + /// + /// # Safety + /// + /// Do NOT `Drop` this `TypedPipe` after calling this method!!!! + pub(super) unsafe fn clone_no_ref_inc(&self) -> Self { + Self { + pipe: self.pipe.clone(), + _t: PhantomData, + } + } + pub(super) fn core(&self) -> &Core { self.pipe.core() } @@ -683,6 +695,27 @@ impl Clone for TypedPipe { unsafe impl Send for TypedPipe {} unsafe impl Sync for TypedPipe {} +// === impl MkVtables === + +impl Vtables { + pub(crate) const SERIALIZE: &'static SerVtable = &SerVtable { + #[cfg(any(test, feature = "alloc"))] + to_vec: SerVtable::to_vec::, + #[cfg(any(test, feature = "alloc"))] + to_vec_framed: SerVtable::to_vec_framed::, + to_slice: SerVtable::to_slice::, + to_slice_framed: SerVtable::to_slice_framed::, + drop_elem: SerVtable::drop_elem::, + }; +} + +impl Vtables { + pub(crate) const DESERIALIZE: &'static DeserVtable = &DeserVtable { + from_bytes: DeserVtable::from_bytes::, + from_bytes_framed: DeserVtable::from_bytes_framed::, + }; +} + // === impl SerVtable === impl SerVtable { @@ -753,14 +786,9 @@ impl SerVtable { } } -impl DeserVtable { - pub(super) const fn new() -> Self { - Self { - from_bytes: Self::from_bytes::, - from_bytes_framed: Self::from_bytes_framed::, - } - } +// === impl DeserVtable === +impl DeserVtable { fn from_bytes( elems: ErasedSlice, idx: u8, diff --git a/source/tricky-pipe/src/mpsc/mod.rs b/source/tricky-pipe/src/mpsc/mod.rs index 2d4ab07..247c868 100644 --- a/source/tricky-pipe/src/mpsc/mod.rs +++ b/source/tricky-pipe/src/mpsc/mod.rs @@ -1,13 +1,15 @@ //! Multi-Producer, Single-Consumer (MPSC) channels. use self::{ - channel_core::{DeserVtable, ErasedPipe, ErasedSlice, Reservation, SerVtable, TypedPipe}, + channel_core::{ + DeserVtable, ErasedPipe, ErasedSlice, Reservation, SerVtable, TypedPipe, Vtables, + }, error::*, }; use crate::loom::cell::{self, CellWith, UnsafeCell}; use core::{ fmt, future::Future, - mem::MaybeUninit, + mem::{self, MaybeUninit}, ops::{Deref, DerefMut}, pin::Pin, ptr, @@ -269,6 +271,23 @@ where self.pipe.elems()[res.idx as usize].with(|ptr| unsafe { (*ptr).assume_init_read() }) } + /// Erases the message type of this `Receiver`, returning a [`SerReceiver`] + /// that receives serialized byte representations of messages. + pub fn erased(self) -> SerReceiver + where + T: Serialize + Send + Sync, + { + // don't run the destructor for this `Receiver`, as we are converting it + // into a `DeserReceiver`, which will keep the channel open. + let this = mem::ManuallyDrop::new(self); + unsafe { + // Safety: since we are not dropping the `Receiver`, we can safely + // duplicate the `TypedPipe`, preserving the existing receiver + // refcount. + SerReceiver::new(this.pipe.clone_no_ref_inc()) + } + } + /// Close this channel with an error. Any subsequent attempts to send /// messages to this channel will fail with `error`. /// @@ -379,6 +398,16 @@ impl futures::Stream for Receiver { // === impl SerReceiver === impl SerReceiver { + fn new(pipe: TypedPipe) -> Self + where + T: Serialize + Send + 'static, + { + Self { + pipe: unsafe { pipe.erased() }, + vtable: Vtables::::SERIALIZE, + } + } + /// Attempts to receive the serialized representation of next message from /// the channel, without waiting for a new message to be sent if none are /// available. @@ -683,6 +712,16 @@ unsafe impl Sync for SerRecvRef<'_, E> {} // === impl DeserSender === impl DeserSender { + fn new(pipe: TypedPipe) -> Self + where + T: DeserializeOwned + Send + 'static, + { + Self { + pipe: unsafe { pipe.erased() }, + vtable: Vtables::::DESERIALIZE, + } + } + /// Reserve capacity to send a serialized message to the channel. /// /// If the channel is currently at capacity, this method waits until @@ -1147,6 +1186,24 @@ impl Sender { Ok(Permit { cell, pipe }) } + /// Erases the message type of this `Sender`, returning a [`DeserSender`] + /// that sends messages from their serialized binary representations. + pub fn erased(self) -> DeserSender + where + T: DeserializeOwned + Send + Sync, + { + // don't run the destructor for this `Sender`, as we are converting it + // into a `SerSender`, keeping the existing reference count held by + // this `Sender`. + let this = mem::ManuallyDrop::new(self); + DeserSender::new(unsafe { + // Safety: since we are not dropping the `Sender`, we can safely + // duplicate the `TypedPipe`, preserving the existing receiver + // refcount. + this.pipe.clone_no_ref_inc() + }) + } + /// Close this channel with an error. Any subsequent attempts to send /// messages to this channel will fail with `error`. /// diff --git a/source/tricky-pipe/src/mpsc/static_impl.rs b/source/tricky-pipe/src/mpsc/static_impl.rs index 336e637..87ee2b4 100644 --- a/source/tricky-pipe/src/mpsc/static_impl.rs +++ b/source/tricky-pipe/src/mpsc/static_impl.rs @@ -1,5 +1,5 @@ use super::{ - channel_core::{Core, CoreVtable, ErasedPipe, ErasedSlice}, + channel_core::{Core, CoreVtable, ErasedSlice}, *, }; @@ -15,7 +15,7 @@ pub struct StaticTrickyPipe { impl StaticTrickyPipe where T: 'static, - E: 'static, + E: Clone + 'static, { const EMPTY_CELL: Cell = UnsafeCell::new(MaybeUninit::uninit()); @@ -45,12 +45,8 @@ where type_name: core::any::type_name::, }; - fn erased(&'static self) -> ErasedPipe { - unsafe { ErasedPipe::new(self as *const _ as *const (), Self::CORE_VTABLE) } - } - - fn typed(&'static self) -> TypedPipe { - unsafe { self.erased().typed() } + fn pipe(&'static self) -> TypedPipe { + TypedPipe::new(self as *const _ as *const (), Self::CORE_VTABLE) } /// Try to obtain a [`Receiver`] capable of receiving `T`-typed data @@ -60,7 +56,7 @@ where pub fn receiver(&'static self) -> Option> { self.core.try_claim_rx()?; - Some(Receiver { pipe: self.typed() }) + Some(Receiver { pipe: self.pipe() }) } /// Obtain a [`Sender`] capable of sending `T`-typed data @@ -68,7 +64,33 @@ where /// This function may be called multiple times. pub fn sender(&'static self) -> Sender { self.core.add_tx(); - Sender { pipe: self.typed() } + Sender { pipe: self.pipe() } + } + + /// Try to obtain a [`SerReceiver`] capable of receiving bytes containing + /// a serialized instance of `T`. + /// + /// This method will only return [`Some`] on the first call. All subsequent calls + /// will return [`None`]. + pub fn ser_receiver(&'static self) -> Option> + where + T: Serialize + Send + 'static, + { + self.core.try_claim_rx()?; + + Some(SerReceiver::new(self.pipe())) + } + + /// Try to obtain a [`DeserSender`] capable of sending bytes containing + /// a serialized instance of `T`. + /// + /// This method will only return [`Some`] on the first call. All subsequent calls + /// will return [`None`]. + pub fn deser_sender(&'static self) -> DeserSender + where + T: DeserializeOwned + Send + 'static, + { + DeserSender::new(self.pipe()) } fn get_core(ptr: *const ()) -> *const Core { @@ -92,56 +114,6 @@ where fn erased_drop(_: *const ()) {} } -impl StaticTrickyPipe -where - T: Serialize + Send + 'static, -{ - /// Try to obtain a [`SerReceiver`] capable of receiving bytes containing - /// a serialized instance of `T`. - /// - /// This method will only return [`Some`] on the first call. All subsequent calls - /// will return [`None`]. - pub fn ser_receiver(&'static self) -> Option> { - self.core.try_claim_rx()?; - - Some(SerReceiver { - pipe: self.erased(), - vtable: Self::SER_VTABLE, - }) - } - - const SER_VTABLE: &'static SerVtable = &SerVtable { - #[cfg(any(test, feature = "alloc"))] - to_vec: SerVtable::to_vec::, - #[cfg(any(test, feature = "alloc"))] - to_vec_framed: SerVtable::to_vec_framed::, - to_slice: SerVtable::to_slice::, - to_slice_framed: SerVtable::to_slice_framed::, - drop_elem: SerVtable::drop_elem::, - }; -} - -impl StaticTrickyPipe -where - T: DeserializeOwned + Send + 'static, - E:, -{ - /// Try to obtain a [`DeserSender`] capable of sending bytes containing - /// a serialized instance of `T`. - /// - /// This method will only return [`Some`] on the first call. All subsequent calls - /// will return [`None`]. - pub fn deser_sender(&'static self) -> DeserSender { - self.core.add_tx(); - DeserSender { - pipe: self.erased(), - vtable: Self::DESER_VTABLE, - } - } - - const DESER_VTABLE: &'static DeserVtable = &DeserVtable::new::(); -} - unsafe impl Send for StaticTrickyPipe where T: Send,