Skip to content

Commit

Permalink
feat(rumqttc): batch request processing before flushing the network
Browse files Browse the repository at this point in the history
Collect up to BATCH_SIZE request from the client channel before flushing the network.

Fixes bytebeamio#810
  • Loading branch information
Felix Obenhuber committed Mar 26, 2024
1 parent 8e960bb commit 6dc0804
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 2 deletions.
1 change: 1 addition & 0 deletions rumqttc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]

### Added
- Process multiple outgoing client requests before flushing the network buffer (reduces number of system calls)

* `size()` method on `Packet` calculates size once serialized.
* `read()` and `write()` methods on `Packet`.
Expand Down
34 changes: 32 additions & 2 deletions rumqttc/src/v5/eventloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ impl EventLoop {
/// Select on network and requests and generate keepalive pings when necessary
async fn select(&mut self) -> Result<Event, ConnectionError> {
let network = self.network.as_mut().unwrap();
let inflight_full = self.state.inflight >= self.state.max_outgoing_inflight;
let collision = self.state.collision.is_some();
let inflight_full = self.state.is_inflight_full();
let collision = self.state.has_collision();

// Read buffered events from previous polls before calling a new poll
if let Some(event) = self.state.events.pop_front() {
Expand Down Expand Up @@ -208,8 +208,38 @@ impl EventLoop {
Ok(self.state.events.pop_front().unwrap())
},
request = self.request_rx.recv_async(), if self.pending.is_empty() && !inflight_full && !collision => {
// Process first request
let request = request.map_err(|_| ConnectionError::RequestsDone)?;
self.state.handle_outgoing_packet(request)?;

// Take up to BATCH_SIZE - 1 requests from the channel until
// - the channel is empty
// - the inflight queue is full
// - there is a collision
// If the channel is closed this is reported in the next iteration from the async recv above.
for _ in 0..(BATCH_SIZE - 1) {
if self.request_rx.is_empty() || self.state.is_inflight_full() || self.state.has_collision()
{
break;
}

// Safe to call the blocking `recv` in here since we know the channel is not empty.
// Ensure a flush in case of any error.
if let Err(e) = self
.request_rx
.recv()
.map_err(|_| ConnectionError::RequestsDone)
.and_then(|request| {
self.state
.handle_outgoing_packet(request)
.map_err(Into::into)
})
{
network.flush().await?;
return Err(e);
}
}

network.flush().await?;
Ok(self.state.events.pop_front().unwrap())
},
Expand Down
10 changes: 10 additions & 0 deletions rumqttc/src/v5/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,16 @@ impl MqttState {
self.inflight
}

/// Returns true if the inflight limit is reached
pub fn is_inflight_full(&self) -> bool {
self.inflight >= self.max_outgoing_inflight
}

/// Returns true if the state has a unresolved collision
pub fn has_collision(&self) -> bool {
self.collision.is_some()
}

/// Consolidates handling of all outgoing mqtt packet logic. Returns a packet which should
/// be put on to the network by the eventloop
pub fn handle_outgoing_packet(
Expand Down

0 comments on commit 6dc0804

Please sign in to comment.