Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use postcard encoding for all transports that require serialization #114

Merged
merged 13 commits into from
Nov 29, 2024
42 changes: 41 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ description = "A streaming rpc system based on quic"
rust-version = "1.76"

[dependencies]
bincode = { version = "1.3.3", optional = true }
bytes = { version = "1", optional = true }
derive_more = { version = "1.0.0-beta.6", features = ["from", "try_into", "display"] }
flume = { version = "0.11", optional = true }
Expand All @@ -28,12 +27,14 @@ serde = { version = "1.0", features = ["derive"] }
tokio = { version = "1", default-features = false, features = ["macros", "sync"] }
tokio-serde = { version = "0.9", features = ["bincode"], optional = true }
tokio-util = { version = "0.7", features = ["rt"] }
postcard = { version = "1", features = ["use-std"], optional = true }
tracing = "0.1"
futures = { version = "0.3.30", optional = true }
anyhow = "1.0.73"

# Indirect dependencies, is needed to make the minimal crates versions work
slab = "0.4.9" # iroh-quinn
smallvec = "1.13.2"
time = "0.3.36" # serde

[dev-dependencies]
Expand All @@ -54,10 +55,10 @@ nested_enum_utils = "0.1.0"
tokio-util = { version = "0.7", features = ["rt"] }

[features]
hyper-transport = ["dep:flume", "dep:hyper", "dep:bincode", "dep:bytes", "dep:tokio-serde", "tokio-util/codec"]
quinn-transport = ["dep:flume", "dep:quinn", "dep:bincode", "dep:tokio-serde", "tokio-util/codec"]
hyper-transport = ["dep:flume", "dep:hyper", "dep:postcard", "dep:bytes", "dep:tokio-serde", "tokio-util/codec"]
quinn-transport = ["dep:flume", "dep:quinn", "dep:postcard", "dep:tokio-serde", "tokio-util/codec"]
flume-transport = ["dep:flume"]
iroh-net-transport = ["dep:iroh-net", "dep:flume", "dep:quinn", "dep:bincode", "dep:tokio-serde", "tokio-util/codec"]
iroh-net-transport = ["dep:iroh-net", "dep:flume", "dep:quinn", "dep:postcard", "dep:tokio-serde", "tokio-util/codec"]
macros = []
default = ["flume-transport"]

Expand Down
4 changes: 2 additions & 2 deletions src/transport/boxed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl<'a, In: RpcMessage, Out: RpcMessage> OpenFuture<'a, In, Out> {
}
}

impl<'a, In: RpcMessage, Out: RpcMessage> Future for OpenFuture<'a, In, Out> {
impl<In: RpcMessage, Out: RpcMessage> Future for OpenFuture<'_, In, Out> {
type Output = anyhow::Result<(SendSink<Out>, RecvStream<In>)>;

fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
Expand Down Expand Up @@ -199,7 +199,7 @@ impl<'a, In: RpcMessage, Out: RpcMessage> AcceptFuture<'a, In, Out> {
}
}

