From a9767c4de196eb0e5ce7ec29296bef8649046441 Mon Sep 17 00:00:00 2001 From: Felix Obenhuber Date: Tue, 19 Mar 2024 09:56:38 +0100 Subject: [PATCH] feat(rumqttc): declare rx batch size in event loop This number shall be reused for the tx side as well. --- rumqttc/src/v5/eventloop.rs | 5 ++++- rumqttc/src/v5/framed.rs | 15 +++++++-------- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/rumqttc/src/v5/eventloop.rs b/rumqttc/src/v5/eventloop.rs index f8a582f3f..4c8002b75 100644 --- a/rumqttc/src/v5/eventloop.rs +++ b/rumqttc/src/v5/eventloop.rs @@ -31,6 +31,9 @@ use { #[cfg(feature = "proxy")] use crate::proxy::ProxyError; +/// Number of packets or requests processed before flusing the network +const BATCH_SIZE: usize = 10; + /// Critical errors during eventloop polling #[derive(Debug, thiserror::Error)] pub enum ConnectionError { @@ -212,7 +215,7 @@ impl EventLoop { Err(_) => Err(ConnectionError::RequestsDone), }, // Pull a bunch of packets from network, reply in bunch and yield the first item - o = network.readb(&mut self.state) => { + o = network.readb(&mut self.state, BATCH_SIZE) => { o?; // flush all the acks and return first incoming packet network.flush().await?; diff --git a/rumqttc/src/v5/framed.rs b/rumqttc/src/v5/framed.rs index c7e06a250..0fcec0004 100644 --- a/rumqttc/src/v5/framed.rs +++ b/rumqttc/src/v5/framed.rs @@ -14,8 +14,6 @@ use super::{Incoming, StateError}; pub struct Network { /// Frame MQTT packets from network connection framed: Framed, Codec>, - /// Maximum readv count - max_readb_count: usize, } impl Network { pub fn new(socket: impl AsyncReadWrite + 'static, max_incoming_size: Option) -> Network { @@ -26,10 +24,7 @@ impl Network { }; let framed = Framed::new(socket, codec); - Network { - framed, - max_readb_count: 10, - } + Network { framed } } pub fn set_max_outgoing_size(&mut self, max_outgoing_size: Option) { @@ -48,7 +43,11 @@ impl Network { /// Read packets in bulk. This allow replies to be in bulk. This method is used /// after the connection is established to read a bunch of incoming packets - pub async fn readb(&mut self, state: &mut MqttState) -> Result<(), StateError> { + pub async fn readb( + &mut self, + state: &mut MqttState, + batch_size: usize, + ) -> Result<(), StateError> { // wait for the first read let mut res = self.framed.next().await; let mut count = 1; @@ -60,7 +59,7 @@ impl Network { } count += 1; - if count >= self.max_readb_count { + if count >= batch_size { break; } }