Skip to content

Commit

Permalink
fix(rumqttc): do not store the request tx handle in the event loop
Browse files Browse the repository at this point in the history
  • Loading branch information
Felix Obenhuber committed Mar 26, 2024
1 parent af55848 commit 7f087c5
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 13 deletions.
6 changes: 3 additions & 3 deletions rumqttc/src/v5/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use super::{ConnectionError, Event, EventLoop, MqttOptions, Request};
use crate::valid_topic;

use bytes::Bytes;
use flume::{SendError, Sender, TrySendError};
use flume::{bounded, SendError, Sender, TrySendError};
use futures_util::FutureExt;
use tokio::runtime::{self, Runtime};
use tokio::time::timeout;
Expand Down Expand Up @@ -54,8 +54,8 @@ impl AsyncClient {
///
/// `cap` specifies the capacity of the bounded async channel.
pub fn new(options: MqttOptions, cap: usize) -> (AsyncClient, EventLoop) {
let eventloop = EventLoop::new(options, cap);
let request_tx = eventloop.requests_tx.clone();
let (request_tx, request_rx) = bounded(cap);
let eventloop = EventLoop::new(options, request_rx);

let client = AsyncClient { request_tx };

Expand Down
16 changes: 6 additions & 10 deletions rumqttc/src/v5/eventloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use super::{Incoming, MqttOptions, MqttState, Outgoing, Request, StateError, Tra
use crate::eventloop::socket_connect;
use crate::framed::AsyncReadWrite;

use flume::{bounded, Receiver, Sender};
use flume::Receiver;
use tokio::select;
use tokio::time::{self, error::Elapsed, Instant, Sleep};

Expand Down Expand Up @@ -73,9 +73,7 @@ pub struct EventLoop {
/// Current state of the connection
pub state: MqttState,
/// Request stream
requests_rx: Receiver<Request>,
/// Requests handle to send requests
pub(crate) requests_tx: Sender<Request>,
request_rx: Receiver<Request>,
/// Pending packets from last session
pub pending: VecDeque<Request>,
/// Network connection to the broker
Expand All @@ -96,17 +94,15 @@ impl EventLoop {
///
/// When connection encounters critical errors (like auth failure), user has a choice to
/// access and update `options`, `state` and `requests`.
pub fn new(options: MqttOptions, cap: usize) -> EventLoop {
let (requests_tx, requests_rx) = bounded(cap);
pub fn new(options: MqttOptions, request_rx: Receiver<Request>) -> EventLoop {
let pending = VecDeque::new();
let inflight_limit = options.outgoing_inflight_upper_limit.unwrap_or(u16::MAX);
let manual_acks = options.manual_acks;

EventLoop {
options,
state: MqttState::new(inflight_limit, manual_acks),
requests_tx,
requests_rx,
request_rx,
pending,
network: None,
keepalive_timeout: None,
Expand All @@ -126,7 +122,7 @@ impl EventLoop {
self.pending.extend(self.state.clean());

// drain requests from channel which weren't yet received
let requests_in_channel = self.requests_rx.drain();
let requests_in_channel = self.request_rx.drain();
self.pending.extend(requests_in_channel);
}

Expand Down Expand Up @@ -205,7 +201,7 @@ impl EventLoop {
// outgoing requests (along with 1b).
o = Self::next_request(
&mut self.pending,
&self.requests_rx,
&self.request_rx,
self.options.pending_throttle
), if !self.pending.is_empty() || (!inflight_full && !collision) => match o {
Ok(request) => {
Expand Down

0 comments on commit 7f087c5

Please sign in to comment.