diff --git a/rumqttc/CHANGELOG.md b/rumqttc/CHANGELOG.md index c8c8716a3..0f359c5c2 100644 --- a/rumqttc/CHANGELOG.md +++ b/rumqttc/CHANGELOG.md @@ -24,6 +24,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 * Made `DisconnectProperties` struct public. * Replace `Vec>` with `FixedBitSet` for managing packet ids of released QoS 2 publishes and incoming QoS 2 publishes in `MqttState`. * Accept `native_tls::TlsConnector` as input for `Transport::tls_with_config`. +* Return mapping of `subscription -> [client]` as console `/subscriptions` response ### Deprecated diff --git a/rumqttd/src/link/console.rs b/rumqttd/src/link/console.rs index 9f03f8a1e..80672bb50 100644 --- a/rumqttd/src/link/console.rs +++ b/rumqttd/src/link/console.rs @@ -10,6 +10,7 @@ use axum::{routing::get, Router}; use flume::Sender; use std::sync::Arc; use tokio::net::TcpListener; +use tokio::sync::oneshot; use tracing::info; #[derive(Debug)] @@ -97,13 +98,19 @@ async fn device_with_id( } async fn subscriptions(State(console): State>) -> impl IntoResponse { - let event = Event::PrintStatus(Print::Subscriptions); + let (tx, rx) = oneshot::channel(); + let event = Event::PrintStatus(Print::Subscriptions(tx)); let message = (console.connection_id, event); + let response_404 = Response::builder().status(404).body("".to_owned()).unwrap(); if console.router_tx.send(message).is_err() { - return Response::builder().status(404).body("".to_owned()).unwrap(); + return response_404; } - Response::new("OK".to_owned()) + let Ok(subscriptions) = rx.await else { + return response_404; + }; + + Response::new(serde_json::to_string(&subscriptions).unwrap()) } async fn subscriptions_with_filter( diff --git a/rumqttd/src/router/mod.rs b/rumqttd/src/router/mod.rs index a75cad34f..226ded6dc 100644 --- a/rumqttd/src/router/mod.rs +++ b/rumqttd/src/router/mod.rs @@ -5,6 +5,7 @@ use std::{ use bytes::Bytes; use serde::{Deserialize, Serialize}; +use tokio::sync::oneshot; use crate::{ protocol::{ @@ -401,13 +402,13 @@ pub enum Meter { Subscription(String, SubscriptionMeter), } -#[derive(Debug, Clone)] +#[derive(Debug)] pub enum Print { Config, Router, ReadyQueue, Connection(String), - Subscriptions, + Subscriptions(oneshot::Sender>>), Subscription(Filter), Waiters(Filter), } diff --git a/rumqttd/src/router/routing.rs b/rumqttd/src/router/routing.rs index ebf362a71..1d58cc15f 100644 --- a/rumqttd/src/router/routing.rs +++ b/rumqttd/src/router/routing.rs @@ -1678,8 +1678,8 @@ fn print_status(router: &mut Router, metrics: Print) { println!("{metrics:#?}"); } - Print::Subscriptions => { - let metrics: HashMap> = router + Print::Subscriptions(tx) => { + let subscriptions: HashMap> = router .subscription_map .iter() .map(|(filter, connections)| { @@ -1692,7 +1692,10 @@ fn print_status(router: &mut Router, metrics: Print) { }) .collect(); - println!("{metrics:#?}"); + println!("{subscriptions:#?}"); + if tx.send(subscriptions).is_err() { + error!("Send failed"); + } } Print::Subscription(filter) => { let metrics = router.datalog.meter(&filter);