Skip to content

Commit

Permalink
feat(tricky-pipe): erase() existing tx/rxs (#37)
Browse files Browse the repository at this point in the history
This commit adds `Sender::erase` and `Receiver::erase` methods that
convert an already-existing `Sender` or `Receiver` into a type-erased
`Sender`/`Receiver.
  • Loading branch information
hawkw committed Nov 29, 2023
1 parent 6b42ef5 commit 6c68024
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 126 deletions.
16 changes: 15 additions & 1 deletion source/tricky-pipe/src/bidi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::mpsc::{
};
use core::fmt;
use futures::FutureExt;
use serde::{de::DeserializeOwned, Serialize};

/// A bidirectional typed channel.
///
Expand Down Expand Up @@ -78,6 +79,20 @@ where
}
}

/// Erase the message types of this `BiDi`, returning a [`SerBiDi`].
pub fn erase(self) -> SerBiDi<E>
where
In: Serialize + Send + Sync + 'static,
Out: DeserializeOwned + Send + Sync + 'static,
{
SerBiDi {
tx: self.tx.erased(),
rx: self.rx.erased(),
seen_rx_error: self.seen_rx_error,
seen_tx_error: self.seen_tx_error,
}
}

/// Consumes `self`, extracting the inner [`Sender`] and [`Receiver`].
#[must_use]
pub fn split(self) -> (Sender<Out, E>, Receiver<In, E>) {
Expand All @@ -86,7 +101,6 @@ where

/// Wait until the channel is either ready to send a message *or* a new
/// incoming message is received, whichever occurs first.
#[must_use]
pub async fn wait(&mut self) -> WaitResult<Event<In, Permit<'_, Out, E>>, E> {
futures::select_biased! {
reserve = self.tx.reserve().fuse() => {
Expand Down
55 changes: 14 additions & 41 deletions source/tricky-pipe/src/mpsc/arc_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,30 +63,27 @@ impl<T: 'static, E: Clone + 'static> TrickyPipe<T, E> {
type_name: core::any::type_name::<T>,
};

fn erased(&self) -> ErasedPipe<E> {
fn pipe(&self) -> TypedPipe<T, E> {
let ptr = Arc::into_raw(self.0.clone()) as *const _;
unsafe { ErasedPipe::new(ptr, Self::CORE_VTABLE) }
TypedPipe::new(ptr, Self::CORE_VTABLE)
}

fn typed(&self) -> TypedPipe<T, E> {
unsafe { self.erased().typed() }
}
/// Try to obtain a [`Receiver<T>`] capable of receiving `T`-typed data
///
/// This method will only return [`Some`] on the first call. All subsequent calls
/// will return [`None`].
pub fn receiver(&self) -> Option<Receiver<T, E>> {
self.0.core.try_claim_rx()?;

Some(Receiver { pipe: self.typed() })
Some(Receiver { pipe: self.pipe() })
}

/// Obtain a [`Sender<T>`] capable of sending `T`-typed data
///
/// This function may be called multiple times.
pub fn sender(&self) -> Sender<T, E> {
self.0.core.add_tx();
Sender { pipe: self.typed() }
Sender { pipe: self.pipe() }
}

unsafe fn get_core(ptr: *const ()) -> *const Core<E> {
Expand All @@ -111,57 +108,33 @@ impl<T: 'static, E: Clone + 'static> TrickyPipe<T, E> {
test_println!(refs = Arc::strong_count(&arc), "erased_drop({ptr:p})");
drop(arc)
}
}

impl<T, E> TrickyPipe<T, E>
where
T: Serialize + Send + 'static,
E: Clone + Send + Sync,
{
/// Try to obtain a [`SerReceiver`] capable of receiving bytes containing
/// a serialized instance of `T`.
///
/// This method will only return [`Some`] on the first call. All subsequent calls
/// will return [`None`].
pub fn ser_receiver(&self) -> Option<SerReceiver<E>> {
pub fn ser_receiver(&self) -> Option<SerReceiver<E>>
where
T: Serialize + Send + 'static,
{
self.0.core.try_claim_rx()?;

Some(SerReceiver {
pipe: self.erased(),
vtable: Self::SER_VTABLE,
})
Some(SerReceiver::new(self.pipe()))
}

const SER_VTABLE: &'static SerVtable = &SerVtable {
#[cfg(any(test, feature = "alloc"))]
to_vec: SerVtable::to_vec::<T>,
#[cfg(any(test, feature = "alloc"))]
to_vec_framed: SerVtable::to_vec_framed::<T>,
to_slice: SerVtable::to_slice::<T>,
to_slice_framed: SerVtable::to_slice_framed::<T>,
drop_elem: SerVtable::drop_elem::<T>,
};
}

impl<T, E> TrickyPipe<T, E>
where
T: DeserializeOwned + Send + 'static,
E: Clone + Send + Sync,
{
/// Try to obtain a [`DeserSender`] capable of sending bytes containing
/// a serialized instance of `T`.
///
/// This method will only return [`Some`] on the first call. All subsequent calls
/// will return [`None`].
pub fn deser_sender(&self) -> DeserSender<E> {
pub fn deser_sender(&self) -> DeserSender<E>
where
T: DeserializeOwned + Send + 'static,
{
self.0.core.add_tx();
DeserSender {
pipe: self.erased(),
vtable: Self::DESER_VTABLE,
}
DeserSender::new(self.pipe())
}

const DESER_VTABLE: &'static DeserVtable = &DeserVtable::new::<T>();
}

impl<T, E: Clone> Clone for TrickyPipe<T, E> {
Expand Down
72 changes: 50 additions & 22 deletions source/tricky-pipe/src/mpsc/channel_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ pub(super) struct CoreVtable<E> {
pub(super) type_name: fn() -> &'static str,
}

pub(super) struct Vtables<T>(PhantomData<fn(T)>);

pub(super) struct SerVtable {
#[cfg(any(test, feature = "alloc"))]
pub(super) to_vec: SerVecFn,
Expand Down Expand Up @@ -600,21 +602,6 @@ impl ErasedSlice {
// == impl ErasedPipe ===

impl<E> ErasedPipe<E> {
pub(super) unsafe fn new(ptr: *const (), vtable: &'static CoreVtable<E>) -> Self {
Self { ptr, vtable }
}

/// # Safety
///
/// This `ErasedPipe` must have been type-erased from a tricky-pipe with
/// elements of type `T`!
pub(super) unsafe fn typed<T>(self) -> TypedPipe<T, E> {
TypedPipe {
pipe: self,
_t: PhantomData,
}
}

pub(super) fn core(&self) -> &Core<E> {
unsafe { &*(self.vtable.get_core)(self.ptr) }
}
Expand Down Expand Up @@ -658,6 +645,31 @@ unsafe impl<E: Send + Sync> Sync for ErasedPipe<E> {}
// === impl TypedPipe ===

impl<T: 'static, E> TypedPipe<T, E> {
pub(super) fn new(ptr: *const (), vtable: &'static CoreVtable<E>) -> Self {
Self {
pipe: ErasedPipe { ptr, vtable },
_t: PhantomData,
}
}

pub(super) unsafe fn erased(self) -> ErasedPipe<E> {
self.pipe
}

/// Clone this `TypedPipe` *without* incrementing the reference count. This
/// is intended to be used only when converting to a different reference
/// type, when the original `TypedPipe` will not have its destructor run.
///
/// # Safety
///
/// Do NOT `Drop` this `TypedPipe` after calling this method!!!!
pub(super) unsafe fn clone_no_ref_inc(&self) -> Self {
Self {
pipe: self.pipe.clone(),
_t: PhantomData,
}
}

pub(super) fn core(&self) -> &Core<E> {
self.pipe.core()
}
Expand All @@ -683,6 +695,27 @@ impl<T: 'static, E> Clone for TypedPipe<T, E> {
unsafe impl<T: Send, E: Send + Sync> Send for TypedPipe<T, E> {}
unsafe impl<T: Send, E: Send + Sync> Sync for TypedPipe<T, E> {}

// === impl MkVtables ===

impl<T: Serialize + Send + 'static> Vtables<T> {
pub(crate) const SERIALIZE: &'static SerVtable = &SerVtable {
#[cfg(any(test, feature = "alloc"))]
to_vec: SerVtable::to_vec::<T>,
#[cfg(any(test, feature = "alloc"))]
to_vec_framed: SerVtable::to_vec_framed::<T>,
to_slice: SerVtable::to_slice::<T>,
to_slice_framed: SerVtable::to_slice_framed::<T>,
drop_elem: SerVtable::drop_elem::<T>,
};
}

impl<T: DeserializeOwned + Send + 'static> Vtables<T> {
pub(crate) const DESERIALIZE: &'static DeserVtable = &DeserVtable {
from_bytes: DeserVtable::from_bytes::<T>,
from_bytes_framed: DeserVtable::from_bytes_framed::<T>,
};
}

// === impl SerVtable ===

impl SerVtable {
Expand Down Expand Up @@ -753,14 +786,9 @@ impl SerVtable {
}
}

impl DeserVtable {
pub(super) const fn new<T: DeserializeOwned + 'static>() -> Self {
Self {
from_bytes: Self::from_bytes::<T>,
from_bytes_framed: Self::from_bytes_framed::<T>,
}
}
// === impl DeserVtable ===

impl DeserVtable {
fn from_bytes<T: DeserializeOwned + 'static>(
elems: ErasedSlice,
idx: u8,
Expand Down
61 changes: 59 additions & 2 deletions source/tricky-pipe/src/mpsc/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
//! Multi-Producer, Single-Consumer (MPSC) channels.
use self::{
channel_core::{DeserVtable, ErasedPipe, ErasedSlice, Reservation, SerVtable, TypedPipe},
channel_core::{
DeserVtable, ErasedPipe, ErasedSlice, Reservation, SerVtable, TypedPipe, Vtables,
},
error::*,
};
use crate::loom::cell::{self, CellWith, UnsafeCell};
use core::{
fmt,
future::Future,
mem::MaybeUninit,
mem::{self, MaybeUninit},
ops::{Deref, DerefMut},
pin::Pin,
ptr,
Expand Down Expand Up @@ -269,6 +271,23 @@ where
self.pipe.elems()[res.idx as usize].with(|ptr| unsafe { (*ptr).assume_init_read() })
}

/// Erases the message type of this `Receiver`, returning a [`SerReceiver`]
/// that receives serialized byte representations of messages.
pub fn erased(self) -> SerReceiver<E>
where
T: Serialize + Send + Sync,
{
// don't run the destructor for this `Receiver`, as we are converting it
// into a `DeserReceiver`, which will keep the channel open.
let this = mem::ManuallyDrop::new(self);
unsafe {
// Safety: since we are not dropping the `Receiver`, we can safely
// duplicate the `TypedPipe`, preserving the existing receiver
// refcount.
SerReceiver::new(this.pipe.clone_no_ref_inc())
}
}

/// Close this channel with an error. Any subsequent attempts to send
/// messages to this channel will fail with `error`.
///
Expand Down Expand Up @@ -379,6 +398,16 @@ impl<T, E: Clone> futures::Stream for Receiver<T, E> {
// === impl SerReceiver ===

impl<E: Clone> SerReceiver<E> {
fn new<T>(pipe: TypedPipe<T, E>) -> Self
where
T: Serialize + Send + 'static,
{
Self {
pipe: unsafe { pipe.erased() },
vtable: Vtables::<T>::SERIALIZE,
}
}

/// Attempts to receive the serialized representation of next message from
/// the channel, without waiting for a new message to be sent if none are
/// available.
Expand Down Expand Up @@ -683,6 +712,16 @@ unsafe impl<E: Send + Sync> Sync for SerRecvRef<'_, E> {}
// === impl DeserSender ===

impl<E: Clone> DeserSender<E> {
fn new<T>(pipe: TypedPipe<T, E>) -> Self
where
T: DeserializeOwned + Send + 'static,
{
Self {
pipe: unsafe { pipe.erased() },
vtable: Vtables::<T>::DESERIALIZE,
}
}

/// Reserve capacity to send a serialized message to the channel.
///
/// If the channel is currently at capacity, this method waits until
Expand Down Expand Up @@ -1147,6 +1186,24 @@ impl<T, E: Clone> Sender<T, E> {
Ok(Permit { cell, pipe })
}

/// Erases the message type of this `Sender`, returning a [`DeserSender`]
/// that sends messages from their serialized binary representations.
pub fn erased(self) -> DeserSender<E>
where
T: DeserializeOwned + Send + Sync,
{
// don't run the destructor for this `Sender`, as we are converting it
// into a `SerSender`, keeping the existing reference count held by
// this `Sender`.
let this = mem::ManuallyDrop::new(self);
DeserSender::new(unsafe {
// Safety: since we are not dropping the `Sender`, we can safely
// duplicate the `TypedPipe`, preserving the existing receiver
// refcount.
this.pipe.clone_no_ref_inc()
})
}

/// Close this channel with an error. Any subsequent attempts to send
/// messages to this channel will fail with `error`.
///
Expand Down
Loading

0 comments on commit 6c68024

Please sign in to comment.