Skip to content

Commit 21a6407

Browse files
using clouser
1 parent cbcb35d commit 21a6407

File tree

6 files changed

+88
-72
lines changed

6 files changed

+88
-72
lines changed

Cargo.lock

Lines changed: 0 additions & 18 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engineio/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ async-stream = "0.3.5"
3030
thiserror = "1.0"
3131
native-tls = "0.2.11"
3232
url = "2.5.0"
33-
rmp = "0.8"
3433

3534
[dev-dependencies]
3635
criterion = { version = "0.5.1", features = ["async_tokio"] }

engineio/src/asynchronous/async_transports/polling.rs

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -101,24 +101,11 @@ impl Stream for PollingTransport {
101101

102102
#[async_trait]
103103
impl AsyncTransport for PollingTransport {
104-
async fn emit(&self, data: Bytes, is_binary_att: bool) -> Result<()> {
105-
let data_to_send = if is_binary_att {
106-
// the binary attachment gets `base64` encoded
107-
let mut packet_bytes = BytesMut::with_capacity(data.len() + 1);
108-
packet_bytes.put_u8(b'b');
109-
110-
let encoded_data = general_purpose::STANDARD.encode(data);
111-
packet_bytes.put(encoded_data.as_bytes());
112-
113-
packet_bytes.freeze()
114-
} else {
115-
data
116-
};
117-
104+
async fn emit(&self, data: Bytes) -> Result<()> {
118105
let status = self
119106
.client
120107
.post(self.address().await?)
121-
.body(data_to_send)
108+
.body(data)
122109
.send()
123110
.await?
124111
.status()

engineio/src/packet.rs

Lines changed: 81 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,93 @@
11
use base64::{engine::general_purpose, Engine as _};
22
use bytes::{BufMut, Bytes, BytesMut};
3-
use rmp::{decode, encode};
43
use serde::{Deserialize, Serialize};
54
use std::char;
65
use std::fmt::{Display, Formatter, Result as FmtResult, Write};
76
use std::ops::Index;
87

98
use crate::error::{Error, Result};
109

11-
pub mod normal;
12-
pub mod message_pack;
13-
14-
// /// Serializer of Engine.IO packet
15-
// #[derive(Copy, Clone, Debug, Eq, PartialEq)]
16-
// pub enum PacketSerializer {
17-
// /// Normal serializer
18-
// Normal,
19-
// /// MessagePack serializer
20-
// MessagePack,
21-
// }
22-
23-
// impl Default for PacketSerializer {
24-
// fn default() -> Self {
25-
// PacketSerializer::Normal
26-
// }
27-
// }
28-
29-
pub trait PacketSerializer{
30-
fn from_bytes(bytes: &[u8]) -> Result<Packet>;
31-
fn to_bytes(&self) -> Result<Bytes>;
10+
pub struct PacketSerializer {
11+
decode: Box<dyn Fn(Bytes) -> Result<Packet> + Send + Sync>,
12+
encode: Box<dyn Fn(Packet) -> Bytes + Send + Sync>,
13+
}
14+
15+
fn default_decode(bytes: Bytes) -> Result<Packet> {
16+
if bytes.is_empty() {
17+
return Err(Error::IncompletePacket());
18+
}
19+
20+
let is_base64 = *bytes.first().ok_or(Error::IncompletePacket())? == b'b';
21+
22+
// only 'messages' packets could be encoded
23+
let packet_id = if is_base64 {
24+
PacketId::MessageBinary
25+
} else {
26+
(*bytes.first().ok_or(Error::IncompletePacket())?).try_into()?
27+
};
28+
29+
if bytes.len() == 1 && packet_id == PacketId::Message {
30+
return Err(Error::IncompletePacket());
31+
}
32+
33+
let data: Bytes = bytes.slice(1..);
34+
35+
Ok(Packet {
36+
packet_id,
37+
data: if is_base64 {
38+
Bytes::from(general_purpose::STANDARD.decode(data.as_ref())?)
39+
} else {
40+
data
41+
},
42+
})
43+
}
44+
45+
fn default_encode(packet: Packet) -> Bytes {
46+
let mut result = BytesMut::with_capacity(packet.data.len() + 1);
47+
result.put_u8(packet.packet_id.to_string_byte());
48+
if packet.packet_id == PacketId::MessageBinary {
49+
result.extend(general_purpose::STANDARD.encode(packet.data).into_bytes());
50+
} else {
51+
result.put(packet.data);
52+
}
53+
result.freeze()
54+
}
55+
56+
57+
impl PacketSerializer {
58+
const SEPARATOR: char = '\x1e';
59+
60+
pub fn new(
61+
decode: Box<dyn Fn(Bytes) -> Result<Packet> + Send + Sync>,
62+
encode: Box<dyn Fn(Packet) -> Bytes + Send + Sync>,
63+
) -> Self {
64+
Self {
65+
decode,
66+
encode,
67+
}
68+
}
69+
70+
pub fn default() -> Self {
71+
let decode = Box::new(default_decode);
72+
let encode = Box::new(default_encode);
73+
Self::new(decode, encode)
74+
}
75+
76+
pub fn decode(&self, datas: Bytes) -> Result<Packet> {
77+
(self.decode)(datas)
78+
}
79+
80+
pub fn decode_payload(&self, datas: Bytes) -> Result<Payload> {
81+
datas
82+
.split(|&c| c as char == PacketSerializer::SEPARATOR)
83+
.map(|slice| self.decode(datas.slice_ref(slice)))
84+
.collect::<Result<Vec<Packet>>>()
85+
.map(Payload)
86+
}
87+
88+
pub fn encode(&self, packet: Packet) -> Bytes {
89+
(self.encode)(packet)
90+
}
3291
}
3392

3493
/// Enumeration of the `engine.io` `Packet` types.

engineio/src/socket.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ pub const DEFAULT_MAX_POLL_TIMEOUT: Duration = Duration::from_secs(45);
2222
#[derive(Clone)]
2323
pub struct Socket {
2424
transport: Arc<TransportType>,
25-
serializer: dyn PacketSerializer,
25+
serializer: PacketSerializer,
2626
on_close: OptionalCallback<()>,
2727
on_data: OptionalCallback<Bytes>,
2828
on_error: OptionalCallback<String>,
@@ -150,7 +150,8 @@ impl Socket {
150150
continue;
151151
}
152152

153-
let payload = Payload::try_from(data)?;
153+
// let payload = Payload::try_from(data)?;
154+
let payload = self.serializer.decode_payload(data)?;
154155
let mut iter = payload.into_iter();
155156

156157
if let Some(packet) = iter.next() {

engineio/src/transports/polling.rs

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -49,23 +49,11 @@ impl PollingTransport {
4949
}
5050

5151
impl Transport for PollingTransport {
52-
fn emit(&self, data: Bytes, is_binary_att: bool) -> Result<()> {
53-
let data_to_send = if is_binary_att {
54-
// the binary attachment gets `base64` encoded
55-
let mut packet_bytes = BytesMut::with_capacity(data.len() + 1);
56-
packet_bytes.put_u8(b'b');
57-
58-
let encoded_data = general_purpose::STANDARD.encode(data);
59-
packet_bytes.put(encoded_data.as_bytes());
60-
61-
packet_bytes.freeze()
62-
} else {
63-
data
64-
};
52+
fn emit(&self, data: Bytes, _is_binary_att: bool) -> Result<()> {
6553
let status = self
6654
.client
6755
.post(self.address()?)
68-
.body(data_to_send)
56+
.body(data)
6957
.send()?
7058
.status()
7159
.as_u16();

0 commit comments

Comments
 (0)