diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ddf2e4d263..b5130a3db9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -977,11 +977,15 @@ jobs: - name: set DEV fluvio for as test CLI if: matrix.cli_version == 'dev' run: | + FLUVIO_CLI_RELEASE_CHANNEL=dev + echo "FLUVIO_CLI_RELEASE_CHANNEL=${FLUVIO_CLI_RELEASE_CHANNEL}" >> $GITHUB_ENV FLUVIO_BIN=~/.fluvio/bin/fluvio-dev echo "FLUVIO_BIN=${FLUVIO_BIN}" >> $GITHUB_ENV - name: set stable fluvio for as test CLI if: matrix.cli_version == 'stable' run: | + FLUVIO_CLI_RELEASE_CHANNEL=stable + echo "FLUVIO_CLI_RELEASE_CHANNEL=${FLUVIO_CLI_RELEASE_CHANNEL}" >> $GITHUB_ENV FLUVIO_BIN=~/.fluvio/bin/fluvio echo "FLUVIO_BIN=${FLUVIO_BIN}" >> $GITHUB_ENV diff --git a/crates/fluvio-cli/src/client/produce/mod.rs b/crates/fluvio-cli/src/client/produce/mod.rs index 9031146590..65ae5bf6d3 100644 --- a/crates/fluvio-cli/src/client/produce/mod.rs +++ b/crates/fluvio-cli/src/client/produce/mod.rs @@ -74,7 +74,7 @@ mod cmd { /// Sends key/value records split on the first instance of the separator. #[cfg(feature = "producer-file-io")] - #[clap(long, value_parser = validate_key_separator, group = "RecordKey", conflicts_with = "TestFile")] + #[clap(long, value_parser = validate_key_separator, group = "RecordKey", conflicts_with = "raw")] pub key_separator: Option, #[cfg(not(feature = "producer-file-io"))] #[clap(long, value_parser = validate_key_separator, group = "RecordKey")] @@ -249,44 +249,7 @@ mod cmd { #[cfg(feature = "producer-file-io")] if self.raw { - let key = self.key.clone().map(Bytes::from); - // Read all input and send as one record - let buffer = match &self.file { - Some(path) => UserInputRecords::try_from(UserInputType::File { - key: key.clone(), - path: path.to_path_buf(), - }) - .unwrap_or_default(), - - None => { - let mut buffer = Vec::new(); - std::io::Read::read_to_end(&mut std::io::stdin(), &mut buffer)?; - UserInputRecords::try_from(UserInputType::Text { - key: key.clone(), - data: Bytes::from(buffer), - }) - .unwrap_or_default() - } - }; - - let key = if let Some(key) = buffer.key() { - RecordKey::from(key) - } else { - RecordKey::NULL - }; - - let data: RecordData = buffer.into(); - - let produce_output = producer.send(key, data).await?; - - if self.delivery_semantic != DeliverySemantic::AtMostOnce { - produce_output.wait().await?; - } - - #[cfg(feature = "stats")] - if self.is_stats_collect() && self.is_print_live_stats() { - self.update_stats_bar(maybe_stats_bar.as_ref(), &producer, ""); - } + self.process_raw_file(&producer).await?; } else { // Read input line-by-line and send as individual records #[cfg(feature = "stats")] @@ -321,6 +284,50 @@ mod cmd { } impl ProduceOpt { + #[cfg(feature = "producer-file-io")] + async fn process_raw_file(&self, producer: &TopicProducer) -> Result<()> { + let key = self.key.clone().map(Bytes::from); + // Read all input and send as one record + let buffer = match &self.file { + Some(path) => UserInputRecords::try_from(UserInputType::File { + key: key.clone(), + path: path.to_path_buf(), + }) + .unwrap_or_default(), + + None => { + let mut buffer = Vec::new(); + std::io::Read::read_to_end(&mut std::io::stdin(), &mut buffer)?; + UserInputRecords::try_from(UserInputType::Text { + key: key.clone(), + data: Bytes::from(buffer), + }) + .unwrap_or_default() + } + }; + + let key = if let Some(key) = buffer.key() { + RecordKey::from(key) + } else { + RecordKey::NULL + }; + + let data: RecordData = buffer.into(); + + let produce_output = producer.send(key, data).await?; + + if self.delivery_semantic != DeliverySemantic::AtMostOnce { + produce_output.wait().await?; + } + + #[cfg(feature = "stats")] + if self.is_stats_collect() && self.is_print_live_stats() { + self.update_stats_bar(maybe_stats_bar.as_ref(), &producer, ""); + } + + Ok(()) + } + async fn produce_lines( &self, producer: Arc, @@ -446,6 +453,8 @@ mod cmd { let produce_output = if let Some(separator) = &self.key_separator { self.produce_key_value(producer.clone(), line, separator) .await? + } else if let Some(key) = &self.key { + Some(producer.send(RecordKey::from(key.as_bytes()), line).await?) } else { Some(producer.send(RecordKey::NULL, line).await?) }; diff --git a/tests/cli/fluvio_smoke_tests/e2e-file-with-separator.bats b/tests/cli/fluvio_smoke_tests/e2e-file-with-separator.bats new file mode 100644 index 0000000000..b10ab63499 --- /dev/null +++ b/tests/cli/fluvio_smoke_tests/e2e-file-with-separator.bats @@ -0,0 +1,78 @@ +#!/usr/bin/env bats + +TEST_HELPER_DIR="$BATS_TEST_DIRNAME/../test_helper" +export TEST_HELPER_DIR + +load "$TEST_HELPER_DIR"/tools_check.bash +load "$TEST_HELPER_DIR"/fluvio_dev.bash +load "$TEST_HELPER_DIR"/bats-support/load.bash +load "$TEST_HELPER_DIR"/bats-assert/load.bash + +setup_file() { + TOPIC_NAME=$(random_string) + export TOPIC_NAME + debug_msg "Topic name: $TOPIC_NAME" + + KEY1=$(random_string) + export KEY1 + KEY2=$(random_string) + export KEY2 + KEY3=$(random_string) + export KEY3 + VAL1=$(random_string) + export VAL1 + VAL2=$(random_string) + export VAL2 + VAL3=$(random_string) + export VAL3 + + SEPARATOR='||' + export SEPARATOR + + MULTI_LINE_FILE_CONTENTS=$KEY1$SEPARATOR$VAL1$'\n'$KEY2$SEPARATOR$VAL2$'\n'$KEY3$SEPARATOR$VAL3 + export MULTI_LINE_FILE_CONTENTS + + MULTI_LINE_FILE_NAME=$(random_string) + export MULTI_LINE_FILE_NAME + + run bash -c 'echo "$MULTI_LINE_FILE_CONTENTS" > "$MULTI_LINE_FILE_NAME"' +} + +teardown_file() { + run timeout 15s "$FLUVIO_BIN" topic delete "$TOPIC_NAME" + run rm $MULTI_LINE_FILE_NAME +} + +# Create topic +@test "Create a topic for file message with separator" { + if [ "$FLUVIO_CLI_RELEASE_CHANNEL" == "stable" ]; then + skip "don't run on stable version" + fi + + debug_msg "topic: $TOPIC_NAME" + run timeout 15s "$FLUVIO_BIN" topic create "$TOPIC_NAME" + assert_success +} + +# Produce message +@test "Produce file message with separator" { + if [ "$FLUVIO_CLI_RELEASE_CHANNEL" == "stable" ]; then + skip "don't run on stable version" + fi + + run bash -c 'timeout 15s "$FLUVIO_BIN" produce --file "$MULTI_LINE_FILE_NAME" --key-separator "$SEPARATOR" "$TOPIC_NAME"' + assert_success +} + +# Consume message and compare message +# Warning: Adding anything extra to the `debug_msg` skews the message comparison +@test "Consume file message with separator" { + if [ "$FLUVIO_CLI_RELEASE_CHANNEL" == "stable" ]; then + skip "don't run on stable version" + fi + + run timeout 15s "$FLUVIO_BIN" consume "$TOPIC_NAME" -B -d -F '{{key}}={{value}}' + + assert_output $KEY1=$VAL1$'\n'$KEY2=$VAL2$'\n'$KEY3=$VAL3 + assert_success +}