Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use postcard encoding for all transports that require serialization #114

Merged
merged 13 commits into from
Nov 29, 2024
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ anyhow = "1.0.73"
# Indirect dependencies, is needed to make the minimal crates versions work
educe = "0.4.20" # tokio-serde
slab = "0.4.9" # iroh-quinn
tokio-serde-postcard = "0.1.0"

[dev-dependencies]
anyhow = "1.0.73"
Expand Down
10 changes: 5 additions & 5 deletions src/transport/quinn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use tokio::sync::oneshot;
use tracing::{debug_span, Instrument};

use super::{
util::{FramedBincodeRead, FramedBincodeWrite},
util::{FramedPostcardRead, FramedPostcardWrite},
StreamTypes,
};
use crate::{
Expand Down Expand Up @@ -658,7 +658,7 @@ impl<In: RpcMessage, Out: RpcMessage> Connector for QuinnConnector<In, Out> {
/// If you want to send bytes directly, use [SendSink::into_inner] to get the
/// underlying [quinn::SendStream].
#[pin_project]
pub struct SendSink<Out>(#[pin] FramedBincodeWrite<quinn::SendStream, Out>);
pub struct SendSink<Out>(#[pin] FramedPostcardWrite<quinn::SendStream, Out>);

impl<Out> fmt::Debug for SendSink<Out> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand All @@ -668,7 +668,7 @@ impl<Out> fmt::Debug for SendSink<Out> {

impl<Out: Serialize> SendSink<Out> {
fn new(inner: quinn::SendStream) -> Self {
let inner = FramedBincodeWrite::new(inner, MAX_FRAME_LENGTH);
let inner = FramedPostcardWrite::new(inner, MAX_FRAME_LENGTH);
Self(inner)
}
}
Expand Down Expand Up @@ -715,7 +715,7 @@ impl<Out: Serialize> Sink<Out> for SendSink<Out> {
/// If you want to receive bytes directly, use [RecvStream::into_inner] to get
/// the underlying [quinn::RecvStream].
#[pin_project]
pub struct RecvStream<In>(#[pin] FramedBincodeRead<quinn::RecvStream, In>);
pub struct RecvStream<In>(#[pin] FramedPostcardRead<quinn::RecvStream, In>);

impl<In> fmt::Debug for RecvStream<In> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand All @@ -725,7 +725,7 @@ impl<In> fmt::Debug for RecvStream<In> {

impl<In: DeserializeOwned> RecvStream<In> {
fn new(inner: quinn::RecvStream) -> Self {
let inner = FramedBincodeRead::new(inner, MAX_FRAME_LENGTH);
let inner = FramedPostcardRead::new(inner, MAX_FRAME_LENGTH);
Self(inner)
}
}
Expand Down
112 changes: 112 additions & 0 deletions src/transport/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,118 @@ use tokio_util::codec::LengthDelimitedCodec;

type BincodeEncoding =
bincode::config::WithOtherIntEncoding<bincode::DefaultOptions, bincode::config::FixintEncoding>;

#[pin_project]
pub struct FramedPostcardRead<T, In>(
#[pin]
tokio_serde::SymmetricallyFramed<
tokio_util::codec::FramedRead<T, tokio_util::codec::LengthDelimitedCodec>,
In,
tokio_serde_postcard::SymmetricalPostcard<In>,
>,
);

impl<T: AsyncRead, In: DeserializeOwned> FramedPostcardRead<T, In> {
/// Wrap a socket in a length delimited codec and bincode with fast fixint encoding
rklaehn marked this conversation as resolved.
Show resolved Hide resolved
pub fn new(inner: T, max_frame_length: usize) -> Self {
// configure length delimited codec with max frame length
let framing = LengthDelimitedCodec::builder()
.max_frame_length(max_frame_length)
.new_codec();
// create the actual framing. This turns the AsyncRead/AsyncWrite into a Stream/Sink of Bytes/BytesMut
let framed = tokio_util::codec::FramedRead::new(inner, framing);
let postcard = tokio_serde_postcard::Postcard::new();
// create the actual framing. This turns the Stream/Sink of Bytes/BytesMut into a Stream/Sink of In/Out
let framed = tokio_serde::Framed::new(framed, postcard);
Self(framed)
}
}

impl<T, In> FramedPostcardRead<T, In> {
/// Get the underlying binary stream
///
/// This can be useful if you want to drop the framing and use the underlying stream directly
/// after exchanging some messages.
pub fn into_inner(self) -> T {
self.0.into_inner().into_inner()
}
}

impl<T: AsyncRead, In: DeserializeOwned> Stream for FramedPostcardRead<T, In> {
type Item = Result<In, std::io::Error>;

fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.project().0).poll_next(cx)
}
}

/// Wrapper that wraps a bidirectional binary stream in a length delimited codec and bincode with fast fixint encoding
/// to get a bidirectional stream of rpc Messages
#[pin_project]
pub struct FramedPostcardWrite<T, Out>(
#[pin]
tokio_serde::SymmetricallyFramed<
tokio_util::codec::FramedWrite<T, tokio_util::codec::LengthDelimitedCodec>,
Out,
tokio_serde_postcard::SymmetricalPostcard<Out>,
>,
);

impl<T: AsyncWrite, Out: Serialize> FramedPostcardWrite<T, Out> {
/// Wrap a socket in a length delimited codec and bincode with fast fixint encoding
rklaehn marked this conversation as resolved.
Show resolved Hide resolved
pub fn new(inner: T, max_frame_length: usize) -> Self {
// configure length delimited codec with max frame length
let framing = LengthDelimitedCodec::builder()
.max_frame_length(max_frame_length)
.new_codec();
// create the actual framing. This turns the AsyncRead/AsyncWrite into a Stream/Sink of Bytes/BytesMut
let framed = tokio_util::codec::FramedWrite::new(inner, framing);
let bincode = tokio_serde_postcard::SymmetricalPostcard::new();
// create the actual framing. This turns the Stream/Sink of Bytes/BytesMut into a Stream/Sink of In/Out
let framed = tokio_serde::SymmetricallyFramed::new(framed, bincode);
Self(framed)
}
}

impl<T, Out> FramedPostcardWrite<T, Out> {
/// Get the underlying binary stream
///
/// This can be useful if you want to drop the framing and use the underlying stream directly
/// after exchanging some messages.
pub fn into_inner(self) -> T {
self.0.into_inner().into_inner()
}
}

impl<T: AsyncWrite, Out: Serialize> Sink<Out> for FramedPostcardWrite<T, Out> {
type Error = std::io::Error;

fn poll_ready(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.project().0).poll_ready(cx)
}

fn start_send(self: Pin<&mut Self>, item: Out) -> Result<(), Self::Error> {
Pin::new(&mut self.project().0).start_send(item)
}

fn poll_flush(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.project().0).poll_flush(cx)
}

fn poll_close(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.project().0).poll_close(cx)
}
}

/// Wrapper that wraps a bidirectional binary stream in a length delimited codec and bincode with fast fixint encoding
/// to get a bidirectional stream of rpc Messages
#[pin_project]
Expand Down
Loading