Skip to content

Commit

Permalink
fix(rumqttc): subscribe_many not validating filters properly (#882)
Browse files Browse the repository at this point in the history
  • Loading branch information
FSMaxB authored Jun 24, 2024
1 parent 6d7bd82 commit 15c3968
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 77 deletions.
77 changes: 38 additions & 39 deletions rumqttc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,25 +150,23 @@ impl AsyncClient {

/// Sends a MQTT Subscribe to the `EventLoop`
pub async fn subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
let topic = topic.into();
let subscribe = Subscribe::new(&topic, qos);
let request = Request::Subscribe(subscribe);
if !valid_filter(&topic) {
return Err(ClientError::Request(request));
let subscribe = Subscribe::new(topic, qos);
if !subscribe_has_valid_filters(&subscribe) {
return Err(ClientError::Request(subscribe.into()));
}
self.request_tx.send_async(request).await?;

self.request_tx.send_async(subscribe.into()).await?;
Ok(())
}

/// Attempts to send a MQTT Subscribe to the `EventLoop`
pub fn try_subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
let topic = topic.into();
let subscribe = Subscribe::new(&topic, qos);
let request = Request::Subscribe(subscribe);
if !valid_filter(&topic) {
return Err(ClientError::TryRequest(request));
let subscribe = Subscribe::new(topic, qos);
if !subscribe_has_valid_filters(&subscribe) {
return Err(ClientError::TryRequest(subscribe.into()));
}
self.request_tx.try_send(request)?;

self.request_tx.try_send(subscribe.into())?;
Ok(())
}

Expand All @@ -177,14 +175,12 @@ impl AsyncClient {
where
T: IntoIterator<Item = SubscribeFilter>,
{
let mut topics_iter = topics.into_iter();
let is_valid_filters = topics_iter.all(|filter| valid_filter(&filter.path));
let subscribe = Subscribe::new_many(topics_iter);
let request = Request::Subscribe(subscribe);
if !is_valid_filters {
return Err(ClientError::Request(request));
let subscribe = Subscribe::new_many(topics);
if !subscribe_has_valid_filters(&subscribe) {
return Err(ClientError::Request(subscribe.into()));
}
self.request_tx.send_async(request).await?;

self.request_tx.send_async(subscribe.into()).await?;
Ok(())
}

Expand All @@ -193,14 +189,11 @@ impl AsyncClient {
where
T: IntoIterator<Item = SubscribeFilter>,
{
let mut topics_iter = topics.into_iter();
let is_valid_filters = topics_iter.all(|filter| valid_filter(&filter.path));
let subscribe = Subscribe::new_many(topics_iter);
let request = Request::Subscribe(subscribe);
if !is_valid_filters {
return Err(ClientError::TryRequest(request));
let subscribe = Subscribe::new_many(topics);
if !subscribe_has_valid_filters(&subscribe) {
return Err(ClientError::TryRequest(subscribe.into()));
}
self.request_tx.try_send(request)?;
self.request_tx.try_send(subscribe.into())?;
Ok(())
}

Expand Down Expand Up @@ -341,13 +334,12 @@ impl Client {

/// Sends a MQTT Subscribe to the `EventLoop`
pub fn subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
let topic = topic.into();
let subscribe = Subscribe::new(&topic, qos);
let request = Request::Subscribe(subscribe);
if !valid_filter(&topic) {
return Err(ClientError::Request(request));
let subscribe = Subscribe::new(topic, qos);
if !subscribe_has_valid_filters(&subscribe) {
return Err(ClientError::Request(subscribe.into()));
}
self.client.request_tx.send(request)?;

self.client.request_tx.send(subscribe.into())?;
Ok(())
}

Expand All @@ -362,14 +354,12 @@ impl Client {
where
T: IntoIterator<Item = SubscribeFilter>,
{
let mut topics_iter = topics.into_iter();
let is_valid_filters = topics_iter.all(|filter| valid_filter(&filter.path));
let subscribe = Subscribe::new_many(topics_iter);
let request = Request::Subscribe(subscribe);
if !is_valid_filters {
return Err(ClientError::Request(request));
let subscribe = Subscribe::new_many(topics);
if !subscribe_has_valid_filters(&subscribe) {
return Err(ClientError::Request(subscribe.into()));
}
self.client.request_tx.send(request)?;

self.client.request_tx.send(subscribe.into())?;
Ok(())
}

Expand Down Expand Up @@ -408,6 +398,15 @@ impl Client {
}
}

#[must_use]
fn subscribe_has_valid_filters(subscribe: &Subscribe) -> bool {
!subscribe.filters.is_empty()
&& subscribe
.filters
.iter()
.all(|filter| valid_filter(&filter.path))
}

/// Error type returned by [`Connection::recv`]
#[derive(Debug, Eq, PartialEq)]
pub struct RecvError;
Expand Down
76 changes: 38 additions & 38 deletions rumqttc/src/v5/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use super::mqttbytes::v5::{
Filter, PubAck, PubRec, Publish, PublishProperties, Subscribe, SubscribeProperties,
Unsubscribe, UnsubscribeProperties,
};
use super::mqttbytes::{valid_filter, QoS};
use super::mqttbytes::QoS;
use super::{ConnectionError, Event, EventLoop, MqttOptions, Request};
use crate::valid_topic;
use crate::{valid_filter, valid_topic};

use bytes::Bytes;
use flume::{SendError, Sender, TrySendError};
Expand Down Expand Up @@ -256,13 +256,12 @@ impl AsyncClient {
properties: Option<SubscribeProperties>,
) -> Result<(), ClientError> {
let filter = Filter::new(topic, qos);
let is_filter_valid = valid_filter(&filter.path);
let subscribe = Subscribe::new(filter, properties);
let request: Request = Request::Subscribe(subscribe);
if !is_filter_valid {
return Err(ClientError::Request(request));
if !subscribe_has_valid_filters(&subscribe) {
return Err(ClientError::Request(subscribe.into()));
}
self.request_tx.send_async(request).await?;

self.request_tx.send_async(subscribe.into()).await?;
Ok(())
}

Expand All @@ -287,13 +286,12 @@ impl AsyncClient {
properties: Option<SubscribeProperties>,
) -> Result<(), ClientError> {
let filter = Filter::new(topic, qos);
let is_filter_valid = valid_filter(&filter.path);
let subscribe = Subscribe::new(filter, properties);
let request = Request::Subscribe(subscribe);
if !is_filter_valid {
return Err(ClientError::TryRequest(request));
if !subscribe_has_valid_filters(&subscribe) {
return Err(ClientError::TryRequest(subscribe.into()));
}
self.request_tx.try_send(request)?;

self.request_tx.try_send(subscribe.into())?;
Ok(())
}

Expand All @@ -319,15 +317,13 @@ impl AsyncClient {
where
T: IntoIterator<Item = Filter>,
{
let mut topics_iter = topics.into_iter();
let is_valid_filters = topics_iter.all(|filter| valid_filter(&filter.path));
let subscribe = Subscribe::new_many(topics_iter, properties);
let request = Request::Subscribe(subscribe);
if !is_valid_filters {
return Err(ClientError::Request(request));
let subscribe = Subscribe::new_many(topics, properties);
if !subscribe_has_valid_filters(&subscribe) {
return Err(ClientError::Request(subscribe.into()));
}

self.request_tx.send_async(request).await?;
self.request_tx.send_async(subscribe.into()).await?;

Ok(())
}

Expand Down Expand Up @@ -358,14 +354,12 @@ impl AsyncClient {
where
T: IntoIterator<Item = Filter>,
{
let mut topics_iter = topics.into_iter();
let is_valid_filters = topics_iter.all(|filter| valid_filter(&filter.path));
let subscribe = Subscribe::new_many(topics_iter, properties);
let request = Request::Subscribe(subscribe);
if !is_valid_filters {
return Err(ClientError::TryRequest(request));
let subscribe = Subscribe::new_many(topics, properties);
if !subscribe_has_valid_filters(&subscribe) {
return Err(ClientError::TryRequest(subscribe.into()));
}
self.request_tx.try_send(request)?;

self.request_tx.try_send(subscribe.into())?;
Ok(())
}

Expand Down Expand Up @@ -608,13 +602,12 @@ impl Client {
properties: Option<SubscribeProperties>,
) -> Result<(), ClientError> {
let filter = Filter::new(topic, qos);
let is_filter_valid = valid_filter(&filter.path);
let subscribe = Subscribe::new(filter, properties);
let request = Request::Subscribe(subscribe);
if !is_filter_valid {
return Err(ClientError::Request(request));
if !subscribe_has_valid_filters(&subscribe) {
return Err(ClientError::Request(subscribe.into()));
}
self.client.request_tx.send(request)?;

self.client.request_tx.send(subscribe.into())?;
Ok(())
}

Expand Down Expand Up @@ -655,14 +648,12 @@ impl Client {
where
T: IntoIterator<Item = Filter>,
{
let mut topics_iter = topics.into_iter();
let is_valid_filters = topics_iter.all(|filter| valid_filter(&filter.path));
let subscribe = Subscribe::new_many(topics_iter, properties);
let request = Request::Subscribe(subscribe);
if !is_valid_filters {
return Err(ClientError::Request(request));
let subscribe = Subscribe::new_many(topics, properties);
if !subscribe_has_valid_filters(&subscribe) {
return Err(ClientError::Request(subscribe.into()));
}
self.client.request_tx.send(request)?;

self.client.request_tx.send(subscribe.into())?;
Ok(())
}

Expand Down Expand Up @@ -755,6 +746,15 @@ impl Client {
}
}

#[must_use]
fn subscribe_has_valid_filters(subscribe: &Subscribe) -> bool {
!subscribe.filters.is_empty()
&& subscribe
.filters
.iter()
.all(|filter| valid_filter(&filter.path))
}

/// Error type returned by [`Connection::recv`]
#[derive(Debug, Eq, PartialEq)]
pub struct RecvError;
Expand Down
6 changes: 6 additions & 0 deletions rumqttc/src/v5/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ pub enum Request {
Disconnect,
}

impl From<Subscribe> for Request {
fn from(subscribe: Subscribe) -> Self {
Self::Subscribe(subscribe)
}
}

#[cfg(feature = "websocket")]
type RequestModifierFn = Arc<
dyn Fn(http::Request<()>) -> Pin<Box<dyn Future<Output = http::Request<()>> + Send>>
Expand Down

0 comments on commit 15c3968

Please sign in to comment.