diff --git a/rumqttc/src/v5/eventloop.rs b/rumqttc/src/v5/eventloop.rs index 4c8002b75..3e8519693 100644 --- a/rumqttc/src/v5/eventloop.rs +++ b/rumqttc/src/v5/eventloop.rs @@ -5,12 +5,14 @@ use crate::eventloop::socket_connect; use crate::framed::AsyncReadWrite; use flume::Receiver; +use futures_util::{Stream, StreamExt}; use tokio::select; use tokio::time::{self, error::Elapsed, Instant, Sleep}; use std::collections::VecDeque; use std::io; use std::pin::Pin; +use std::task::{Context, Poll}; use std::time::Duration; use super::mqttbytes::v5::ConnectReturnCode; @@ -77,8 +79,8 @@ pub struct EventLoop { pub state: MqttState, /// Request stream request_rx: Receiver, - /// Pending packets from last session - pub pending: VecDeque, + /// Pending requests from the last session + pending: PendingRequests, /// Network connection to the broker network: Option, /// Keep alive time @@ -98,9 +100,9 @@ impl EventLoop { /// When connection encounters critical errors (like auth failure), user has a choice to /// access and update `options`, `state` and `requests`. pub fn new(options: MqttOptions, request_rx: Receiver) -> EventLoop { - let pending = VecDeque::new(); let inflight_limit = options.outgoing_inflight_upper_limit.unwrap_or(u16::MAX); let manual_acks = options.manual_acks; + let pending = PendingRequests::new(options.pending_throttle); EventLoop { options, @@ -161,8 +163,6 @@ impl EventLoop { /// Select on network and requests and generate keepalive pings when necessary async fn select(&mut self) -> Result { let network = self.network.as_mut().unwrap(); - // let await_acks = self.state.await_acks; - let inflight_full = self.state.inflight >= self.state.max_outgoing_inflight; let collision = self.state.collision.is_some(); @@ -202,17 +202,16 @@ impl EventLoop { // After collision with pkid 1 -> [1b ,2, x, 4, 5]. // 1a is saved to state and event loop is set to collision mode stopping new // outgoing requests (along with 1b). - o = Self::next_request( - &mut self.pending, - &self.request_rx, - self.options.pending_throttle - ), if !self.pending.is_empty() || (!inflight_full && !collision) => match o { - Ok(request) => { - self.state.handle_outgoing_packet(request)?; - network.flush().await?; - Ok(self.state.events.pop_front().unwrap()) - } - Err(_) => Err(ConnectionError::RequestsDone), + Some(request) = self.pending.next(), if !inflight_full && !collision => { + self.state.handle_outgoing_packet(request)?; + network.flush().await?; + Ok(self.state.events.pop_front().unwrap()) + }, + request = self.request_rx.recv_async(), if self.pending.is_empty() && !inflight_full && !collision => { + let request = request.map_err(|_| ConnectionError::RequestsDone)?; + self.state.handle_outgoing_packet(request)?; + network.flush().await?; + Ok(self.state.events.pop_front().unwrap()) }, // Pull a bunch of packets from network, reply in bunch and yield the first item o = network.readb(&mut self.state, BATCH_SIZE) => { @@ -231,26 +230,57 @@ impl EventLoop { network.flush().await?; Ok(self.state.events.pop_front().unwrap()) } + else => unreachable!("Eventloop select is exhaustive"), } } +} - async fn next_request( - pending: &mut VecDeque, - rx: &Receiver, - pending_throttle: Duration, - ) -> Result { - if !pending.is_empty() { - time::sleep(pending_throttle).await; - // We must call .next() AFTER sleep() otherwise .next() would - // advance the iterator but the future might be canceled before return - Ok(pending.pop_front().unwrap()) +/// Pending requets yielded with a configured rate. If the queue is empty the stream will yield pending. +struct PendingRequests { + /// Interval + interval: Option, + /// Pending requests + requests: VecDeque, +} + +impl PendingRequests { + pub fn new(interval: Duration) -> Self { + let interval = (!interval.is_zero()).then(|| time::interval(interval)); + PendingRequests { + interval, + requests: VecDeque::new(), + } + } + + pub fn is_empty(&self) -> bool { + self.requests.is_empty() + } + + pub fn extend(&mut self, requests: impl IntoIterator) { + self.requests.extend(requests); + } +} + +impl Stream for PendingRequests { + type Item = Request; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.is_empty() { + Poll::Pending } else { - match rx.recv_async().await { - Ok(r) => Ok(r), - Err(_) => Err(ConnectionError::RequestsDone), + match self.interval.as_mut() { + Some(interval) => match interval.poll_tick(cx) { + Poll::Ready(_) => Poll::Ready(self.requests.pop_front()), + Poll::Pending => Poll::Pending, + }, + None => Poll::Ready(self.requests.pop_front()), } } } + + fn size_hint(&self) -> (usize, Option) { + (self.requests.len(), Some(self.requests.len())) + } } /// This stream internally processes requests from the request stream provided to the eventloop