diff --git a/Cargo.lock b/Cargo.lock index 7c860330..5261fec9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1057,6 +1057,7 @@ dependencies = [ "amq-protocol-codegen", "async-global-executor", "async-global-executor-trait", + "async-io", "async-reactor-trait", "async-trait", "executor-trait", diff --git a/Cargo.toml b/Cargo.toml index 79dbb247..a7b7e5f5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -73,6 +73,7 @@ waker-fn = "^1.1" [dev-dependencies] async-global-executor = "^3.1" +async-io = "^2.0" futures-lite = "^2.0" serde_json = "^1.0" waker-fn = "^1.1" diff --git a/examples/c.rs b/examples/c.rs new file mode 100644 index 00000000..f9b0abfd --- /dev/null +++ b/examples/c.rs @@ -0,0 +1,94 @@ +use futures_lite::StreamExt; +use lapin::{Connection, ConnectionProperties, options::*, types::FieldTable}; +use tracing::info; + +fn main() { + if std::env::var("RUST_LOG").is_err() { + unsafe { std::env::set_var("RUST_LOG", "info") }; + } + + tracing_subscriber::fmt::init(); + + let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into()); + let recovery_config = lapin::RecoveryConfig::default().auto_recover_channels(); + + async_global_executor::block_on(async { + let conn = Connection::connect( + &addr, + ConnectionProperties::default().with_experimental_recovery_config(recovery_config), + ) + .await + .expect("connection error"); + + info!("CONNECTED"); + + //receive channel + let channel = conn.create_channel().await.expect("create_channel"); + info!(state=?conn.status().state()); + + let queue = channel + .queue_declare( + "hello-recover", + QueueDeclareOptions::default(), + FieldTable::default(), + ) + .await + .expect("queue_declare"); + info!(state=?conn.status().state()); + info!(?queue, "Declared queue"); + + let ch = channel.clone(); + async_global_executor::spawn(async move { + loop { + async_io::Timer::after(std::time::Duration::from_secs(1)).await; + info!("Trigger failure"); + assert!( + ch.queue_declare( + "fake queue", + QueueDeclareOptions { + passive: true, + ..QueueDeclareOptions::default() + }, + FieldTable::default(), + ) + .await + .is_err() + ); + } + }) + .detach(); + + info!("will consume"); + let mut consumer = channel + .basic_consume( + "hello-recover", + "my_consumer", + BasicConsumeOptions::default(), + FieldTable::default(), + ) + .await + .expect("basic_consume"); + info!(state=?conn.status().state()); + + let mut count = 0; + while let Some(delivery) = consumer.next().await { + info!(message=?delivery, "received message"); + if let Ok(delivery) = delivery { + let data = str::from_utf8(&delivery.data).expect("invalid utf8 data"); + println!( + "{}", + data + ); + delivery + .ack(BasicAckOptions::default()) + .await + .expect("basic_ack"); + count += 1; + if data == "STOP" { + println!("Received {} msgs", count); + break; + } + } + } + }) +} diff --git a/examples/p.rs b/examples/p.rs new file mode 100644 index 00000000..5ab41846 --- /dev/null +++ b/examples/p.rs @@ -0,0 +1,111 @@ +use lapin::{BasicProperties, Connection, ConnectionProperties, options::*, types::FieldTable}; +use std::sync::{Arc, atomic::{AtomicUsize, Ordering}}; +use tracing::info; + +fn main() { + if std::env::var("RUST_LOG").is_err() { + unsafe { std::env::set_var("RUST_LOG", "info"); } + } + + tracing_subscriber::fmt::init(); + + let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into()); + let recovery_config = lapin::RecoveryConfig::default().auto_recover_channels(); + + async_global_executor::block_on(async { + let conn = Connection::connect( + &addr, + ConnectionProperties::default().with_experimental_recovery_config(recovery_config), + ) + .await + .expect("connection error"); + + info!("CONNECTED"); + + let channel1 = conn.create_channel().await.expect("create_channel"); + channel1 + .confirm_select(ConfirmSelectOptions::default()) + .await + .expect("confirm_select"); + channel1 + .queue_declare( + "hello-recover", + QueueDeclareOptions::default(), + FieldTable::default(), + ) + .await + .expect("queue_declare"); + + let count = Arc::new(AtomicUsize::new(0)); + let counter = count.clone(); + + let ch = channel1.clone(); + async_global_executor::spawn(async move { + loop { + async_io::Timer::after(std::time::Duration::from_secs(1)).await; + info!("Trigger failure"); + assert!( + ch.queue_declare( + "fake queue", + QueueDeclareOptions { + passive: true, + ..QueueDeclareOptions::default() + }, + FieldTable::default(), + ) + .await + .is_err() + ); + counter.fetch_add(1, Ordering::SeqCst); + } + }) + .detach(); + + let mut published = 0; + let mut errors = 0; + info!("will publish"); + loop { + let res = channel1 + .basic_publish( + "", + "hello-recover", + BasicPublishOptions::default(), + b"before", + BasicProperties::default(), + ) + .await; + let res = if let Ok(res) = res { + res.await.map(|_| ()) + } else { + res.map(|_| ()) + }; + match res { + Ok(()) => { + published += 1; + } + Err(err) => { + if !err.is_amqp_soft_error() { + panic!("{}", err); + } + errors += 1; + if let Some(notifier) = err.notifier() { + notifier.await + } + } + } + if count.load(Ordering::SeqCst) > 10 { + println!("Published {} with {} errors", published, errors); + channel1 + .basic_publish( + "", + "hello-recover", + BasicPublishOptions::default(), + b"STOP", + BasicProperties::default(), + ) + .await.unwrap(); + break; + } + } + }); +} diff --git a/examples/t.rs b/examples/t.rs new file mode 100644 index 00000000..74cdc6f6 --- /dev/null +++ b/examples/t.rs @@ -0,0 +1,122 @@ +use lapin::{ + BasicProperties, Connection, ConnectionProperties, message::DeliveryResult, options::*, + publisher_confirm::Confirmation, types::FieldTable, +}; +use tracing::info; + +fn main() { + if std::env::var("RUST_LOG").is_err() { + std::env::set_var("RUST_LOG", "info"); + } + + tracing_subscriber::fmt::init(); + + let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into()); + let recovery_config = lapin::RecoveryConfig::default().auto_recover_channels(); + + async_global_executor::block_on(async { + let conn = Connection::connect( + &addr, + ConnectionProperties::default().with_experimental_recovery_config(recovery_config), + ) + .await + .expect("connection error"); + + info!("CONNECTED"); + + { + let channel1 = conn.create_channel().await.expect("create_channel"); + let channel2 = conn.create_channel().await.expect("create_channel"); + channel1 + .confirm_select(ConfirmSelectOptions::default()) + .await + .expect("confirm_select"); + channel1 + .queue_declare( + "recover-test", + QueueDeclareOptions::default(), + FieldTable::default(), + ) + .await + .expect("queue_declare"); + + info!("will consume"); + let channel = channel2.clone(); + channel2 + .basic_consume( + "recover-test", + "my_consumer", + BasicConsumeOptions::default(), + FieldTable::default(), + ) + .await + .expect("basic_consume") + .set_delegate(move |delivery: DeliveryResult| { + let channel = channel.clone(); + async move { + info!(message=?delivery, "received message"); + if let Ok(Some(delivery)) = delivery { + delivery + .ack(BasicAckOptions::default()) + .await + .expect("basic_ack"); + if &delivery.data[..] == b"after" { + channel + .basic_cancel("my_consumer", BasicCancelOptions::default()) + .await + .expect("basic_cancel"); + } + } + } + }); + + info!("will publish"); + let confirm = channel1 + .basic_publish( + "", + "recover-test", + BasicPublishOptions::default(), + b"before", + BasicProperties::default(), + ) + .await + .expect("basic_publish") + .await + .expect("publisher-confirms"); + assert_eq!(confirm, Confirmation::Ack(None)); + + info!("before fail"); + assert!( + channel1 + .queue_declare( + "fake queue", + QueueDeclareOptions { + passive: true, + ..QueueDeclareOptions::default() + }, + FieldTable::default(), + ) + .await + .is_err() + ); + info!("after fail"); + + info!("publish after"); + let confirm = channel1 + .basic_publish( + "", + "recover-test", + BasicPublishOptions::default(), + b"after", + BasicProperties::default(), + ) + .await + .expect("basic_publish") + .await + .expect("publisher-confirms"); + assert_eq!(confirm, Confirmation::Ack(None)); + } + + conn.run().expect("conn.run"); + }); +}