impl<'a, In: RpcMessage, Out: RpcMessage> Future for AcceptFuture<'a, In, Out> {
impl<In: RpcMessage, Out: RpcMessage> Future for AcceptFuture<'_, In, Out> {
type Output = anyhow::Result<(SendSink<Out>, RecvStream<In>)>;

fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
Expand Down
12 changes: 6 additions & 6 deletions src/transport/hyper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ async fn try_forward_all<In: RpcMessage>(
let mut sent = 0;
while let Some(msg) = try_get_length_prefixed(&buffer[sent..]) {
sent += msg.len() + 4;
let item = bincode::deserialize::<In>(msg).map_err(RecvError::DeserializeError);
let item = postcard::from_bytes::<In>(msg).map_err(RecvError::DeserializeError);
if let Err(_cause) = req_tx.send_async(item).await {
// The receiver is gone, so we can't send any more data.
//
Expand Down Expand Up @@ -434,7 +434,7 @@ impl<Out: RpcMessage> SendSink<Out> {
fn serialize(&self, item: Out) -> Result<Bytes, SendError> {
let mut data = Vec::with_capacity(1024);
data.extend_from_slice(&[0u8; 4]);
bincode::serialize_into(&mut data, &item).map_err(SendError::SerializeError)?;
let mut data = postcard::to_extend(&item, data).map_err(SendError::SerializeError)?;
let len = data.len() - 4;
if len > self.config.max_payload_size {
return Err(SendError::SizeError(len));
Expand Down Expand Up @@ -503,8 +503,8 @@ impl<Out: RpcMessage> Sink<Out> for SendSink<Out> {
/// Send error for hyper channels.
#[derive(Debug)]
pub enum SendError {
/// Error when bincode serializing the message.
SerializeError(bincode::Error),
/// Error when postcard serializing the message.
SerializeError(postcard::Error),
/// The message is too large to be sent.
SizeError(usize),
/// The connection has been closed.
Expand All @@ -522,8 +522,8 @@ impl error::Error for SendError {}
/// Receive error for hyper channels.
#[derive(Debug)]
pub enum RecvError {
/// Error when bincode deserializing the message.
DeserializeError(bincode::Error),
/// Error when postcard deserializing the message.
DeserializeError(postcard::Error),
/// Hyper network error.
NetworkError(hyper::Error),
}
Expand Down
14 changes: 7 additions & 7 deletions src/transport/iroh_net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use tokio::{sync::oneshot, task::yield_now};
use tracing::{debug_span, Instrument};

use super::{
util::{FramedBincodeRead, FramedBincodeWrite},
util::{FramedPostcardRead, FramedPostcardWrite},
StreamTypes,
};
use crate::{
Expand Down Expand Up @@ -658,12 +658,12 @@ impl<In: RpcMessage, Out: RpcMessage> Connector for IrohNetConnector<In, Out> {
}
}

/// A sink that wraps a quinn SendStream with length delimiting and bincode
/// A sink that wraps a quinn SendStream with length delimiting and postcard
///
/// If you want to send bytes directly, use [SendSink::into_inner] to get the
/// underlying [quinn::SendStream].
#[pin_project]
pub struct SendSink<Out>(#[pin] FramedBincodeWrite<quinn::SendStream, Out>);
pub struct SendSink<Out>(#[pin] FramedPostcardWrite<quinn::SendStream, Out>);

impl<Out> fmt::Debug for SendSink<Out> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand All @@ -673,7 +673,7 @@ impl<Out> fmt::Debug for SendSink<Out> {

impl<Out: Serialize> SendSink<Out> {
fn new(inner: quinn::SendStream) -> Self {
let inner = FramedBincodeWrite::new(inner, MAX_FRAME_LENGTH);
let inner = FramedPostcardWrite::new(inner, MAX_FRAME_LENGTH);
Self(inner)
}
}
Expand Down Expand Up @@ -706,12 +706,12 @@ impl<Out: Serialize> Sink<Out> for SendSink<Out> {
}
}

/// A stream that wraps a quinn RecvStream with length delimiting and bincode
/// A stream that wraps a quinn RecvStream with length delimiting and postcard
///
/// If you want to receive bytes directly, use [RecvStream::into_inner] to get
/// the underlying [quinn::RecvStream].
#[pin_project]
pub struct RecvStream<In>(#[pin] FramedBincodeRead<quinn::RecvStream, In>);
pub struct RecvStream<In>(#[pin] FramedPostcardRead<quinn::RecvStream, In>);

impl<In> fmt::Debug for RecvStream<In> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand All @@ -721,7 +721,7 @@ impl<In> fmt::Debug for RecvStream<In> {

impl<In: DeserializeOwned> RecvStream<In> {
fn new(inner: quinn::RecvStream) -> Self {
let inner = FramedBincodeRead::new(inner, MAX_FRAME_LENGTH);
let inner = FramedPostcardRead::new(inner, MAX_FRAME_LENGTH);
Self(inner)
}
}
Expand Down
6 changes: 1 addition & 5 deletions src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,7 @@ pub mod misc;
#[cfg(feature = "quinn-transport")]
pub mod quinn;

#[cfg(any(
feature = "quinn-transport",
feature = "hyper-transport",
feature = "iroh-net-transport"
))]
#[cfg(any(feature = "quinn-transport", feature = "iroh-net-transport"))]
mod util;

/// Errors that can happen when creating and using a [`Connector`] or [`Listener`].
Expand Down
16 changes: 8 additions & 8 deletions src/transport/quinn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use tokio::sync::oneshot;
use tracing::{debug_span, Instrument};

use super::{
util::{FramedBincodeRead, FramedBincodeWrite},
util::{FramedPostcardRead, FramedPostcardWrite},
StreamTypes,
};
use crate::{
Expand Down Expand Up @@ -578,7 +578,7 @@ impl<'a, T> Receiver<'a, T> {
}
}

impl<'a, T> Stream for Receiver<'a, T> {
impl<T> Stream for Receiver<'_, T> {
type Item = T;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand Down Expand Up @@ -653,12 +653,12 @@ impl<In: RpcMessage, Out: RpcMessage> Connector for QuinnConnector<In, Out> {
}
}

/// A sink that wraps a quinn SendStream with length delimiting and bincode
/// A sink that wraps a quinn SendStream with length delimiting and postcard
///
/// If you want to send bytes directly, use [SendSink::into_inner] to get the
/// underlying [quinn::SendStream].
#[pin_project]
pub struct SendSink<Out>(#[pin] FramedBincodeWrite<quinn::SendStream, Out>);
pub struct SendSink<Out>(#[pin] FramedPostcardWrite<quinn::SendStream, Out>);

impl<Out> fmt::Debug for SendSink<Out> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand All @@ -668,7 +668,7 @@ impl<Out> fmt::Debug for SendSink<Out> {

impl<Out: Serialize> SendSink<Out> {
fn new(inner: quinn::SendStream) -> Self {
let inner = FramedBincodeWrite::new(inner, MAX_FRAME_LENGTH);
let inner = FramedPostcardWrite::new(inner, MAX_FRAME_LENGTH);
Self(inner)
}
}
Expand Down Expand Up @@ -710,12 +710,12 @@ impl<Out: Serialize> Sink<Out> for SendSink<Out> {
}
}

/// A stream that wraps a quinn RecvStream with length delimiting and bincode
/// A stream that wraps a quinn RecvStream with length delimiting and postcard
///
/// If you want to receive bytes directly, use [RecvStream::into_inner] to get
/// the underlying [quinn::RecvStream].
#[pin_project]
pub struct RecvStream<In>(#[pin] FramedBincodeRead<quinn::RecvStream, In>);
pub struct RecvStream<In>(#[pin] FramedPostcardRead<quinn::RecvStream, In>);

impl<In> fmt::Debug for RecvStream<In> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand All @@ -725,7 +725,7 @@ impl<In> fmt::Debug for RecvStream<In> {

impl<In: DeserializeOwned> RecvStream<In> {
fn new(inner: quinn::RecvStream) -> Self {
let inner = FramedBincodeRead::new(inner, MAX_FRAME_LENGTH);
let inner = FramedPostcardRead::new(inner, MAX_FRAME_LENGTH);
Self(inner)
}
}
Expand Down
Loading
Loading