Manually listening for a system signal to handle disconnection causes a hang #188

Open
xiaozefeng opened this issue Jan 15, 2023 · 1 comment

xiaozefeng commented Jan 15, 2023

I want to listen for a Linux signal manually in Rust so that I can handle the disconnection myself, but I have run into a bug.

My guess is that the C layer has its own signal handling, and that because my upper layer intercepts the signal, the C layer never gets to process it. Is that right?

Full code:

use std::{env, process, thread, time::Duration};

use tokio::signal::unix::{signal, SignalKind};

extern crate paho_mqtt as mqtt;

const DFLT_BROKER: &str = "tcp://localhost:1883";
const DFLT_CLIENT: &str = "rust_subscribe";
const DFLT_TOPICS: &[&str] = &["rust/mqtt", "rust/test"];
// The QoS list that matches the topics above.
const DFLT_QOS: &[i32] = &[0, 1];

// Reconnect to the broker when connection is lost.
async fn try_reconnect(cli: &mqtt::AsyncClient) -> bool {
    println!("Connection lost. Waiting to retry connection");
    for _ in 0..12 {
        thread::sleep(Duration::from_millis(5000));
        if cli.reconnect().await.is_ok() {
            println!("Successfully reconnected");
            return true;
        }
    }
    println!("Unable to reconnect after several attempts.");
    false
}

// Subscribes to multiple topics.
async fn subscribe_topics(cli: &mqtt::AsyncClient) {
    if let Err(e) = cli.subscribe_many(DFLT_TOPICS, DFLT_QOS).await {
        println!("Error subscribing to topics: {:?}", e);
        process::exit(1);
    }
}

#[tokio::main]
async fn main() {
    let host = env::args()
        .nth(1)
        .unwrap_or_else(|| DFLT_BROKER.to_string());

    // Define the set of options for creating the client.
    // Use an ID for a persistent session.
    let create_opts = mqtt::CreateOptionsBuilder::new()
        .server_uri(host)
        .client_id(DFLT_CLIENT.to_string())
        .persistence(None)
        .finalize();

    // Create a client.
    let cli = mqtt::AsyncClient::new(create_opts).unwrap_or_else(|err| {
        println!("Error creating the client: {:?}", err);
        process::exit(1);
    });

    let cloned_cli = cli.clone();

    tokio::spawn(async move {
        // Initialize the consumer before connecting.
        let rx = cli.start_consuming();

        let lwt_props = mqtt::properties!(mqtt::PropertyCode::WillDelayInterval => 10);
        // Define the set of options for the connection.
        let lwt = mqtt::MessageBuilder::new()
            .topic("lost_connection_topic")
            .payload("{}")
            .properties(lwt_props)
            .qos(1)
            .finalize();
        let conn_opts = mqtt::ConnectOptionsBuilder::new()
            .keep_alive_interval(Duration::from_secs(20))
            .clean_session(false)
            .will_message(lwt)
            .finalize();

        // Connect and wait for it to complete or fail.
        if let Err(e) = cli.connect(conn_opts).await {
            println!("Unable to connect:\n\t{:?}", e);
            process::exit(1);
        }

        // Subscribe topics.
        subscribe_topics(&cli).await;

        println!("Processing requests...");
        for msg in rx.iter() {
            if let Some(msg) = msg {
                println!("{}", msg);
            } else if !cli.is_connected() {
                if try_reconnect(&cli).await {
                    println!("Resubscribe topics...");
                    subscribe_topics(&cli).await;
                } else {
                    break;
                }
            }
        }
    });
    let mut stream = signal(SignalKind::terminate()).unwrap();
    stream.recv().await;
    println!("graceful shutdown")
}

Start it:

cargo run

Stop it:

Ctrl+C

At this point the process hangs.

My dependencies:

[dependencies]
paho-mqtt = "0.11.1"

EMQX server version: 5.0.13

@fpagliughi
Contributor

I have no idea, and I’m not sure what you’re trying to do here.

I think you should just block on the spawned tokio task to await its completion. Or, for that matter, you appear to be running in an asynchronous tokio main function, so you don’t need a separate async task to run the MQTT connection?
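
The suggestion to wait for the spawned task can be illustrated with a minimal, standard-library-only sketch. It is not the paho-mqtt or tokio API: `worker` and `run_then_shutdown` are hypothetical names, a plain thread stands in for the spawned task, and an `mpsc` channel stands in for the OS signal. The point it tries to show is the shape of the shutdown: keep the handle, ask the worker to stop, then wait for it to finish before returning from `main`.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Hypothetical worker loop standing in for the MQTT consumer task.
/// It checks for a shutdown request between units of simulated work
/// and returns how many units it completed.
fn worker(shutdown: Receiver<()>) -> u32 {
    let mut processed = 0;
    loop {
        // Wait briefly for a shutdown request instead of blocking forever.
        match shutdown.recv_timeout(Duration::from_millis(10)) {
            // Stop on an explicit request, or if the sender was dropped.
            Ok(()) | Err(RecvTimeoutError::Disconnected) => break,
            // No request yet: count one unit of simulated work.
            Err(RecvTimeoutError::Timeout) => processed += 1,
        }
    }
    processed
}

/// Starts the worker, requests shutdown after `run_for`,
/// then blocks until the worker has actually finished.
fn run_then_shutdown(run_for: Duration) -> u32 {
    let (tx, rx) = mpsc::channel();
    // Keep the handle so the worker can be joined later.
    let handle = thread::spawn(move || worker(rx));

    // Assumption: this sleep stands in for waiting on the OS signal.
    thread::sleep(run_for);

    tx.send(()).expect("worker exited early");
    handle.join().expect("worker panicked")
}

fn main() {
    let processed = run_then_shutdown(Duration::from_millis(100));
    println!("worker processed {} units before shutdown", processed);
}
```

In the tokio version, the value returned by `tokio::spawn` is a `JoinHandle` that can be awaited in the same way. It may also be worth checking which signal is being listened for: Ctrl+C delivers SIGINT, whereas `SignalKind::terminate()` in the code above listens for SIGTERM; tokio provides `SignalKind::interrupt()` for the former.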
