From 15c39681f2471cf687f963134e39290c8d55f758 Mon Sep 17 00:00:00 2001 From: Max Bruckner Date: Mon, 24 Jun 2024 16:03:30 +0200 Subject: [PATCH] fix(rumqttc): subscribe_many not validating filters properly (#882) --- rumqttc/src/client.rs | 77 ++++++++++++++++++++-------------------- rumqttc/src/v5/client.rs | 76 +++++++++++++++++++-------------------- rumqttc/src/v5/mod.rs | 6 ++++ 3 files changed, 82 insertions(+), 77 deletions(-) diff --git a/rumqttc/src/client.rs b/rumqttc/src/client.rs index 15cd5f5ad..a0c907049 100644 --- a/rumqttc/src/client.rs +++ b/rumqttc/src/client.rs @@ -150,25 +150,23 @@ impl AsyncClient { /// Sends a MQTT Subscribe to the `EventLoop` pub async fn subscribe>(&self, topic: S, qos: QoS) -> Result<(), ClientError> { - let topic = topic.into(); - let subscribe = Subscribe::new(&topic, qos); - let request = Request::Subscribe(subscribe); - if !valid_filter(&topic) { - return Err(ClientError::Request(request)); + let subscribe = Subscribe::new(topic, qos); + if !subscribe_has_valid_filters(&subscribe) { + return Err(ClientError::Request(subscribe.into())); } - self.request_tx.send_async(request).await?; + + self.request_tx.send_async(subscribe.into()).await?; Ok(()) } /// Attempts to send a MQTT Subscribe to the `EventLoop` pub fn try_subscribe>(&self, topic: S, qos: QoS) -> Result<(), ClientError> { - let topic = topic.into(); - let subscribe = Subscribe::new(&topic, qos); - let request = Request::Subscribe(subscribe); - if !valid_filter(&topic) { - return Err(ClientError::TryRequest(request)); + let subscribe = Subscribe::new(topic, qos); + if !subscribe_has_valid_filters(&subscribe) { + return Err(ClientError::TryRequest(subscribe.into())); } - self.request_tx.try_send(request)?; + + self.request_tx.try_send(subscribe.into())?; Ok(()) } @@ -177,14 +175,12 @@ impl AsyncClient { where T: IntoIterator, { - let mut topics_iter = topics.into_iter(); - let is_valid_filters = topics_iter.all(|filter| valid_filter(&filter.path)); - let subscribe = Subscribe::new_many(topics_iter); - let request = Request::Subscribe(subscribe); - if !is_valid_filters { - return Err(ClientError::Request(request)); + let subscribe = Subscribe::new_many(topics); + if !subscribe_has_valid_filters(&subscribe) { + return Err(ClientError::Request(subscribe.into())); } - self.request_tx.send_async(request).await?; + + self.request_tx.send_async(subscribe.into()).await?; Ok(()) } @@ -193,14 +189,11 @@ impl AsyncClient { where T: IntoIterator, { - let mut topics_iter = topics.into_iter(); - let is_valid_filters = topics_iter.all(|filter| valid_filter(&filter.path)); - let subscribe = Subscribe::new_many(topics_iter); - let request = Request::Subscribe(subscribe); - if !is_valid_filters { - return Err(ClientError::TryRequest(request)); + let subscribe = Subscribe::new_many(topics); + if !subscribe_has_valid_filters(&subscribe) { + return Err(ClientError::TryRequest(subscribe.into())); } - self.request_tx.try_send(request)?; + self.request_tx.try_send(subscribe.into())?; Ok(()) } @@ -341,13 +334,12 @@ impl Client { /// Sends a MQTT Subscribe to the `EventLoop` pub fn subscribe>(&self, topic: S, qos: QoS) -> Result<(), ClientError> { - let topic = topic.into(); - let subscribe = Subscribe::new(&topic, qos); - let request = Request::Subscribe(subscribe); - if !valid_filter(&topic) { - return Err(ClientError::Request(request)); + let subscribe = Subscribe::new(topic, qos); + if !subscribe_has_valid_filters(&subscribe) { + return Err(ClientError::Request(subscribe.into())); } - self.client.request_tx.send(request)?; + + self.client.request_tx.send(subscribe.into())?; Ok(()) } @@ -362,14 +354,12 @@ impl Client { where T: IntoIterator, { - let mut topics_iter = topics.into_iter(); - let is_valid_filters = topics_iter.all(|filter| valid_filter(&filter.path)); - let subscribe = Subscribe::new_many(topics_iter); - let request = Request::Subscribe(subscribe); - if !is_valid_filters { - return Err(ClientError::Request(request)); + let subscribe = Subscribe::new_many(topics); + if !subscribe_has_valid_filters(&subscribe) { + return Err(ClientError::Request(subscribe.into())); } - self.client.request_tx.send(request)?; + + self.client.request_tx.send(subscribe.into())?; Ok(()) } @@ -408,6 +398,15 @@ impl Client { } } +#[must_use] +fn subscribe_has_valid_filters(subscribe: &Subscribe) -> bool { + !subscribe.filters.is_empty() + && subscribe + .filters + .iter() + .all(|filter| valid_filter(&filter.path)) +} + /// Error type returned by [`Connection::recv`] #[derive(Debug, Eq, PartialEq)] pub struct RecvError; diff --git a/rumqttc/src/v5/client.rs b/rumqttc/src/v5/client.rs index f8629b8c5..4913d1d0f 100644 --- a/rumqttc/src/v5/client.rs +++ b/rumqttc/src/v5/client.rs @@ -6,9 +6,9 @@ use super::mqttbytes::v5::{ Filter, PubAck, PubRec, Publish, PublishProperties, Subscribe, SubscribeProperties, Unsubscribe, UnsubscribeProperties, }; -use super::mqttbytes::{valid_filter, QoS}; +use super::mqttbytes::QoS; use super::{ConnectionError, Event, EventLoop, MqttOptions, Request}; -use crate::valid_topic; +use crate::{valid_filter, valid_topic}; use bytes::Bytes; use flume::{SendError, Sender, TrySendError}; @@ -256,13 +256,12 @@ impl AsyncClient { properties: Option, ) -> Result<(), ClientError> { let filter = Filter::new(topic, qos); - let is_filter_valid = valid_filter(&filter.path); let subscribe = Subscribe::new(filter, properties); - let request: Request = Request::Subscribe(subscribe); - if !is_filter_valid { - return Err(ClientError::Request(request)); + if !subscribe_has_valid_filters(&subscribe) { + return Err(ClientError::Request(subscribe.into())); } - self.request_tx.send_async(request).await?; + + self.request_tx.send_async(subscribe.into()).await?; Ok(()) } @@ -287,13 +286,12 @@ impl AsyncClient { properties: Option, ) -> Result<(), ClientError> { let filter = Filter::new(topic, qos); - let is_filter_valid = valid_filter(&filter.path); let subscribe = Subscribe::new(filter, properties); - let request = Request::Subscribe(subscribe); - if !is_filter_valid { - return Err(ClientError::TryRequest(request)); + if !subscribe_has_valid_filters(&subscribe) { + return Err(ClientError::TryRequest(subscribe.into())); } - self.request_tx.try_send(request)?; + + self.request_tx.try_send(subscribe.into())?; Ok(()) } @@ -319,15 +317,13 @@ impl AsyncClient { where T: IntoIterator, { - let mut topics_iter = topics.into_iter(); - let is_valid_filters = topics_iter.all(|filter| valid_filter(&filter.path)); - let subscribe = Subscribe::new_many(topics_iter, properties); - let request = Request::Subscribe(subscribe); - if !is_valid_filters { - return Err(ClientError::Request(request)); + let subscribe = Subscribe::new_many(topics, properties); + if !subscribe_has_valid_filters(&subscribe) { + return Err(ClientError::Request(subscribe.into())); } - self.request_tx.send_async(request).await?; + self.request_tx.send_async(subscribe.into()).await?; + Ok(()) } @@ -358,14 +354,12 @@ impl AsyncClient { where T: IntoIterator, { - let mut topics_iter = topics.into_iter(); - let is_valid_filters = topics_iter.all(|filter| valid_filter(&filter.path)); - let subscribe = Subscribe::new_many(topics_iter, properties); - let request = Request::Subscribe(subscribe); - if !is_valid_filters { - return Err(ClientError::TryRequest(request)); + let subscribe = Subscribe::new_many(topics, properties); + if !subscribe_has_valid_filters(&subscribe) { + return Err(ClientError::TryRequest(subscribe.into())); } - self.request_tx.try_send(request)?; + + self.request_tx.try_send(subscribe.into())?; Ok(()) } @@ -608,13 +602,12 @@ impl Client { properties: Option, ) -> Result<(), ClientError> { let filter = Filter::new(topic, qos); - let is_filter_valid = valid_filter(&filter.path); let subscribe = Subscribe::new(filter, properties); - let request = Request::Subscribe(subscribe); - if !is_filter_valid { - return Err(ClientError::Request(request)); + if !subscribe_has_valid_filters(&subscribe) { + return Err(ClientError::Request(subscribe.into())); } - self.client.request_tx.send(request)?; + + self.client.request_tx.send(subscribe.into())?; Ok(()) } @@ -655,14 +648,12 @@ impl Client { where T: IntoIterator, { - let mut topics_iter = topics.into_iter(); - let is_valid_filters = topics_iter.all(|filter| valid_filter(&filter.path)); - let subscribe = Subscribe::new_many(topics_iter, properties); - let request = Request::Subscribe(subscribe); - if !is_valid_filters { - return Err(ClientError::Request(request)); + let subscribe = Subscribe::new_many(topics, properties); + if !subscribe_has_valid_filters(&subscribe) { + return Err(ClientError::Request(subscribe.into())); } - self.client.request_tx.send(request)?; + + self.client.request_tx.send(subscribe.into())?; Ok(()) } @@ -755,6 +746,15 @@ impl Client { } } +#[must_use] +fn subscribe_has_valid_filters(subscribe: &Subscribe) -> bool { + !subscribe.filters.is_empty() + && subscribe + .filters + .iter() + .all(|filter| valid_filter(&filter.path)) +} + /// Error type returned by [`Connection::recv`] #[derive(Debug, Eq, PartialEq)] pub struct RecvError; diff --git a/rumqttc/src/v5/mod.rs b/rumqttc/src/v5/mod.rs index 2518a93f1..6e0e43931 100644 --- a/rumqttc/src/v5/mod.rs +++ b/rumqttc/src/v5/mod.rs @@ -49,6 +49,12 @@ pub enum Request { Disconnect, } +impl From for Request { + fn from(subscribe: Subscribe) -> Self { + Self::Subscribe(subscribe) + } +} + #[cfg(feature = "websocket")] type RequestModifierFn = Arc< dyn Fn(http::Request<()>) -> Pin> + Send>>