Skip to content

Commit c0ecf9c

Browse files
committed
feat(adapter): factor out adapter to core
1 parent 6917604 commit c0ecf9c

File tree

17 files changed

+619
-464
lines changed

17 files changed

+619
-464
lines changed

crates/socketioxide-core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ engineioxide = { version = "0.15.0", path = "../engineioxide" }
1818
serde.workspace = true
1919
thiserror.workspace = true
2020
arbitrary = { version = "1.3.2", features = ["derive"], optional = true }
21+
futures-core.workspace = true
2122

2223
[features]
2324
fuzzing = ["dep:arbitrary"]
Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
//! The adapter module contains the [`CoreAdapter`] trait and other related types.
2+
//!
3+
//! It is used to implement communication between socket.io servers to share messages and state.
4+
use std::{
5+
borrow::Cow, collections::HashSet, error::Error as StdError, future::Future, time::Duration,
6+
};
7+
8+
use engineioxide::{sid::Sid, Str};
9+
use futures_core::Stream;
10+
11+
use crate::{
12+
errors::{AdapterError, DisconnectError, SocketError},
13+
packet::Packet,
14+
parser::Parse,
15+
Value,
16+
};
17+
18+
/// A room identifier
19+
pub type Room = Cow<'static, str>;
20+
21+
/// Flags that can be used to modify the behavior of the broadcast methods.
22+
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
23+
pub enum BroadcastFlags {
24+
/// Broadcast only to the current server
25+
Local,
26+
/// Broadcast to all clients except the sender
27+
Broadcast,
28+
}
29+
30+
/// Options that can be used to modify the behavior of the broadcast methods.
31+
#[derive(Clone, Debug, Default)]
32+
pub struct BroadcastOptions {
33+
/// The flags to apply to the broadcast.
34+
pub flags: HashSet<BroadcastFlags>,
35+
/// The rooms to broadcast to.
36+
pub rooms: HashSet<Room>,
37+
/// The rooms to exclude from the broadcast.
38+
pub except: HashSet<Room>,
39+
/// The socket id of the sender.
40+
pub sid: Option<Sid>,
41+
}
42+
43+
/// A trait for types that can be used as a room parameter.
44+
///
45+
/// [`String`], [`Vec<String>`], [`Vec<&str>`], [`&'static str`](str) and const arrays are implemented by default.
46+
pub trait RoomParam: Send + 'static {
47+
/// The type of the iterator returned by `into_room_iter`.
48+
type IntoIter: Iterator<Item = Room>;
49+
50+
/// Convert `self` into an iterator of rooms.
51+
fn into_room_iter(self) -> Self::IntoIter;
52+
}
53+
54+
impl RoomParam for Room {
55+
type IntoIter = std::iter::Once<Room>;
56+
#[inline(always)]
57+
fn into_room_iter(self) -> Self::IntoIter {
58+
std::iter::once(self)
59+
}
60+
}
61+
impl RoomParam for String {
62+
type IntoIter = std::iter::Once<Room>;
63+
#[inline(always)]
64+
fn into_room_iter(self) -> Self::IntoIter {
65+
std::iter::once(Cow::Owned(self))
66+
}
67+
}
68+
impl RoomParam for Vec<String> {
69+
type IntoIter = std::iter::Map<std::vec::IntoIter<String>, fn(String) -> Room>;
70+
#[inline(always)]
71+
fn into_room_iter(self) -> Self::IntoIter {
72+
self.into_iter().map(Cow::Owned)
73+
}
74+
}
75+
impl RoomParam for Vec<&'static str> {
76+
type IntoIter = std::iter::Map<std::vec::IntoIter<&'static str>, fn(&'static str) -> Room>;
77+
#[inline(always)]
78+
fn into_room_iter(self) -> Self::IntoIter {
79+
self.into_iter().map(Cow::Borrowed)
80+
}
81+
}
82+
83+
impl RoomParam for Vec<Room> {
84+
type IntoIter = std::vec::IntoIter<Room>;
85+
#[inline(always)]
86+
fn into_room_iter(self) -> Self::IntoIter {
87+
self.into_iter()
88+
}
89+
}
90+
impl RoomParam for &'static str {
91+
type IntoIter = std::iter::Once<Room>;
92+
#[inline(always)]
93+
fn into_room_iter(self) -> Self::IntoIter {
94+
std::iter::once(Cow::Borrowed(self))
95+
}
96+
}
97+
impl<const COUNT: usize> RoomParam for [&'static str; COUNT] {
98+
type IntoIter =
99+
std::iter::Map<std::array::IntoIter<&'static str, COUNT>, fn(&'static str) -> Room>;
100+
101+
#[inline(always)]
102+
fn into_room_iter(self) -> Self::IntoIter {
103+
self.into_iter().map(Cow::Borrowed)
104+
}
105+
}
106+
impl<const COUNT: usize> RoomParam for [String; COUNT] {
107+
type IntoIter = std::iter::Map<std::array::IntoIter<String, COUNT>, fn(String) -> Room>;
108+
#[inline(always)]
109+
fn into_room_iter(self) -> Self::IntoIter {
110+
self.into_iter().map(Cow::Owned)
111+
}
112+
}
113+
impl RoomParam for Sid {
114+
type IntoIter = std::iter::Once<Room>;
115+
#[inline(always)]
116+
fn into_room_iter(self) -> Self::IntoIter {
117+
std::iter::once(Cow::Owned(self.to_string()))
118+
}
119+
}
120+
121+
/// The [`SocketEmitter`] will be implmented by the socketioxide library.
122+
/// It is simply used as an abstraction to allow the adapter to communicate
123+
/// with the socket server without the need to depend on the socketioxide lib.
124+
pub trait SocketEmitter: Send + Sync + 'static {
125+
/// An error that can occur when sending data an acknowledgment.
126+
type AckError: StdError + Send + 'static;
127+
/// A stream that emits the acknowledgments of multiple sockets.
128+
type AckStream: Stream<Item = (Sid, Result<Value, Self::AckError>)> + Send + 'static;
129+
130+
/// Get all the socket ids in the namespace.
131+
fn get_all_sids(&self) -> Vec<Sid>;
132+
/// Send data to the list of socket ids.
133+
fn send_many(&self, sids: Vec<Sid>, data: Value) -> Result<(), Vec<SocketError>>;
134+
/// Send data to the list of socket ids and get a stream of acks.
135+
fn send_many_with_ack(
136+
&self,
137+
sids: Vec<Sid>,
138+
packet: Packet,
139+
timeout: Option<Duration>,
140+
) -> Self::AckStream;
141+
/// Disconnect all the sockets in the list.
142+
fn disconnect_many(&self, sid: Vec<Sid>) -> Result<(), Vec<DisconnectError>>;
143+
/// Get the path of the namespace.
144+
fn path(&self) -> Str;
145+
/// Get the parser of the namespace.
146+
fn parser(&self) -> impl Parse;
147+
}
148+
149+
/// An adapter is responsible for managing the state of the namespace.
150+
/// This adapter can be implemented to share the state between multiple servers.
151+
/// The default adapter is the [`LocalAdapter`], which stores the state in memory.
152+
pub trait CoreAdapter<E: SocketEmitter>: Sized + Send + Sync + 'static {
153+
/// An error that can occur when using the adapter. The default [`LocalAdapter`] has an [`Infallible`] error.
154+
type Error: StdError + Into<AdapterError> + Send + 'static;
155+
/// A shared state between all the namespace [`CoreAdapter`].
156+
/// This can be used to share a connection for example.
157+
type State: Send + Sync + 'static;
158+
159+
/// Creates a new adapter with the given state and socket server.
160+
fn new(state: &Self::State, sockets: E) -> Self;
161+
162+
/// Initializes the adapter.
163+
fn init(&self) -> impl Future<Output = Result<(), Self::Error>> + Send;
164+
/// Closes the adapter.
165+
fn close(&self) -> impl Future<Output = Result<(), Self::Error>> + Send;
166+
167+
/// Returns the number of servers.
168+
fn server_count(&self) -> impl Future<Output = Result<u16, Self::Error>> + Send;
169+
170+
/// Adds the socket to all the rooms.
171+
fn add_all(
172+
&self,
173+
sid: Sid,
174+
rooms: impl RoomParam,
175+
) -> impl Future<Output = Result<(), Self::Error>> + Send;
176+
/// Removes the socket from the rooms.
177+
fn del(
178+
&self,
179+
sid: Sid,
180+
rooms: impl RoomParam,
181+
) -> impl Future<Output = Result<(), Self::Error>> + Send;
182+
/// Removes the socket from all the rooms.
183+
fn del_all(&self, sid: Sid) -> impl Future<Output = Result<(), Self::Error>> + Send;
184+
185+
/// Broadcasts the packet to the sockets that match the [`BroadcastOptions`].
186+
fn broadcast(
187+
&self,
188+
packet: Packet,
189+
opts: BroadcastOptions,
190+
) -> impl Future<Output = Result<(), Vec<SocketError>>> + Send;
191+
192+
/// Broadcasts the packet to the sockets that match the [`BroadcastOptions`] and return a stream of ack responses.
193+
fn broadcast_with_ack(
194+
&self,
195+
packet: Packet,
196+
opts: BroadcastOptions,
197+
timeout: Option<Duration>,
198+
) -> impl Future<Output = Result<E::AckStream, Self::Error>> + Send;
199+
200+
/// Returns the sockets ids that match the [`BroadcastOptions`].
201+
fn sockets(
202+
&self,
203+
opts: BroadcastOptions,
204+
) -> impl Future<Output = Result<Vec<Sid>, Self::Error>> + Send;
205+
206+
/// Returns the rooms of the socket.
207+
fn socket_rooms(&self, sid: Sid)
208+
-> impl Future<Output = Result<Vec<Room>, Self::Error>> + Send;
209+
210+
/// Adds the sockets that match the [`BroadcastOptions`] to the rooms.
211+
fn add_sockets(
212+
&self,
213+
opts: BroadcastOptions,
214+
rooms: impl RoomParam,
215+
) -> impl Future<Output = Result<(), Self::Error>> + Send;
216+
217+
/// Removes the sockets that match the [`BroadcastOptions`] from the rooms.
218+
fn del_sockets(
219+
&self,
220+
opts: BroadcastOptions,
221+
rooms: impl RoomParam,
222+
) -> impl Future<Output = Result<(), Self::Error>> + Send;
223+
224+
/// Disconnects the sockets that match the [`BroadcastOptions`].
225+
fn disconnect_socket(
226+
&self,
227+
opts: BroadcastOptions,
228+
) -> impl Future<Output = Result<(), Vec<DisconnectError>>> + Send;
229+
230+
/// Returns all the rooms for this adapter.
231+
fn rooms(&self) -> impl Future<Output = Result<Vec<Room>, Self::Error>> + Send;
232+
233+
//TODO: implement
234+
// fn server_side_emit(&self, packet: Packet, opts: BroadcastOptions) -> Result<u64, Error>;
235+
// fn persist_session(&self, sid: i64);
236+
// fn restore_session(&self, sid: i64) -> Session;
237+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
//! All the errors that can be returned by the library. Mostly when using the [adapter](crate::adapter) module.
2+
use std::{convert::Infallible, fmt};
3+
/// Error type when using the underlying engine.io socket
4+
#[derive(Debug, thiserror::Error)]
5+
pub enum SocketError {
6+
/// The socket channel is full.
7+
/// You might need to increase the channel size with the [`SocketIoBuilder::max_buffer_size`] method.
8+
///
9+
/// [`SocketIoBuilder::max_buffer_size`]: crate::SocketIoBuilder#method.max_buffer_size
10+
#[error("internal channel full error")]
11+
InternalChannelFull,
12+
13+
/// The socket is already closed
14+
#[error("socket closed")]
15+
Closed,
16+
17+
/// An error occured while broadcasting to other nodes.
18+
#[error("adapter error: {0:?}")]
19+
Adapter(#[from] AdapterError),
20+
}
21+
22+
/// Error type for the [`Adapter`](crate::adapter::Adapter) trait.
23+
#[derive(Debug, thiserror::Error)]
24+
pub struct AdapterError(#[from] pub Box<dyn std::error::Error + Send>);
25+
impl fmt::Display for AdapterError {
26+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
27+
fmt::Display::fmt(&self.0, f)
28+
}
29+
}
30+
impl From<Infallible> for AdapterError {
31+
fn from(_: Infallible) -> Self {
32+
panic!("Infallible should never be constructed, this is a bug")
33+
}
34+
}
35+
36+
/// Error type for sending operations.
37+
#[derive(thiserror::Error, Debug)]
38+
pub enum DisconnectError {
39+
/// The socket channel is full.
40+
/// You might need to increase the channel size with the [`SocketIoBuilder::max_buffer_size`] method.
41+
///
42+
/// [`SocketIoBuilder::max_buffer_size`]: crate::SocketIoBuilder#method.max_buffer_size
43+
#[error("internal channel full error")]
44+
InternalChannelFull,
45+
46+
/// An error occured while broadcasting to other nodes.
47+
#[error("adapter error: {0:?}")]
48+
Adapter(#[from] AdapterError),
49+
}

crates/socketioxide-core/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
//! This crate is the core of the socketioxide crate.
3333
//! It contains basic types and interfaces for the socketioxide crate and the parser sub-crates.
3434
35+
pub mod adapter;
36+
pub mod errors;
3537
pub mod packet;
3638
pub mod parser;
3739

crates/socketioxide/src/ack.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
//! - [`AckStream`]: A [`Stream`]/[`Future`] of data received from the client.
66
use std::{
77
pin::Pin,
8+
sync::Arc,
89
task::{Context, Poll},
910
time::Duration,
1011
};
@@ -16,11 +17,7 @@ use serde::de::DeserializeOwned;
1617
use tokio::{sync::oneshot::Receiver, time::Timeout};
1718

1819
use crate::{
19-
adapter::Adapter,
20-
errors::{AckError, SocketError},
21-
extract::SocketRef,
22-
packet::Packet,
23-
parser::Parser,
20+
adapter::Adapter, errors::AckError, packet::Packet, parser::Parser, socket::Socket, SocketError,
2421
};
2522
use socketioxide_core::{parser::Parse, Value};
2623
pub(crate) type AckResult<T> = Result<T, AckError>;
@@ -138,7 +135,7 @@ impl AckInnerStream {
138135
/// (5s by default) if no custom timeout is specified.
139136
pub fn broadcast<A: Adapter>(
140137
packet: Packet,
141-
sockets: Vec<SocketRef<A>>,
138+
sockets: Vec<Arc<Socket<A>>>,
142139
duration: Option<Duration>,
143140
) -> Self {
144141
let rxs = FuturesUnordered::new();

0 commit comments

Comments
 (0)