From 03de02780dfebe4db4ee61b59d1f3bcfb5eead5c Mon Sep 17 00:00:00 2001 From: Felix Obenhuber Date: Tue, 19 Mar 2024 12:32:57 +0100 Subject: [PATCH] feat(rumqttc): batch request processing before flushing the network Collect up to BATCH_SIZE request from the client channel before flushing the network. Fixes #810 --- rumqttc/CHANGELOG.md | 1 + rumqttc/src/v5/eventloop.rs | 34 ++++++++++++++++++++++++++++++++-- rumqttc/src/v5/state.rs | 10 ++++++++++ 3 files changed, 43 insertions(+), 2 deletions(-) diff --git a/rumqttc/CHANGELOG.md b/rumqttc/CHANGELOG.md index cc5c0c1e3..3fbb36d1c 100644 --- a/rumqttc/CHANGELOG.md +++ b/rumqttc/CHANGELOG.md @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added +- Process multiple outgoing client requests before flushing the network buffer (reduces number of system calls) ### Changed diff --git a/rumqttc/src/v5/eventloop.rs b/rumqttc/src/v5/eventloop.rs index c3e8288bc..4548fb48a 100644 --- a/rumqttc/src/v5/eventloop.rs +++ b/rumqttc/src/v5/eventloop.rs @@ -164,8 +164,8 @@ impl EventLoop { /// Select on network and requests and generate keepalive pings when necessary async fn select(&mut self) -> Result { let network = self.network.as_mut().unwrap(); - let inflight_full = self.state.inflight >= self.state.max_outgoing_inflight; - let collision = self.state.collision.is_some(); + let inflight_full = self.state.is_inflight_full(); + let collision = self.state.has_collision(); // Read buffered events from previous polls before calling a new poll if let Some(event) = self.state.events.pop_front() { @@ -209,8 +209,38 @@ impl EventLoop { Ok(self.state.events.pop_front().unwrap()) }, request = self.request_rx.recv_async(), if self.pending.is_empty() && !inflight_full && !collision => { + // Process first request let request = request.map_err(|_| ConnectionError::RequestsDone)?; self.state.handle_outgoing_packet(request)?; + + // Take up to BATCH_SIZE - 1 requests from the channel until + // - the channel is empty + // - the inflight queue is full + // - there is a collision + // If the channel is closed this is reported in the next iteration from the async recv above. + for _ in 0..(BATCH_SIZE - 1) { + if self.request_rx.is_empty() || self.state.is_inflight_full() || self.state.has_collision() + { + break; + } + + // Safe to call the blocking `recv` in here since we know the channel is not empty. + // Ensure a flush in case of any error. + if let Err(e) = self + .request_rx + .recv() + .map_err(|_| ConnectionError::RequestsDone) + .and_then(|request| { + self.state + .handle_outgoing_packet(request) + .map_err(Into::into) + }) + { + network.flush(&mut self.state.write).await?; + return Err(e); + } + } + network.flush(&mut self.state.write).await?; Ok(self.state.events.pop_front().unwrap()) }, diff --git a/rumqttc/src/v5/state.rs b/rumqttc/src/v5/state.rs index 8473f1f4c..3566d283a 100644 --- a/rumqttc/src/v5/state.rs +++ b/rumqttc/src/v5/state.rs @@ -179,6 +179,16 @@ impl MqttState { self.inflight } + /// Returns true if the inflight limit is reached + pub fn is_inflight_full(&self) -> bool { + self.inflight >= self.max_outgoing_inflight + } + + /// Returns true if the state has a unresolved collision + pub fn has_collision(&self) -> bool { + self.collision.is_some() + } + /// Consolidates handling of all outgoing mqtt packet logic. Returns a packet which should /// be put on to the network by the eventloop pub fn handle_outgoing_packet(&mut self, request: Request) -> Result<(), StateError> {