diff --git a/rumqttc/src/v5/eventloop.rs b/rumqttc/src/v5/eventloop.rs index f8a582f3f..4c8002b75 100644 --- a/rumqttc/src/v5/eventloop.rs +++ b/rumqttc/src/v5/eventloop.rs @@ -31,6 +31,9 @@ use { #[cfg(feature = "proxy")] use crate::proxy::ProxyError; +/// Number of packets or requests processed before flusing the network +const BATCH_SIZE: usize = 10; + /// Critical errors during eventloop polling #[derive(Debug, thiserror::Error)] pub enum ConnectionError { @@ -212,7 +215,7 @@ impl EventLoop { Err(_) => Err(ConnectionError::RequestsDone), }, // Pull a bunch of packets from network, reply in bunch and yield the first item - o = network.readb(&mut self.state) => { + o = network.readb(&mut self.state, BATCH_SIZE) => { o?; // flush all the acks and return first incoming packet network.flush().await?; diff --git a/rumqttc/src/v5/framed.rs b/rumqttc/src/v5/framed.rs index c7e06a250..0fcec0004 100644 --- a/rumqttc/src/v5/framed.rs +++ b/rumqttc/src/v5/framed.rs @@ -14,8 +14,6 @@ use super::{Incoming, StateError}; pub struct Network { /// Frame MQTT packets from network connection framed: Framed, Codec>, - /// Maximum readv count - max_readb_count: usize, } impl Network { pub fn new(socket: impl AsyncReadWrite + 'static, max_incoming_size: Option) -> Network { @@ -26,10 +24,7 @@ impl Network { }; let framed = Framed::new(socket, codec); - Network { - framed, - max_readb_count: 10, - } + Network { framed } } pub fn set_max_outgoing_size(&mut self, max_outgoing_size: Option) { @@ -48,7 +43,11 @@ impl Network { /// Read packets in bulk. This allow replies to be in bulk. This method is used /// after the connection is established to read a bunch of incoming packets - pub async fn readb(&mut self, state: &mut MqttState) -> Result<(), StateError> { + pub async fn readb( + &mut self, + state: &mut MqttState, + batch_size: usize, + ) -> Result<(), StateError> { // wait for the first read let mut res = self.framed.next().await; let mut count = 1; @@ -60,7 +59,7 @@ impl Network { } count += 1; - if count >= self.max_readb_count { + if count >= batch_size { break; } }