Skip to content

Commit

Permalink
feat(engineio): add a Str type for Message packet (#310)
Browse files Browse the repository at this point in the history
* feat(engineio): add a `Str` type for message type

* feat(socketio/packet): Parse a `Packet` from a `Str` directly
  • Loading branch information
Totodore authored Apr 21, 2024
1 parent 25a3568 commit 867f2b5
Show file tree
Hide file tree
Showing 20 changed files with 211 additions and 76 deletions.
3 changes: 2 additions & 1 deletion e2e/engineioxide/engineioxide.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use engineioxide::{
handler::EngineIoHandler,
service::EngineIoService,
socket::{DisconnectReason, Socket},
Str,
};
use hyper::server::conn::http1;
use hyper_util::rt::TokioIo;
Expand All @@ -28,7 +29,7 @@ impl EngineIoHandler for MyHandler {
println!("socket disconnect {}: {:?}", socket.id, reason);
}

fn on_message(&self, msg: String, socket: Arc<Socket<Self::Data>>) {
fn on_message(&self, msg: Str, socket: Arc<Socket<Self::Data>>) {
println!("Ping pong message {:?}", msg);
socket.emit(msg).ok();
}
Expand Down
6 changes: 3 additions & 3 deletions engineioxide/Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ engineioxide = { version = "0.3.0", features = ["v3"] }
use bytes::Bytes;
use engineioxide::layer::EngineIoLayer;
use engineioxide::handler::EngineIoHandler;
use engineioxide::{Socket, DisconnectReason};
use engineioxide::{Socket, DisconnectReason, Str};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use axum::routing::get;
Expand All @@ -50,8 +50,8 @@ impl EngineIoHandler for MyHandler {
let cnt = self.user_cnt.fetch_sub(1, Ordering::Relaxed) - 1;
socket.emit(cnt.to_string()).ok();
}
fn on_message(&self, msg: String, socket: Arc<Socket<SocketState>>) {
*socket.data.id.lock().unwrap() = msg; // bind a provided user id to a socket
fn on_message(&self, msg: Str, socket: Arc<Socket<SocketState>>) {
*socket.data.id.lock().unwrap() = msg.into(); // bind a provided user id to a socket
}
fn on_binary(&self, data: Bytes, socket: Arc<Socket<SocketState>>) { }
}
Expand Down
34 changes: 27 additions & 7 deletions engineioxide/benches/packet_decode.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,51 @@
use bytes::Bytes;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use criterion::{black_box, criterion_group, criterion_main, BatchSize, Criterion};
use engineioxide::Packet;

fn criterion_benchmark(c: &mut Criterion) {
let mut group = c.benchmark_group("engineio_packet/decode");
group.bench_function("Decode packet ping/pong", |b| {
let packet: String = Packet::Ping.try_into().unwrap();
b.iter(|| Packet::try_from(packet.as_str()).unwrap())
b.iter_batched(
|| packet.clone(),
|p| Packet::try_from(p).unwrap(),
BatchSize::SmallInput,
)
});
group.bench_function("Decode packet ping/pong upgrade", |b| {
let packet: String = Packet::PingUpgrade.try_into().unwrap();
b.iter(|| Packet::try_from(packet.as_str()).unwrap())
b.iter_batched(
|| packet.clone(),
|p| Packet::try_from(p).unwrap(),
BatchSize::SmallInput,
)
});
group.bench_function("Decode packet message", |b| {
let packet: String = Packet::Message(black_box("Hello").to_string())
let packet: String = Packet::Message(black_box("Hello").into())
.try_into()
.unwrap();
b.iter(|| Packet::try_from(packet.as_str()).unwrap())
b.iter_batched(
|| packet.clone(),
|p| Packet::try_from(p).unwrap(),
BatchSize::SmallInput,
)
});
group.bench_function("Decode packet noop", |b| {
let packet: String = Packet::Noop.try_into().unwrap();
b.iter(|| Packet::try_from(packet.as_str()).unwrap())
b.iter_batched(
|| packet.clone(),
|p| Packet::try_from(p).unwrap(),
BatchSize::SmallInput,
)
});
group.bench_function("Decode packet binary b64", |b| {
const BYTES: Bytes = Bytes::from_static(&[0x00, 0x01, 0x02, 0x03, 0x04, 0x05]);
let packet: String = Packet::Binary(BYTES).try_into().unwrap();
b.iter(|| Packet::try_from(packet.clone()).unwrap())
b.iter_batched(
|| packet.clone(),
|p| Packet::try_from(p).unwrap(),
BatchSize::SmallInput,
)
});

group.finish();
Expand Down
40 changes: 32 additions & 8 deletions engineioxide/benches/packet_encode.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use bytes::Bytes;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use criterion::{black_box, criterion_group, criterion_main, BatchSize, Criterion};
use engineioxide::{config::EngineIoConfig, sid::Sid, OpenPacket, Packet, TransportType};

fn criterion_benchmark(c: &mut Criterion) {
Expand All @@ -10,28 +10,52 @@ fn criterion_benchmark(c: &mut Criterion) {
black_box(Sid::ZERO),
&EngineIoConfig::default(),
));
b.iter(|| TryInto::<String>::try_into(packet.clone()))
b.iter_batched(
|| packet.clone(),
|p| TryInto::<String>::try_into(p),
BatchSize::SmallInput,
)
});
group.bench_function("Encode packet ping/pong", |b| {
let packet = Packet::Ping;
b.iter(|| TryInto::<String>::try_into(packet.clone()))
b.iter_batched(
|| packet.clone(),
|p| TryInto::<String>::try_into(p),
BatchSize::SmallInput,
)
});
group.bench_function("Encode packet ping/pong upgrade", |b| {
let packet = Packet::PingUpgrade;
b.iter(|| TryInto::<String>::try_into(packet.clone()))
b.iter_batched(
|| packet.clone(),
|p| TryInto::<String>::try_into(p),
BatchSize::SmallInput,
)
});
group.bench_function("Encode packet message", |b| {
let packet = Packet::Message(black_box("Hello").to_string());
b.iter(|| TryInto::<String>::try_into(packet.clone()))
let packet = Packet::Message(black_box("Hello").into());
b.iter_batched(
|| packet.clone(),
|p| TryInto::<String>::try_into(p),
BatchSize::SmallInput,
)
});
group.bench_function("Encode packet noop", |b| {
let packet = Packet::Noop;
b.iter(|| TryInto::<String>::try_into(packet.clone()))
b.iter_batched(
|| packet.clone(),
|p| TryInto::<String>::try_into(p),
BatchSize::SmallInput,
)
});
group.bench_function("Encode packet binary b64", |b| {
const BYTES: Bytes = Bytes::from_static(&[0x00, 0x01, 0x02, 0x03, 0x04, 0x05]);
let packet = Packet::Binary(BYTES);
b.iter(|| TryInto::<String>::try_into(packet.clone()))
b.iter_batched(
|| packet.clone(),
|p| TryInto::<String>::try_into(p),
BatchSize::SmallInput,
)
});

group.finish();
Expand Down
7 changes: 4 additions & 3 deletions engineioxide/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
//! # use engineioxide::service::EngineIoService;
//! # use engineioxide::handler::EngineIoHandler;
//! # use std::time::Duration;
//! # use engineioxide::{Socket, DisconnectReason};
//! # use engineioxide::{Socket, DisconnectReason, Str};
//! # use std::sync::Arc;
//! #[derive(Debug, Clone)]
//! struct MyHandler;
Expand All @@ -15,7 +15,7 @@
//! type Data = ();
//! fn on_connect(&self, socket: Arc<Socket<()>>) { }
//! fn on_disconnect(&self, socket: Arc<Socket<()>>, reason: DisconnectReason) { }
//! fn on_message(&self, msg: String, socket: Arc<Socket<()>>) { }
//! fn on_message(&self, msg: Str, socket: Arc<Socket<()>>) { }
//! fn on_binary(&self, data: Bytes, socket: Arc<Socket<()>>) { }
//! }
//!
Expand Down Expand Up @@ -131,6 +131,7 @@ impl EngineIoConfigBuilder {
/// ```
/// # use bytes::Bytes;
/// # use engineioxide::{
/// Str,
/// layer::EngineIoLayer,
/// handler::EngineIoHandler,
/// socket::{Socket, DisconnectReason},
Expand All @@ -149,7 +150,7 @@ impl EngineIoConfigBuilder {
/// println!("socket disconnect {}", socket.id);
/// }
///
/// fn on_message(&self, msg: String, socket: Arc<Socket<()>>) {
/// fn on_message(&self, msg: Str, socket: Arc<Socket<()>>) {
/// println!("Ping pong message {:?}", msg);
/// socket.emit(msg).unwrap();
/// }
Expand Down
3 changes: 2 additions & 1 deletion engineioxide/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ impl<H: EngineIoHandler> EngineIo<H> {

#[cfg(test)]
mod tests {
use crate::str::Str;
use bytes::Bytes;
use http::Request;

Expand All @@ -114,7 +115,7 @@ mod tests {
println!("socket disconnect {} {:?}", socket.id, reason);
}

fn on_message(&self, msg: String, socket: Arc<Socket<Self::Data>>) {
fn on_message(&self, msg: Str, socket: Arc<Socket<Self::Data>>) {
println!("Ping pong message {:?}", msg);
socket.emit(msg).ok();
}
Expand Down
11 changes: 6 additions & 5 deletions engineioxide/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! # use bytes::Bytes;
//! # use engineioxide::service::EngineIoService;
//! # use engineioxide::handler::EngineIoHandler;
//! # use engineioxide::{Socket, DisconnectReason};
//! # use engineioxide::{Socket, DisconnectReason, Str};
//! # use std::sync::{Arc, Mutex};
//! # use std::sync::atomic::{AtomicUsize, Ordering};
//! // Global state
Expand All @@ -30,8 +30,8 @@
//! let cnt = self.user_cnt.fetch_sub(1, Ordering::Relaxed) - 1;
//! socket.emit(cnt.to_string()).ok();
//! }
//! fn on_message(&self, msg: String, socket: Arc<Socket<SocketState>>) {
//! *socket.data.id.lock().unwrap() = msg; // bind a provided user id to a socket
//! fn on_message(&self, msg: Str, socket: Arc<Socket<SocketState>>) {
//! *socket.data.id.lock().unwrap() = msg.into(); // bind a provided user id to a socket
//! }
//! fn on_binary(&self, data: Bytes, socket: Arc<Socket<SocketState>>) { }
//! }
Expand All @@ -44,6 +44,7 @@ use std::sync::Arc;
use bytes::Bytes;

use crate::socket::{DisconnectReason, Socket};
use crate::str::Str;

/// The [`EngineIoHandler`] trait can be implemented on any struct to handle socket events
///
Expand All @@ -59,7 +60,7 @@ pub trait EngineIoHandler: std::fmt::Debug + Send + Sync + 'static {
fn on_disconnect(&self, socket: Arc<Socket<Self::Data>>, reason: DisconnectReason);

/// Called when a message is received from the client.
fn on_message(&self, msg: String, socket: Arc<Socket<Self::Data>>);
fn on_message(&self, msg: Str, socket: Arc<Socket<Self::Data>>);

/// Called when a binary message is received from the client.
fn on_binary(&self, data: Bytes, socket: Arc<Socket<Self::Data>>);
Expand All @@ -76,7 +77,7 @@ impl<T: EngineIoHandler> EngineIoHandler for Arc<T> {
(**self).on_disconnect(socket, reason)
}

fn on_message(&self, msg: String, socket: Arc<Socket<Self::Data>>) {
fn on_message(&self, msg: Str, socket: Arc<Socket<Self::Data>>) {
(**self).on_message(msg, socket)
}

Expand Down
4 changes: 2 additions & 2 deletions engineioxide/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//! # use bytes::Bytes;
//! # use engineioxide::layer::EngineIoLayer;
//! # use engineioxide::handler::EngineIoHandler;
//! # use engineioxide::{Socket, DisconnectReason};
//! # use engineioxide::{Socket, DisconnectReason, Str};
//! # use std::sync::Arc;
//! # use axum::routing::get;
//! #[derive(Debug, Clone)]
Expand All @@ -15,7 +15,7 @@
//! type Data = ();
//! fn on_connect(&self, socket: Arc<Socket<()>>) { }
//! fn on_disconnect(&self, socket: Arc<Socket<()>>, reason: DisconnectReason) { }
//! fn on_message(&self, msg: String, socket: Arc<Socket<()>>) { }
//! fn on_message(&self, msg: Str, socket: Arc<Socket<()>>) { }
//! fn on_binary(&self, data: Bytes, socket: Arc<Socket<()>>) { }
//! }
//! // Create a new engineio layer
Expand Down
2 changes: 2 additions & 0 deletions engineioxide/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
)]
#![doc = include_str!("../Readme.md")]

pub use crate::str::Str;
pub use service::{ProtocolVersion, TransportType};
pub use socket::{DisconnectReason, Socket};

Expand All @@ -50,4 +51,5 @@ mod engine;
mod errors;
mod packet;
mod peekable;
mod str;
mod transport;
17 changes: 9 additions & 8 deletions engineioxide/src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use serde::Serialize;
use crate::config::EngineIoConfig;
use crate::errors::Error;
use crate::sid::Sid;
use crate::str::Str;
use crate::TransportType;

/// A Packet type to use when receiving and sending data from the client
Expand All @@ -27,7 +28,7 @@ pub enum Packet {
PongUpgrade,

/// Message packet used to send a message to the client
Message(String),
Message(Str),
/// Upgrade packet to upgrade the connection from polling to websocket
Upgrade,

Expand Down Expand Up @@ -58,7 +59,7 @@ impl Packet {
}

/// If the packet is a message packet (text), it returns the message
pub(crate) fn into_message(self) -> String {
pub(crate) fn into_message(self) -> Str {
match self {
Packet::Message(msg) => msg,
_ => panic!("Packet is not a message"),
Expand Down Expand Up @@ -143,9 +144,9 @@ impl TryInto<String> for Packet {
}
}
/// Deserialize a [Packet] from a [String] according to the Engine.IO protocol
impl TryFrom<&str> for Packet {
impl TryFrom<Str> for Packet {
type Error = Error;
fn try_from(value: &str) -> Result<Self, Self::Error> {
fn try_from(value: Str) -> Result<Self, Self::Error> {
let packet_type = value
.as_bytes()
.first()
Expand All @@ -157,17 +158,17 @@ impl TryFrom<&str> for Packet {
b'2' => Packet::Ping,
b'3' if is_upgrade => Packet::PongUpgrade,
b'3' => Packet::Pong,
b'4' => Packet::Message(value[1..].to_string()),
b'4' => Packet::Message(value.slice(1..)),
b'5' => Packet::Upgrade,
b'6' => Packet::Noop,
b'b' if value.as_bytes().get(1) == Some(&b'4') => Packet::BinaryV3(
general_purpose::STANDARD
.decode(value[2..].as_bytes())?
.decode(value.slice(2..).as_bytes())?
.into(),
),
b'b' => Packet::Binary(
general_purpose::STANDARD
.decode(value[1..].as_bytes())?
.decode(value.slice(1..).as_bytes())?
.into(),
),
c => Err(Error::InvalidPacketType(Some(*c as char)))?,
Expand All @@ -179,7 +180,7 @@ impl TryFrom<&str> for Packet {
impl TryFrom<String> for Packet {
type Error = Error;
fn try_from(value: String) -> Result<Self, Self::Error> {
Packet::try_from(value.as_str())
Packet::try_from(Str::from(value))
}
}

Expand Down
4 changes: 2 additions & 2 deletions engineioxide/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
//! # use engineioxide::layer::EngineIoLayer;
//! # use engineioxide::handler::EngineIoHandler;
//! # use engineioxide::service::EngineIoService;
//! # use engineioxide::{Socket, DisconnectReason};
//! # use engineioxide::{Socket, DisconnectReason, Str};
//! # use std::sync::Arc;
//! #[derive(Debug)]
//! struct MyHandler;
Expand All @@ -17,7 +17,7 @@
//! type Data = ();
//! fn on_connect(&self, socket: Arc<Socket<()>>) { }
//! fn on_disconnect(&self, socket: Arc<Socket<()>>, reason: DisconnectReason) { }
//! fn on_message(&self, msg: String, socket: Arc<Socket<()>>) { }
//! fn on_message(&self, msg: Str, socket: Arc<Socket<()>>) { }
//! fn on_binary(&self, data: Bytes, socket: Arc<Socket<()>>) { }
//! }
//!
Expand Down
Loading

0 comments on commit 867f2b5

Please sign in to comment.