Skip to content

Commit

Permalink
feat(rumqttc): batch request processing before flushing the network
Browse files Browse the repository at this point in the history
Collect up to BATCH_SIZE request from the client channel before flushing the network.

Fixes bytebeamio#810
  • Loading branch information
Felix Obenhuber committed Mar 19, 2024
1 parent 0bdc527 commit bca9060
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 2 deletions.
34 changes: 32 additions & 2 deletions rumqttc/src/v5/eventloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ impl EventLoop {
/// Select on network and requests and generate keepalive pings when necessary
async fn select(&mut self) -> Result<Event, ConnectionError> {
let network = self.network.as_mut().unwrap();
let inflight_full = self.state.inflight >= self.state.max_outgoing_inflight;
let collision = self.state.collision.is_some();
let inflight_full = self.state.is_inflight_full();
let collision = self.state.has_collision();

// Read buffered events from previous polls before calling a new poll
if let Some(event) = self.state.events.pop_front() {
Expand Down Expand Up @@ -209,8 +209,38 @@ impl EventLoop {
Ok(self.state.events.pop_front().unwrap())
},
request = self.request_rx.recv_async(), if self.pending.is_empty() && !inflight_full && !collision => {
// Process first request
let request = request.map_err(|_| ConnectionError::RequestsDone)?;
self.state.handle_outgoing_packet(request)?;

// Take up to BATCH_SIZE - 1 requests from the channel until
// - the channel is empty
// - the inflight queue is full
// - there is a collision
// If the channel is closed this is reported in the next iteration from the async recv above.
for _ in 0..(BATCH_SIZE - 1) {
if self.request_rx.is_empty() || self.state.is_inflight_full() || self.state.has_collision()
{
break;
}

// Safe to call the blocking `recv` in here since we know the channel is not empty.
// Ensure a flush in case of any error.
if let Err(e) = self
.request_rx
.recv()
.map_err(|_| ConnectionError::RequestsDone)
.and_then(|request| {
self.state
.handle_outgoing_packet(request)
.map_err(Into::into)
})
{
network.flush(&mut self.state.write).await?;
return Err(e);
}
}

network.flush(&mut self.state.write).await?;
Ok(self.state.events.pop_front().unwrap())
},
Expand Down
10 changes: 10 additions & 0 deletions rumqttc/src/v5/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,16 @@ impl MqttState {
self.inflight
}

/// Returns true if the inflight limit is reached
pub fn is_inflight_full(&self) -> bool {
self.inflight >= self.max_outgoing_inflight
}

/// Returns true if the state has a unresolved collision
pub fn has_collision(&self) -> bool {
self.collision.is_some()
}

/// Consolidates handling of all outgoing mqtt packet logic. Returns a packet which should
/// be put on to the network by the eventloop
pub fn handle_outgoing_packet(&mut self, request: Request) -> Result<(), StateError> {
Expand Down

0 comments on commit bca9060

Please sign in to comment.