From d41f6e07e79e209bb04b0975631f4072cca1fb85 Mon Sep 17 00:00:00 2001 From: Thomas Wright Date: Mon, 6 Jan 2025 12:37:39 +0100 Subject: [PATCH] Improve feedback and error handling of MQTT input handler --- Cargo.toml | 2 +- src/mqtt_input_provider.rs | 19 ++++++++++--------- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 721ad13..2e77b2c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ tokio-util = "0.7.12" clap = { version = "4.5.20", features = ["derive"] } async-stream = "0.3.6" r2r = { version = "0.9.3", optional=true } -thiserror = "2.0.3" +thiserror = "2.0.9" justerror = "1.1.0" serde = {version = "1.0.215", features = ["derive"]} serde_json = "1.0.133" diff --git a/src/mqtt_input_provider.rs b/src/mqtt_input_provider.rs index 4da3969..46cce5f 100644 --- a/src/mqtt_input_provider.rs +++ b/src/mqtt_input_provider.rs @@ -58,7 +58,7 @@ impl MQTTInputProvider { receivers.insert(v.clone(), rx); } - let topics = var_topics.values().map(|t| t.clone()).collect::>(); + let topics = var_topics.values().cloned().collect::>(); // Spawn a background task to receive messages from the MQTT broker and // send them to the appropriate channel based on which topic they were @@ -73,7 +73,7 @@ impl MQTTInputProvider { // Create and connect to the MQTT client let client = provide_mqtt_client(host.clone()).await.unwrap(); let mut stream = client.clone().get_stream(10); - // println!("Connected to MQTT broker"); + println!("Connected to MQTT broker with topics {:?}", topics); let qos = topics.iter().map(|_| QOS).collect::>(); loop { match client.subscribe_many(&topics, &qos).await { @@ -103,13 +103,14 @@ impl MQTTInputProvider { ) .as_str(), ); - let sender = senders - .get(&VarName(msg.topic().to_string())) - .expect("Channel not found for topic"); - sender - .send(value) - .await - .expect("Failed to send value to channel"); + if let Some(sender) = senders.get(&VarName(msg.topic().to_string())) { + sender + .send(value) + .await + .expect("Failed to send value to channel"); + } else { + println!("Channel not found for topic {:?}", msg.topic()); + } } None => { // Connection lost, try to reconnect