Skip to content

Commit

Permalink
feat(rumqttc): declare rx batch size in event loop
Browse files Browse the repository at this point in the history
This number shall be reused for the tx side as well.
  • Loading branch information
Felix Obenhuber committed Mar 26, 2024
1 parent 6877f5b commit 73d63c6
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 9 deletions.
5 changes: 4 additions & 1 deletion rumqttc/src/v5/eventloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ use {
#[cfg(feature = "proxy")]
use crate::proxy::ProxyError;

/// Number of packets or requests processed before flusing the network
const BATCH_SIZE: usize = 10;

/// Critical errors during eventloop polling
#[derive(Debug, thiserror::Error)]
pub enum ConnectionError {
Expand Down Expand Up @@ -212,7 +215,7 @@ impl EventLoop {
Err(_) => Err(ConnectionError::RequestsDone),
},
// Pull a bunch of packets from network, reply in bunch and yield the first item
o = network.readb(&mut self.state) => {
o = network.readb(&mut self.state, BATCH_SIZE) => {
o?;
// flush all the acks and return first incoming packet
network.flush().await?;
Expand Down
15 changes: 7 additions & 8 deletions rumqttc/src/v5/framed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ use super::{Incoming, StateError};
pub struct Network {
/// Frame MQTT packets from network connection
framed: Framed<Box<dyn AsyncReadWrite>, Codec>,
/// Maximum readv count
max_readb_count: usize,
}
impl Network {
pub fn new(socket: impl AsyncReadWrite + 'static, max_incoming_size: Option<u32>) -> Network {
Expand All @@ -26,10 +24,7 @@ impl Network {
};
let framed = Framed::new(socket, codec);

Network {
framed,
max_readb_count: 10,
}
Network { framed }
}

pub fn set_max_outgoing_size(&mut self, max_outgoing_size: Option<u32>) {
Expand All @@ -47,7 +42,11 @@ impl Network {

/// Read packets in bulk. This allow replies to be in bulk. This method is used
/// after the connection is established to read a bunch of incoming packets
pub async fn readb(&mut self, state: &mut MqttState) -> Result<(), StateError> {
pub async fn readb(
&mut self,
state: &mut MqttState,
batch_size: usize,
) -> Result<(), StateError> {
// wait for the first read
let mut res = self.framed.next().await;
let mut count = 1;
Expand All @@ -59,7 +58,7 @@ impl Network {
}

count += 1;
if count >= self.max_readb_count {
if count >= batch_size {
break;
}
}
Expand Down

0 comments on commit 73d63c6

Please sign in to comment.