diff --git a/crates/fluvio-cli/src/client/produce/mod.rs b/crates/fluvio-cli/src/client/produce/mod.rs index 8107497a92..bf42c0c678 100644 --- a/crates/fluvio-cli/src/client/produce/mod.rs +++ b/crates/fluvio-cli/src/client/produce/mod.rs @@ -427,7 +427,9 @@ mod cmd { let produce_output = self.produce_line(&producer, &line).await?; if let Some(produce_output) = produce_output { - produce_outputs.push(produce_output); + if self.delivery_semantic != DeliverySemantic::AtMostOnce { + produce_outputs.push(produce_output); + } } #[cfg(feature = "stats")] @@ -477,13 +479,14 @@ mod cmd { eprint!("> "); } + let mut produce_outputs = vec![]; + while let Some(Ok(line)) = lines.next() { let produce_output = self.produce_line(producer, &line).await?; if let Some(produce_output) = produce_output { if self.delivery_semantic != DeliverySemantic::AtMostOnce { - // ensure it was properly sent - produce_output.wait().await?; + produce_outputs.push(produce_output); } } @@ -511,6 +514,17 @@ mod cmd { eprint!("> "); } } + if self.delivery_semantic != DeliverySemantic::AtMostOnce { + // ensure all records were properly sent + join_all( + produce_outputs + .into_iter() + .map(|produce_output| produce_output.wait()), + ) + .await + .into_iter() + .collect::, _>>()?; + } Ok(()) }