Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mqtt v5 subscriber disconnect problem #195

Open
xiaozefeng opened this issue Mar 9, 2023 · 3 comments
Open

mqtt v5 subscriber disconnect problem #195

xiaozefeng opened this issue Mar 9, 2023 · 3 comments

Comments

@xiaozefeng
Copy link

I use paho-mqtt v5 to subscribe to a topic and after a period of almost 12 hours it disconnects

rust version :

rustup 1.25.2 (17db695f1 2023-02-01)
info: This is the version for the rustup toolchain manager, not the rustc compiler.
info: The currently active `rustc` version is `rustc 1.67.1 (d5a82bbd2 2023-02-07)`

the os:
No LSB modules are available.
Distributor ID: Ubuntu
Description: Ubuntu 20.04.5 LTS
Release: 20.04
Codename: focal

this is the Cargo.toml

tokio= {version = "1", features = ["full"]}
tracing="0.1.3"
tracing-subscriber = "0.3.16"
paho-mqtt = "0.12.0"
tokio-stream = "0.1.12"

this is the code :

use std::time::Duration;

use mqtt::MQTT_VERSION_5;
use paho_mqtt as mqtt;
use tokio_stream::StreamExt;
use tracing::info;
#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();

    let host = "mqtt://localhost:1883";
    let create_opts = mqtt::CreateOptionsBuilder::new()
        .server_uri(host)
        .client_id("paho-mqtt-client-v5")
        .finalize();
    let mut cli = mqtt::AsyncClient::new(create_opts).unwrap();
    let mut strm = cli.get_stream(25);

    let conn_opts = mqtt::ConnectOptionsBuilder::with_mqtt_version(MQTT_VERSION_5)
        .keep_alive_interval(Duration::from_secs(30))
        .clean_start(false)
        .properties(mqtt::properties![mqtt::PropertyCode::SessionExpiryInterval => 3600])
        .user_name("username")
        .password("password")
        .finalize();

    info!("connecting to mqtt server...");
    cli.connect(conn_opts).await.unwrap();

    let topic = "mining_updater_topic_test";
    info!("subscribing topic: {}", topic);

    cli.subscribe(topic, 1).await.unwrap();

    info!("waiting for receive message...");

    while let Some(msg_opt) = strm.next().await {
        if let Some(msg) = msg_opt {
            println!("{msg}");
        } else {
            info!("lost connection to mqtt server");
            while let Err(e) = cli.reconnect().await {
                println!("Error reconnect: {}", e);
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        }
    }
}
@fpagliughi
Copy link
Contributor

What server are you using? Is it public or private? If it's a public server, consider changing you Client ID to something unique. Someone else might be connecting with the same ID and knocking you off the server.

Is there a chance that the server is dropping you? Maybe monitor for a disconnect message from the server - perhaps using on_disconnect() or spying the network with Wireshark.

Is there any chance your board (computer) is powering down or going to sleep?

Does it reconnect?

@xiaozefeng
Copy link
Author

The server I am using is EMQX 5.0 and I am sure that my client id is unique.

I'm not running on my laptop, I'm running on a small machine with ubuntu installed.
And on the same machine, I can reconnect using the v3 protocol, but using the v5 protocol, I get disconnected and can't reconnect automatically.

os: also use ubuntu , is same the v5
No LSB modules are available.
Distributor ID: Ubuntu
Description: Ubuntu 20.04.5 LTS
Release: 20.04
Codename: focal

the v3 code:

use std::time::Duration;

use paho_mqtt as mqtt;
use tokio_stream::StreamExt;
use tracing::info;

#[tokio::main]
async fn main(){
    tracing_subscriber::fmt::init();

    let host = "mqtt://localhost:1883";
    let create_opts = mqtt::CreateOptionsBuilder::new_v3()
        .server_uri(host)
        .client_id("paho-mqtt-client-v3")
        .finalize();

    let mut cli =  mqtt::AsyncClient::new(create_opts).unwrap();
    let mut strm = cli.get_stream(25);
    let conn_opts = mqtt::ConnectOptionsBuilder::new_v3()
            .keep_alive_interval(Duration::from_secs(30))
            .clean_session(false)
            .finalize();
    info!("connecting to mqtt server...");
    cli.connect(conn_opts).await.unwrap();

    let topic = "mining_updater_topic_test";
    info!("subscribing topic: {}", topic);

    cli.subscribe(topic, 1).await.unwrap();

    info!("waiting for receive message...");

    while let Some(msg_opt) = strm.next().await {
        if let Some(msg) = msg_opt{
            println!("{msg}");
        }else{
            info!("lost connection to mqtt server");
            while let Err(e)= cli.reconnect().await{
                println!("Error reconnect: {}",e);
                tokio::time::sleep(Duration::from_secs(1)).await;
            }

        }
    }
}

@fpagliughi
Copy link
Contributor

OK. Thanks. I will try to reproduce it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants