From 298323d996707757c36fa2e8632021d987300f4f Mon Sep 17 00:00:00 2001 From: Felix Obenhuber Date: Tue, 19 Mar 2024 09:50:43 +0100 Subject: [PATCH] fix(rumqttc): do not store the request tx handle in the event loop Fixes #815 --- rumqttc/src/v5/client.rs | 6 +++--- rumqttc/src/v5/eventloop.rs | 16 ++++++---------- 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/rumqttc/src/v5/client.rs b/rumqttc/src/v5/client.rs index 910da504f..e38f087fb 100644 --- a/rumqttc/src/v5/client.rs +++ b/rumqttc/src/v5/client.rs @@ -11,7 +11,7 @@ use super::{ConnectionError, Event, EventLoop, MqttOptions, Request}; use crate::valid_topic; use bytes::Bytes; -use flume::{SendError, Sender, TrySendError}; +use flume::{bounded, SendError, Sender, TrySendError}; use futures_util::FutureExt; use tokio::runtime::{self, Runtime}; use tokio::time::timeout; @@ -54,8 +54,8 @@ impl AsyncClient { /// /// `cap` specifies the capacity of the bounded async channel. pub fn new(options: MqttOptions, cap: usize) -> (AsyncClient, EventLoop) { - let eventloop = EventLoop::new(options, cap); - let request_tx = eventloop.requests_tx.clone(); + let (request_tx, request_rx) = bounded(cap); + let eventloop = EventLoop::new(options, request_rx); let client = AsyncClient { request_tx }; diff --git a/rumqttc/src/v5/eventloop.rs b/rumqttc/src/v5/eventloop.rs index 36c10971d..d6ddb32ce 100644 --- a/rumqttc/src/v5/eventloop.rs +++ b/rumqttc/src/v5/eventloop.rs @@ -4,7 +4,7 @@ use super::{Incoming, MqttOptions, MqttState, Outgoing, Request, StateError, Tra use crate::eventloop::socket_connect; use crate::framed::N; -use flume::{bounded, Receiver, Sender}; +use flume::Receiver; use tokio::select; use tokio::time::{self, error::Elapsed, Instant, Sleep}; @@ -74,9 +74,7 @@ pub struct EventLoop { /// Current state of the connection pub state: MqttState, /// Request stream - requests_rx: Receiver, - /// Requests handle to send requests - pub(crate) requests_tx: Sender, + request_rx: Receiver, /// Pending packets from last session pub pending: VecDeque, /// Network connection to the broker @@ -97,8 +95,7 @@ impl EventLoop { /// /// When connection encounters critical errors (like auth failure), user has a choice to /// access and update `options`, `state` and `requests`. - pub fn new(options: MqttOptions, cap: usize) -> EventLoop { - let (requests_tx, requests_rx) = bounded(cap); + pub fn new(options: MqttOptions, request_rx: Receiver) -> EventLoop { let pending = VecDeque::new(); let inflight_limit = options.outgoing_inflight_upper_limit.unwrap_or(u16::MAX); let manual_acks = options.manual_acks; @@ -106,8 +103,7 @@ impl EventLoop { EventLoop { options, state: MqttState::new(inflight_limit, manual_acks), - requests_tx, - requests_rx, + request_rx, pending, network: None, keepalive_timeout: None, @@ -127,7 +123,7 @@ impl EventLoop { self.pending.extend(self.state.clean()); // drain requests from channel which weren't yet received - let requests_in_channel = self.requests_rx.drain(); + let requests_in_channel = self.request_rx.drain(); self.pending.extend(requests_in_channel); } @@ -206,7 +202,7 @@ impl EventLoop { // outgoing requests (along with 1b). o = Self::next_request( &mut self.pending, - &self.requests_rx, + &self.request_rx, self.options.pending_throttle ), if !self.pending.is_empty() || (!inflight_full && !collision) => match o { Ok(request) => {