Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(socketio): Async adapter #395

Merged
merged 38 commits into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
297edb2
wip
Totodore Nov 3, 2024
ad85341
doc: wip
Totodore Nov 3, 2024
c48787b
doc: wip
Totodore Nov 3, 2024
8647d34
doc: wip
Totodore Nov 4, 2024
85ce496
wip
Totodore Nov 4, 2024
65574c7
wip
Totodore Nov 4, 2024
e094c06
merge
Totodore Nov 4, 2024
113384b
wip: async adapter
Totodore Nov 4, 2024
1ad4be7
doc: make example works with async adapter
Totodore Nov 4, 2024
d47758f
Merge branch 'feat-adapter-rework' into async-adapter
Totodore Nov 5, 2024
4ec5721
chore(ci): run ci on all PR
Totodore Nov 5, 2024
b384743
feat(socketio/adapter): replace RoomParam by BroadcastOptions
Totodore Nov 8, 2024
6917604
* Remove Debug bound on adapter
Totodore Nov 11, 2024
c0ecf9c
feat(adapter): factor out adapter to core
Totodore Nov 15, 2024
3f21c98
feat(core/adapter): move flags to bit repr
Totodore Nov 15, 2024
421eb81
fix(socketio/ns): deadlock for disconnect_many
Totodore Nov 15, 2024
4e6ce7f
fix(adapter): test
Totodore Nov 16, 2024
22b8942
wip: move local adapter as a dep of core adapter.
Totodore Dec 4, 2024
21b9d65
feat: move from hashset to vec for options
Totodore Dec 5, 2024
9d19bf0
feat: use smallvec for broadcast flags
Totodore Dec 5, 2024
c159e21
feat(engineio): add `to_vec` for `Str`
Totodore Dec 7, 2024
040d9a6
feat: add serde for `Packet`, `AckError` & `Value`
Totodore Dec 7, 2024
5c77fd7
chore(clippy): fix clippy lints
Totodore Dec 15, 2024
a2c6af7
fix: typos
Totodore Dec 16, 2024
2f11d14
feat: type erase emitter to avoid type overflow
Totodore Oct 10, 2024
67bcc99
fix: tests
Totodore Dec 17, 2024
db50c6d
feat: typestate pattern for SocketIoBuilder + tests fixes
Totodore Dec 17, 2024
d188154
feat: revert to basic adapter init
Totodore Dec 17, 2024
114154a
feat:
Totodore Dec 19, 2024
cb30a86
feat(socketio): add broadcast operator on io struct (#401)
Totodore Dec 19, 2024
ad49977
Merge remote-tracking branch 'origin/main' into async-adapter
Totodore Dec 19, 2024
0ae33fe
feat(core): serialization size optimization.
Totodore Dec 20, 2024
6152069
wip: reduce adapter complexity by removing useless methods.
Totodore Dec 20, 2024
ee9de93
chore(clippy): fix lint `into_iter_on_ref`
Totodore Dec 21, 2024
b299d6e
fix(test): spawn init future only if not ready
Totodore Dec 21, 2024
dac1033
doc(socketio/ns): hide ns::Emitter in docs
Totodore Dec 21, 2024
87d4379
fix(examples): use new sync/async API
Totodore Dec 21, 2024
83b8865
doc(socketio): improve doc related to local/remote adapters
Totodore Dec 22, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .github/workflows/github-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ on:
branches:
- main
pull_request:
branches:
- main

jobs:
format:
Expand Down
4 changes: 2 additions & 2 deletions crates/engineioxide/Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ impl EngineIoHandler for MyHandler {
let cnt = self.user_cnt.fetch_sub(1, Ordering::Relaxed) - 1;
socket.emit(cnt.to_string()).ok();
}
fn on_message(&self, msg: Str, socket: Arc<Socket<SocketState>>) {
fn on_message(self: &Arc<Self>, msg: Str, socket: Arc<Socket<SocketState>>) {
*socket.data.id.lock().unwrap() = msg.into(); // bind a provided user id to a socket
}
fn on_binary(&self, data: Bytes, socket: Arc<Socket<SocketState>>) { }
fn on_binary(self: &Arc<Self>, data: Bytes, socket: Arc<Socket<SocketState>>) { }
}

// Create a new engineio layer
Expand Down
8 changes: 4 additions & 4 deletions crates/engineioxide/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
//! type Data = ();
//! fn on_connect(self: Arc<Self>, socket: Arc<Socket<()>>) { }
//! fn on_disconnect(&self, socket: Arc<Socket<()>>, reason: DisconnectReason) { }
//! fn on_message(&self, msg: Str, socket: Arc<Socket<()>>) { }
//! fn on_binary(&self, data: Bytes, socket: Arc<Socket<()>>) { }
//! fn on_message(self: &Arc<Self>, msg: Str, socket: Arc<Socket<()>>) { }
//! fn on_binary(self: &Arc<Self>, data: Bytes, socket: Arc<Socket<()>>) { }
//! }
//!
//! let config = EngineIoConfig::builder()
Expand Down Expand Up @@ -150,12 +150,12 @@ impl EngineIoConfigBuilder {
/// println!("socket disconnect {}", socket.id);
/// }
///
/// fn on_message(&self, msg: Str, socket: Arc<Socket<()>>) {
/// fn on_message(self: &Arc<Self>, msg: Str, socket: Arc<Socket<()>>) {
/// println!("Ping pong message {:?}", msg);
/// socket.emit(msg).unwrap();
/// }
///
/// fn on_binary(&self, data: Bytes, socket: Arc<Socket<()>>) {
/// fn on_binary(self: &Arc<Self>, data: Bytes, socket: Arc<Socket<()>>) {
/// println!("Ping pong binary message {:?}", data);
/// socket.emit_binary(data).unwrap();
/// }
Expand Down
4 changes: 2 additions & 2 deletions crates/engineioxide/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,12 @@ mod tests {
println!("socket disconnect {} {:?}", socket.id, reason);
}

fn on_message(&self, msg: Str, socket: Arc<Socket<Self::Data>>) {
fn on_message(self: &Arc<Self>, msg: Str, socket: Arc<Socket<Self::Data>>) {
println!("Ping pong message {:?}", msg);
socket.emit(msg).ok();
}

fn on_binary(&self, data: Bytes, socket: Arc<Socket<Self::Data>>) {
fn on_binary(self: &Arc<Self>, data: Bytes, socket: Arc<Socket<Self::Data>>) {
println!("Ping pong binary message {:?}", data);
socket.emit_binary(data).ok();
}
Expand Down
8 changes: 4 additions & 4 deletions crates/engineioxide/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
//! let cnt = self.user_cnt.fetch_sub(1, Ordering::Relaxed) - 1;
//! socket.emit(cnt.to_string()).ok();
//! }
//! fn on_message(&self, msg: Str, socket: Arc<Socket<SocketState>>) {
//! fn on_message(self: &Arc<Self>, msg: Str, socket: Arc<Socket<SocketState>>) {
//! *socket.data.id.lock().unwrap() = msg.into(); // bind a provided user id to a socket
//! }
//! fn on_binary(&self, data: Bytes, socket: Arc<Socket<SocketState>>) { }
//! fn on_binary(self: &Arc<Self>, data: Bytes, socket: Arc<Socket<SocketState>>) { }
//! }
//!
//! // Create an engine io service with the given handler
Expand All @@ -60,8 +60,8 @@ pub trait EngineIoHandler: std::fmt::Debug + Send + Sync + 'static {
fn on_disconnect(&self, socket: Arc<Socket<Self::Data>>, reason: DisconnectReason);

/// Called when a message is received from the client.
fn on_message(&self, msg: Str, socket: Arc<Socket<Self::Data>>);
fn on_message(self: &Arc<Self>, msg: Str, socket: Arc<Socket<Self::Data>>);

/// Called when a binary message is received from the client.
fn on_binary(&self, data: Bytes, socket: Arc<Socket<Self::Data>>);
fn on_binary(self: &Arc<Self>, data: Bytes, socket: Arc<Socket<Self::Data>>);
}
4 changes: 2 additions & 2 deletions crates/engineioxide/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
//! type Data = ();
//! fn on_connect(self: Arc<Self>, socket: Arc<Socket<()>>) { }
//! fn on_disconnect(&self, socket: Arc<Socket<()>>, reason: DisconnectReason) { }
//! fn on_message(&self, msg: Str, socket: Arc<Socket<()>>) { }
//! fn on_binary(&self, data: Bytes, socket: Arc<Socket<()>>) { }
//! fn on_message(self: &Arc<Self>, msg: Str, socket: Arc<Socket<()>>) { }
//! fn on_binary(self: &Arc<Self>, data: Bytes, socket: Arc<Socket<()>>) { }
//! }
//! // Create a new engineio layer
//! let layer = EngineIoLayer::new(Arc::new(MyHandler));
Expand Down
4 changes: 2 additions & 2 deletions crates/engineioxide/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
//! type Data = ();
//! fn on_connect(self: Arc<Self>, socket: Arc<Socket<()>>) { }
//! fn on_disconnect(&self, socket: Arc<Socket<()>>, reason: DisconnectReason) { }
//! fn on_message(&self, msg: Str, socket: Arc<Socket<()>>) { }
//! fn on_binary(&self, data: Bytes, socket: Arc<Socket<()>>) { }
//! fn on_message(self: &Arc<Self>, msg: Str, socket: Arc<Socket<()>>) { }
//! fn on_binary(self: &Arc<Self>, data: Bytes, socket: Arc<Socket<()>>) { }
//! }
//!
//! // Create a new engine.io service that will return a 404 not found response for other requests
Expand Down
4 changes: 2 additions & 2 deletions crates/engineioxide/src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@
//! fn on_disconnect(&self, socket: Arc<Socket<SocketState>>, reason: DisconnectReason) {
//! let cnt = self.user_cnt.fetch_sub(1, Ordering::Relaxed) - 1;
//! }
//! fn on_message(&self, msg: Str, socket: Arc<Socket<SocketState>>) {
//! fn on_message(self: &Arc<Self>, msg: Str, socket: Arc<Socket<SocketState>>) {
//! *socket.data.id.lock().unwrap() = msg.into(); // bind a provided user id to a socket
//! }
//! fn on_binary(&self, data: Bytes, socket: Arc<Socket<SocketState>>) { }
//! fn on_binary(self: &Arc<Self>, data: Bytes, socket: Arc<Socket<SocketState>>) { }
//! }
//!
//! let svc = EngineIoService::new(Arc::new(MyHandler::default()));
Expand Down
5 changes: 5 additions & 0 deletions crates/engineioxide/src/str.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ impl From<Str> for String {
unsafe { String::from_utf8_unchecked(vec) }
}
}
impl From<Str> for Vec<u8> {
fn from(value: Str) -> Self {
Vec::from(value.0)
}
}
impl Serialize for Str {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
Expand Down
4 changes: 2 additions & 2 deletions crates/engineioxide/tests/disconnect_reason.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ impl EngineIoHandler for MyHandler {
self.disconnect_tx.try_send(reason).unwrap();
}

fn on_message(&self, msg: Str, socket: Arc<Socket<()>>) {
fn on_message(self: &Arc<Self>, msg: Str, socket: Arc<Socket<()>>) {
println!("Ping pong message {:?}", msg);
socket.emit(msg).ok();
}

fn on_binary(&self, data: Bytes, socket: Arc<Socket<()>>) {
fn on_binary(self: &Arc<Self>, data: Bytes, socket: Arc<Socket<()>>) {
println!("Ping pong binary message {:?}", data);
socket.emit_binary(data).ok();
}
Expand Down
4 changes: 1 addition & 3 deletions crates/parser-common/src/de.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ use socketioxide_core::{
Str, Value,
};

pub fn deserialize_packet(
data: Str,
) -> Result<(Packet, Option<usize>), ParseError<serde_json::Error>> {
pub fn deserialize_packet(data: Str) -> Result<(Packet, Option<usize>), ParseError> {
if data.is_empty() {
return Err(ParseError::InvalidPacketType);
}
Expand Down
44 changes: 17 additions & 27 deletions crates/parser-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use bytes::Bytes;
use serde::{Deserialize, Serialize};
use socketioxide_core::{
packet::{Packet, PacketData},
parser::{Parse, ParseError, ParserState},
parser::{Parse, ParseError, ParserError, ParserState},
Str, Value,
};

Expand All @@ -59,17 +59,11 @@ mod value;
pub struct CommonParser;

impl Parse for CommonParser {
type EncodeError = serde_json::Error;
type DecodeError = serde_json::Error;
fn encode(self, packet: Packet) -> Value {
ser::serialize_packet(packet)
}

fn decode_str(
self,
state: &ParserState,
value: Str,
) -> Result<Packet, ParseError<Self::DecodeError>> {
fn decode_str(self, state: &ParserState, value: Str) -> Result<Packet, ParseError> {
let (packet, incoming_binary_cnt) = de::deserialize_packet(value)?;
if packet.inner.is_binary() {
let incoming_binary_cnt = incoming_binary_cnt.ok_or(ParseError::InvalidAttachments)?;
Expand All @@ -87,11 +81,7 @@ impl Parse for CommonParser {
}
}

fn decode_bin(
self,
state: &ParserState,
data: Bytes,
) -> Result<Packet, ParseError<Self::DecodeError>> {
fn decode_bin(self, state: &ParserState, data: Bytes) -> Result<Packet, ParseError> {
let packet = &mut *state.partial_bin_packet.lock().unwrap();
match packet {
Some(Packet {
Expand All @@ -117,41 +107,41 @@ impl Parse for CommonParser {
self,
data: &T,
event: Option<&str>,
) -> Result<Value, Self::EncodeError> {
value::to_value(data, event)
) -> Result<Value, ParserError> {
value::to_value(data, event).map_err(ParserError::new)
}

#[inline]
fn decode_value<'de, T: Deserialize<'de>>(
self,
value: &'de mut Value,
with_event: bool,
) -> Result<T, Self::DecodeError> {
value::from_value(value, with_event)
) -> Result<T, ParserError> {
value::from_value(value, with_event).map_err(ParserError::new)
}

fn decode_default<'de, T: Deserialize<'de>>(
self,
value: Option<&'de Value>,
) -> Result<T, Self::DecodeError> {
) -> Result<T, ParserError> {
if let Some(value) = value {
let data = value
.as_str()
.expect("CommonParser only supports string values");
serde_json::from_str(data)
serde_json::from_str(data).map_err(ParserError::new)
} else {
serde_json::from_str("{}")
serde_json::from_str("{}").map_err(ParserError::new)
}
}

fn encode_default<T: ?Sized + Serialize>(self, data: &T) -> Result<Value, Self::EncodeError> {
let value = serde_json::to_string(data)?;
fn encode_default<T: ?Sized + Serialize>(self, data: &T) -> Result<Value, ParserError> {
let value = serde_json::to_string(data).map_err(ParserError::new)?;
Ok(Value::Str(Str::from(value), None))
}

#[inline]
fn read_event(self, value: &Value) -> Result<&str, Self::DecodeError> {
value::read_event(value)
fn read_event(self, value: &Value) -> Result<&str, ParserError> {
value::read_event(value).map_err(ParserError::new)
}
}

Expand Down Expand Up @@ -590,16 +580,16 @@ mod test {
fn decode_default_none() {
// Common parser should deserialize by default to an empty map to match the behavior of the
// socket.io client when deserializing incoming connect message without an auth payload.
let data: serde_json::Result<HashMap<String, ()>> = CommonParser.decode_default(None);
let data = CommonParser.decode_default::<HashMap<String, ()>>(None);
assert!(matches!(data, Ok(d) if d.is_empty()));
}

#[test]
fn decode_default_some() {
// Common parser should deserialize by default to an empty map to match the behavior of the
// socket.io client when deserializing incoming connect message without an auth payload.
let data: serde_json::Result<String> =
CommonParser.decode_default(Some(&Value::Str("\"test\"".into(), None)));
let data =
CommonParser.decode_default::<String>(Some(&Value::Str("\"test\"".into(), None)));
assert!(matches!(data, Ok(d) if d == "test"));
}

Expand Down
16 changes: 9 additions & 7 deletions crates/parser-msgpack/src/de.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ use rmp::{
use rmp_serde::decode::Error as DecodeError;
use socketioxide_core::{
packet::{Packet, PacketData},
parser::ParseError,
parser::{ParseError, ParserError},
Str, Value,
};

pub fn deserialize_packet(buff: Bytes) -> Result<Packet, ParseError<DecodeError>> {
pub fn deserialize_packet(buff: Bytes) -> Result<Packet, ParseError> {
let mut reader = Cursor::new(buff);
let maplen = read_map_len(&mut reader).map_err(|e| {
use DecodeError::*;
Expand All @@ -22,17 +22,17 @@ pub fn deserialize_packet(buff: Bytes) -> Result<Packet, ParseError<DecodeError>
ValueReadError::InvalidDataRead(e) => InvalidDataRead(e),
ValueReadError::TypeMismatch(e) => TypeMismatch(e),
};
ParseError::ParserError(e)
ParseError::ParserError(ParserError::new(e))
})?;

// Bound check to prevent DoS attacks.
// other implementations might add some other keys that we don't support
// Therefore, we limit the number of keys to 20
if maplen == 0 || maplen > 20 {
Err(DecodeError::Uncategorized(format!(
Err(ParserError::new(DecodeError::Uncategorized(format!(
"packet length too big or empty: {}",
maplen
)))?;
))))?;
}

let mut index = 0xff;
Expand All @@ -41,7 +41,8 @@ pub fn deserialize_packet(buff: Bytes) -> Result<Packet, ParseError<DecodeError>
let mut id = None;

for _ in 0..maplen {
parse_key_value(&mut reader, &mut index, &mut nsp, &mut data_pos, &mut id)?;
parse_key_value(&mut reader, &mut index, &mut nsp, &mut data_pos, &mut id)
.map_err(ParserError::new)?;
}
let buff = reader.into_inner();
let mut data = buff.slice(data_pos.clone());
Expand All @@ -64,7 +65,8 @@ pub fn deserialize_packet(buff: Bytes) -> Result<Packet, ParseError<DecodeError>
struct ErrorMessage {
message: String,
}
let ErrorMessage { message } = rmp_serde::decode::from_slice(&buff[data_pos])?;
let ErrorMessage { message } =
rmp_serde::decode::from_slice(&buff[data_pos]).map_err(ParserError::new)?;
PacketData::ConnectError(message)
}
5 => PacketData::BinaryEvent(data, id),
Expand Down
Loading
Loading