Skip to content

Commit 153432e

Browse files
committed
error handling improvement in super_stream send
1 parent 9daed84 commit 153432e

File tree

4 files changed

+60
-6
lines changed

4 files changed

+60
-6
lines changed

src/error.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,26 @@ pub enum ProducerPublishError {
110110
#[error(transparent)]
111111
Client(#[from] ClientError),
112112
}
113+
114+
#[derive(Error, Debug)]
115+
pub enum SuperStreamProducerPublishError {
116+
#[error("Failed to send message to stream")]
117+
ProducerPublishError(),
118+
#[error("Failed to create a producer")]
119+
ProducerCreateError(),
120+
}
121+
122+
impl From<ProducerPublishError> for SuperStreamProducerPublishError {
123+
fn from(_err: ProducerPublishError) -> Self {
124+
SuperStreamProducerPublishError::ProducerPublishError()
125+
}
126+
}
127+
impl From<ProducerCreateError> for SuperStreamProducerPublishError {
128+
fn from(_err: ProducerCreateError) -> Self {
129+
SuperStreamProducerPublishError::ProducerCreateError()
130+
}
131+
}
132+
113133
#[derive(Error, Debug)]
114134
pub enum ProducerCloseError {
115135
#[error("Failed to close producer for stream {stream} status {status:?}")]

src/superstream_consumer.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ struct SuperStreamConsumerInternal {
2323
closed: Arc<AtomicBool>,
2424
handlers: Vec<ConsumerHandle>,
2525
waker: AtomicWaker,
26-
//filter_configuration: Option<FilterConfiguration>,
2726
}
2827

2928
/// Builder for [`Consumer`]
@@ -76,7 +75,6 @@ impl SuperStreamConsumerBuilder {
7675
closed: Arc::new(AtomicBool::new(false)),
7776
handlers,
7877
waker: AtomicWaker::new(),
79-
//filter_configuration: self.filter_configuration.clone(),
8078
};
8179

8280
Ok(SuperStreamConsumer {

src/superstream_producer.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::error::ProducerCloseError;
22
use crate::{
33
client::Client,
44
environment::Environment,
5-
error::{ProducerCreateError, ProducerPublishError},
5+
error::{ProducerCreateError, ProducerPublishError, SuperStreamProducerPublishError},
66
producer::{ConfirmationStatus, NoDedup, Producer},
77
superstream::{DefaultSuperStreamMetadata, RoutingStrategy},
88
};
@@ -48,7 +48,7 @@ impl SuperStreamProducer<NoDedup> {
4848
+ Sync
4949
+ 'static
5050
+ Clone,
51-
) -> Result<(), ProducerPublishError>
51+
) -> Result<(), SuperStreamProducerPublishError>
5252
where
5353
Fut: Future<Output = ()> + Send + Sync + 'static,
5454
{
@@ -61,6 +61,10 @@ impl SuperStreamProducer<NoDedup> {
6161
}
6262
};
6363

64+
if routes.is_empty() {
65+
return Err(crate::error::SuperStreamProducerPublishError::ProducerCreateError());
66+
}
67+
6468
for route in routes.into_iter() {
6569
if !self.1.contains_key(route.as_str()) {
6670
let producer = self
@@ -69,8 +73,8 @@ impl SuperStreamProducer<NoDedup> {
6973
.producer()
7074
.filter_value_extractor_arc(self.0.filter_value_extractor.clone())
7175
.build(route.as_str())
72-
.await;
73-
self.1.insert(route.clone(), producer.unwrap());
76+
.await?;
77+
self.1.insert(route.clone(), producer);
7478
}
7579

7680
let producer = self.1.get(route.as_str()).unwrap();

tests/integration/producer_test.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,6 +440,38 @@ async fn key_super_steam_producer_test() {
440440
_ = super_stream_producer.close();
441441
}
442442

443+
#[tokio::test(flavor = "multi_thread")]
444+
async fn key_super_steam_non_existing_producer_test() {
445+
let env = TestEnvironment::create_super_stream().await;
446+
447+
let mut super_stream_producer = env
448+
.env
449+
.super_stream_producer(RoutingStrategy::RoutingKeyStrategy(
450+
RoutingKeyRoutingStrategy {
451+
routing_extractor: &routing_key_strategy_value_extractor,
452+
},
453+
))
454+
.build("non-existing-stream")
455+
.await
456+
.unwrap();
457+
458+
let msg = Message::builder().body(format!("message{}", 0)).build();
459+
let result = super_stream_producer
460+
.send(msg, |_| async move {})
461+
.await
462+
.unwrap_err();
463+
464+
assert_eq!(
465+
matches!(
466+
result,
467+
rabbitmq_stream_client::error::SuperStreamProducerPublishError::ProducerCreateError()
468+
),
469+
true
470+
);
471+
472+
_ = super_stream_producer.close();
473+
}
474+
443475
#[tokio::test(flavor = "multi_thread")]
444476
async fn hash_super_steam_producer_test() {
445477
let env = TestEnvironment::create_super_stream().await;

0 commit comments

Comments
 (0)