Consumer stops every <30 consumed messages #37

Open
kopiczko opened this issue Sep 23, 2016 · 2 comments

kopiczko commented Sep 23, 2016

I'm running this code on CloudAMQP with the following configuration:
Cluster: cheerful-squirrel
RabbitMQ 3.5.7, Erlang 18.2

The program consumes up to 30 messages, and then the queue disappears from the RabbitMQ dashboard, which probably means the connection was lost. After that the consumer hangs. When I set auto_delete to false in queue_declare, the Ready message count keeps growing while the consumer hangs.

Here's the code; it mostly follows the provided example:

    let mut session: Session = match Session::open_url(url) {
        Ok(s) => s,
        Err(e) => panic!("Session::open_url: {}", e),
    };
    let mut channel: Channel = match session.open_channel(1) {
        Ok(s) => s,
        Err(e) => panic!("Session.open_channel: {}", e),
    };

    channel.exchange_declare(exchange,
                          "fanout",
                          false, // passive
                          false, // durable
                          false, // auto_delete
                          false, // internal
                          false, // nowait - hangs when set to true
                          Table::new())
        .err()
        .map(|e| panic!("Channel.exchange_declare: {}", e));
    channel.queue_declare(queue,
                       false, // passive
                       false, // durable
                       true, // exclusive
                       true, // auto_delete
                       false, // nowait - hangs when set to true
                       Table::new())
        .err()
        .map(|e| panic!("Channel.queue_declare: {}", e));
    channel.queue_bind(queue,
                    exchange,
                    "",
                    false, // nowait - hangs when set to true
                    Table::new())
        .err()
        .map(|e| panic!("Channel.queue_bind: {}", e));

    let consumer = Consumer { cnt: 0 };
    let consumer_name = match channel.basic_consume(consumer,
                                                    queue,
                                                    "",
                                                    false, // no_local
                                                    false, // no_ack
                                                    true, // exclusive
                                                    false, // nowait - hangs when set to true
                                                    Table::new()) {
        Ok(s) => s,
        Err(e) => panic!("Channel.basic_consume: {}", e),
    };

    println!("{} Starting consumer: {}", date_str(), consumer_name);
    channel.start_consuming();

    channel.close(200, "Bye")
        .err()
        .map(|e| panic!("Channel.close: {}", e));
    session.close(200, "Good Bye");

Consumer:

struct Consumer {
    cnt: u64
}

impl amqp::Consumer for Consumer {
    fn handle_delivery(&mut self,
                       channel: &mut Channel,
                       deliver: protocol::basic::Deliver,
                       _: protocol::basic::BasicProperties,
                       body: Vec<u8>) {
        self.cnt += 1;
        let s = str::from_utf8(&body).unwrap();
        println!("{} Consumed #{}: {}", date_str(), self.cnt, s);
        channel.basic_ack(deliver.delivery_tag, false)
            .err()
            .map(|e| panic!("Consumer.handle_delivery basic_ack: {}", e));
    }
}
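
A side note on the handler above, not necessarily related to the hang: `str::from_utf8(&body).unwrap()` panics if a message body is not valid UTF-8, which would take the consumer down from inside `handle_delivery`. Below is a minimal std-only sketch of a lossy decode for logging. The helper name `body_for_log` is hypothetical and not part of the amqp crate; whether any published message actually contains invalid UTF-8 is an assumption to check, not something the report establishes.

```rust
use std::borrow::Cow;

/// Hypothetical helper: decode a message body for logging without panicking.
/// Invalid UTF-8 sequences are replaced with U+FFFD instead of returning an error.
fn body_for_log(body: &[u8]) -> Cow<'_, str> {
    String::from_utf8_lossy(body)
}

fn main() {
    let valid: Vec<u8> = b"hello".to_vec();
    // "fo", then a byte that can never appear in UTF-8, then "o".
    let invalid: Vec<u8> = vec![0x66, 0x6f, 0xff, 0x6f];

    // The strict decode used in the handler fails on the second body,
    // so calling unwrap() on it would panic.
    assert!(std::str::from_utf8(&valid).is_ok());
    assert!(std::str::from_utf8(&invalid).is_err());

    // The lossy decode always yields something printable.
    println!("Consumed: {}", body_for_log(&valid));
    println!("Consumed: {}", body_for_log(&invalid));
}
```

In the handler, this would mean replacing the `unwrap()` line with a call such as `let s = body_for_log(&body);`, so that a malformed body is logged rather than aborting the delivery callback.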

kopiczko (Author) commented

I should have mentioned that this is an HA cluster.

Antti (Owner) commented Sep 23, 2016

Can you run the consumer with the logger initialized (env_logger::init().unwrap();) and
RUST_LOG=debug? This will give you some idea of what's going on.
