Skip to content

Commit

Permalink
fix(rumqttc): fix possible starvation with pending requests
Browse files Browse the repository at this point in the history
Store the pending throttle interval within the EventLoop.

Fixes: bytebeamio#814
  • Loading branch information
Felix Obenhuber committed Mar 26, 2024
1 parent a9767c4 commit 194bb5c
Showing 1 changed file with 59 additions and 29 deletions.
88 changes: 59 additions & 29 deletions rumqttc/src/v5/eventloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ use crate::eventloop::socket_connect;
use crate::framed::AsyncReadWrite;

use flume::Receiver;
use futures_util::{Stream, StreamExt};
use tokio::select;
use tokio::time::{self, error::Elapsed, Instant, Sleep};

use std::collections::VecDeque;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

use super::mqttbytes::v5::ConnectReturnCode;
Expand Down Expand Up @@ -77,8 +79,8 @@ pub struct EventLoop {
pub state: MqttState,
/// Request stream
request_rx: Receiver<Request>,
/// Pending packets from last session
pub pending: VecDeque<Request>,
/// Pending requests from the last session
pending: PendingRequests,
/// Network connection to the broker
network: Option<Network>,
/// Keep alive time
Expand All @@ -98,9 +100,9 @@ impl EventLoop {
/// When connection encounters critical errors (like auth failure), user has a choice to
/// access and update `options`, `state` and `requests`.
pub fn new(options: MqttOptions, request_rx: Receiver<Request>) -> EventLoop {
let pending = VecDeque::new();
let inflight_limit = options.outgoing_inflight_upper_limit.unwrap_or(u16::MAX);
let manual_acks = options.manual_acks;
let pending = PendingRequests::new(options.pending_throttle);

EventLoop {
options,
Expand Down Expand Up @@ -161,8 +163,6 @@ impl EventLoop {
/// Select on network and requests and generate keepalive pings when necessary
async fn select(&mut self) -> Result<Event, ConnectionError> {
let network = self.network.as_mut().unwrap();
// let await_acks = self.state.await_acks;

let inflight_full = self.state.inflight >= self.state.max_outgoing_inflight;
let collision = self.state.collision.is_some();

Expand Down Expand Up @@ -202,17 +202,16 @@ impl EventLoop {
// After collision with pkid 1 -> [1b ,2, x, 4, 5].
// 1a is saved to state and event loop is set to collision mode stopping new
// outgoing requests (along with 1b).
o = Self::next_request(
&mut self.pending,
&self.request_rx,
self.options.pending_throttle
), if !self.pending.is_empty() || (!inflight_full && !collision) => match o {
Ok(request) => {
self.state.handle_outgoing_packet(request)?;
network.flush().await?;
Ok(self.state.events.pop_front().unwrap())
}
Err(_) => Err(ConnectionError::RequestsDone),
Some(request) = self.pending.next(), if !inflight_full && !collision => {
self.state.handle_outgoing_packet(request)?;
network.flush().await?;
Ok(self.state.events.pop_front().unwrap())
},
request = self.request_rx.recv_async(), if self.pending.is_empty() && !inflight_full && !collision => {
let request = request.map_err(|_| ConnectionError::RequestsDone)?;
self.state.handle_outgoing_packet(request)?;
network.flush().await?;
Ok(self.state.events.pop_front().unwrap())
},
// Pull a bunch of packets from network, reply in bunch and yield the first item
o = network.readb(&mut self.state, BATCH_SIZE) => {
Expand All @@ -231,26 +230,57 @@ impl EventLoop {
network.flush().await?;
Ok(self.state.events.pop_front().unwrap())
}
else => unreachable!("Eventloop select is exhaustive"),
}
}
}

async fn next_request(
pending: &mut VecDeque<Request>,
rx: &Receiver<Request>,
pending_throttle: Duration,
) -> Result<Request, ConnectionError> {
if !pending.is_empty() {
time::sleep(pending_throttle).await;
// We must call .next() AFTER sleep() otherwise .next() would
// advance the iterator but the future might be canceled before return
Ok(pending.pop_front().unwrap())
/// Pending requets yielded with a configured rate. If the queue is empty the stream will yield pending.
struct PendingRequests {
/// Interval
interval: Option<time::Interval>,
/// Pending requests
requests: VecDeque<Request>,
}

impl PendingRequests {
pub fn new(interval: Duration) -> Self {
let interval = (!interval.is_zero()).then(|| time::interval(interval));
PendingRequests {
interval,
requests: VecDeque::new(),
}
}

pub fn is_empty(&self) -> bool {
self.requests.is_empty()
}

pub fn extend(&mut self, requests: impl IntoIterator<Item = Request>) {
self.requests.extend(requests);
}
}

impl Stream for PendingRequests {
type Item = Request;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Request>> {
if self.is_empty() {
Poll::Pending
} else {
match rx.recv_async().await {
Ok(r) => Ok(r),
Err(_) => Err(ConnectionError::RequestsDone),
match self.interval.as_mut() {
Some(interval) => match interval.poll_tick(cx) {
Poll::Ready(_) => Poll::Ready(self.requests.pop_front()),
Poll::Pending => Poll::Pending,
},
None => Poll::Ready(self.requests.pop_front()),
}
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
(self.requests.len(), Some(self.requests.len()))
}
}

/// This stream internally processes requests from the request stream provided to the eventloop
Expand Down

0 comments on commit 194bb5c

Please sign in to comment.