Skip to content

Commit

Permalink
refactor: move write buffer into Network
Browse files Browse the repository at this point in the history
  • Loading branch information
Devdutt Shenoi committed Mar 24, 2024
1 parent cd5d6da commit 284f81a
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 169 deletions.
19 changes: 11 additions & 8 deletions rumqttc/src/eventloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ pub struct EventLoop {
/// Pending packets from last session
pub pending: VecDeque<Request>,
/// Network connection to the broker
network: Option<Network>,
pub network: Option<Network>,
/// Keep alive time
keepalive_timeout: Option<Pin<Box<Sleep>>>,
pub network_options: NetworkOptions,
Expand All @@ -104,11 +104,10 @@ impl EventLoop {
let pending = VecDeque::new();
let max_inflight = mqtt_options.inflight;
let manual_acks = mqtt_options.manual_acks;
let max_outgoing_packet_size = mqtt_options.max_outgoing_packet_size;

EventLoop {
mqtt_options,
state: MqttState::new(max_inflight, manual_acks, max_outgoing_packet_size),
state: MqttState::new(max_inflight, manual_acks),
requests_tx,
requests_rx,
pending,
Expand Down Expand Up @@ -189,7 +188,7 @@ impl EventLoop {
o = network.readb(&mut self.state) => {
o?;
// flush all the acks and return first incoming packet
match time::timeout(network_timeout, network.flush(&mut self.state.write)).await {
match time::timeout(network_timeout, network.flush()).await {
Ok(inner) => inner?,
Err(_)=> return Err(ConnectionError::FlushTimeout),
};
Expand Down Expand Up @@ -229,8 +228,10 @@ impl EventLoop {
self.mqtt_options.pending_throttle
), if !self.pending.is_empty() || (!inflight_full && !collision) => match o {
Ok(request) => {
self.state.handle_outgoing_packet(request)?;
match time::timeout(network_timeout, network.flush(&mut self.state.write)).await {
if let Some(outgoing) = self.state.handle_outgoing_packet(request)? {
network.write(outgoing)?;
}
match time::timeout(network_timeout, network.flush()).await {
Ok(inner) => inner?,
Err(_)=> return Err(ConnectionError::FlushTimeout),
};
Expand All @@ -245,8 +246,10 @@ impl EventLoop {
let timeout = self.keepalive_timeout.as_mut().unwrap();
timeout.as_mut().reset(Instant::now() + self.mqtt_options.keep_alive);

self.state.handle_outgoing_packet(Request::PingReq(PingReq))?;
match time::timeout(network_timeout, network.flush(&mut self.state.write)).await {
if let Some(outgoing) = self.state.handle_outgoing_packet(Request::PingReq(PingReq))? {
network.write(outgoing)?;
}
match time::timeout(network_timeout, network.flush()).await {
Ok(inner) => inner?,
Err(_)=> return Err(ConnectionError::FlushTimeout),
};
Expand Down
21 changes: 16 additions & 5 deletions rumqttc/src/framed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ pub struct Network {
socket: Box<dyn AsyncReadWrite>,
/// Buffered reads
read: BytesMut,
/// Buffered writes
pub write: BytesMut,
/// Use to decode MQTT packets
codec: Codec,
/// Maximum readv count
Expand All @@ -30,6 +32,7 @@ impl Network {
Network {
socket,
read: BytesMut::with_capacity(10 * 1024),
write: BytesMut::with_capacity(10 * 1024),
codec: Codec {
max_incoming_size,
max_outgoing_size,
Expand Down Expand Up @@ -87,7 +90,9 @@ impl Network {
loop {
match self.codec.decode(&mut self.read) {
Ok(Some(packet)) => {
state.handle_incoming_packet(packet)?;
if let Some(packet) = state.handle_incoming_packet(packet)? {
self.write(packet)?;
}

count += 1;
if count >= self.max_readb_count {
Expand All @@ -109,6 +114,12 @@ impl Network {
Ok(())
}

pub fn write(&mut self, packet: Packet) -> Result<(), crate::state::StateError> {
self.codec
.encode(packet, &mut self.write)
.map_err(Into::into)
}

pub async fn connect(&mut self, connect: Connect) -> io::Result<()> {
let mut write = BytesMut::new();
self.codec
Expand All @@ -119,13 +130,13 @@ impl Network {
Ok(())
}

pub async fn flush(&mut self, write: &mut BytesMut) -> io::Result<()> {
if write.is_empty() {
pub async fn flush(&mut self) -> io::Result<()> {
if self.write.is_empty() {
return Ok(());
}

self.socket.write_all(&write[..]).await?;
write.clear();
self.socket.write_all(&self.write[..]).await?;
self.write.clear();
Ok(())
}
}
Expand Down
37 changes: 35 additions & 2 deletions rumqttc/src/mqttbytes/v4/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,43 @@ impl Decoder for Codec {

impl Encoder<Packet> for Codec {
type Error = Error;

fn encode(&mut self, item: Packet, dst: &mut BytesMut) -> Result<(), Self::Error> {
item.write(dst, self.max_outgoing_size)?;

Ok(())
}
}
}

#[cfg(test)]
mod tests {
use bytes::BytesMut;
use tokio_util::codec::Encoder;

use super::Codec;
use crate::{mqttbytes::Error, Packet, Publish, QoS};

#[test]
fn outgoing_max_packet_size_check() {
let mut buf = BytesMut::new();
let mut codec = Codec {
max_incoming_size: 100,
max_outgoing_size: 200,
};

let mut small_publish = Publish::new("hello/world", QoS::AtLeastOnce, vec![1; 100]);
small_publish.pkid = 1;
codec
.encode(Packet::Publish(small_publish), &mut buf)
.unwrap();

let large_publish = Publish::new("hello/world", QoS::AtLeastOnce, vec![1; 265]);
match codec.encode(Packet::Publish(large_publish), &mut buf) {
Err(Error::OutgoingPacketTooLarge {
pkt_size: 281,
max: 200,
}) => {}
_ => unreachable!(),
}
}
}
Loading

0 comments on commit 284f81a

Please sign in to comment.