Skip to content

Commit

Permalink
fix: Enabled --file and --key-separator to be used together, fix …
Browse files Browse the repository at this point in the history
…`--key` handling when producing lines (#3092)

closes #3076

The issue also mentions it should be added to the CI. I'm looking into doing that.

DISCLAIMER: I didn't test this yet

Co-authored-by: Özgür Akkurt <[email protected]>
  • Loading branch information
ozgrakkurt and ozgrakkurt committed Mar 25, 2023
1 parent 23c353c commit 6263076
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 39 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -977,11 +977,15 @@ jobs:
- name: set DEV fluvio for as test CLI
if: matrix.cli_version == 'dev'
run: |
FLUVIO_CLI_RELEASE_CHANNEL=dev
echo "FLUVIO_CLI_RELEASE_CHANNEL=${FLUVIO_CLI_RELEASE_CHANNEL}" >> $GITHUB_ENV
FLUVIO_BIN=~/.fluvio/bin/fluvio-dev
echo "FLUVIO_BIN=${FLUVIO_BIN}" >> $GITHUB_ENV
- name: set stable fluvio for as test CLI
if: matrix.cli_version == 'stable'
run: |
FLUVIO_CLI_RELEASE_CHANNEL=stable
echo "FLUVIO_CLI_RELEASE_CHANNEL=${FLUVIO_CLI_RELEASE_CHANNEL}" >> $GITHUB_ENV
FLUVIO_BIN=~/.fluvio/bin/fluvio
echo "FLUVIO_BIN=${FLUVIO_BIN}" >> $GITHUB_ENV
Expand Down
87 changes: 48 additions & 39 deletions crates/fluvio-cli/src/client/produce/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ mod cmd {

/// Sends key/value records split on the first instance of the separator.
#[cfg(feature = "producer-file-io")]
#[clap(long, value_parser = validate_key_separator, group = "RecordKey", conflicts_with = "TestFile")]
#[clap(long, value_parser = validate_key_separator, group = "RecordKey", conflicts_with = "raw")]
pub key_separator: Option<String>,
#[cfg(not(feature = "producer-file-io"))]
#[clap(long, value_parser = validate_key_separator, group = "RecordKey")]
Expand Down Expand Up @@ -249,44 +249,7 @@ mod cmd {

#[cfg(feature = "producer-file-io")]
if self.raw {
let key = self.key.clone().map(Bytes::from);
// Read all input and send as one record
let buffer = match &self.file {
Some(path) => UserInputRecords::try_from(UserInputType::File {
key: key.clone(),
path: path.to_path_buf(),
})
.unwrap_or_default(),

None => {
let mut buffer = Vec::new();
std::io::Read::read_to_end(&mut std::io::stdin(), &mut buffer)?;
UserInputRecords::try_from(UserInputType::Text {
key: key.clone(),
data: Bytes::from(buffer),
})
.unwrap_or_default()
}
};

let key = if let Some(key) = buffer.key() {
RecordKey::from(key)
} else {
RecordKey::NULL
};

let data: RecordData = buffer.into();

let produce_output = producer.send(key, data).await?;

if self.delivery_semantic != DeliverySemantic::AtMostOnce {
produce_output.wait().await?;
}

#[cfg(feature = "stats")]
if self.is_stats_collect() && self.is_print_live_stats() {
self.update_stats_bar(maybe_stats_bar.as_ref(), &producer, "");
}
self.process_raw_file(&producer).await?;
} else {
// Read input line-by-line and send as individual records
#[cfg(feature = "stats")]
Expand Down Expand Up @@ -321,6 +284,50 @@ mod cmd {
}

impl ProduceOpt {
#[cfg(feature = "producer-file-io")]
async fn process_raw_file(&self, producer: &TopicProducer) -> Result<()> {
let key = self.key.clone().map(Bytes::from);
// Read all input and send as one record
let buffer = match &self.file {
Some(path) => UserInputRecords::try_from(UserInputType::File {
key: key.clone(),
path: path.to_path_buf(),
})
.unwrap_or_default(),

None => {
let mut buffer = Vec::new();
std::io::Read::read_to_end(&mut std::io::stdin(), &mut buffer)?;
UserInputRecords::try_from(UserInputType::Text {
key: key.clone(),
data: Bytes::from(buffer),
})
.unwrap_or_default()
}
};

let key = if let Some(key) = buffer.key() {
RecordKey::from(key)
} else {
RecordKey::NULL
};

let data: RecordData = buffer.into();

let produce_output = producer.send(key, data).await?;

if self.delivery_semantic != DeliverySemantic::AtMostOnce {
produce_output.wait().await?;
}

#[cfg(feature = "stats")]
if self.is_stats_collect() && self.is_print_live_stats() {
self.update_stats_bar(maybe_stats_bar.as_ref(), &producer, "");
}

Ok(())
}

async fn produce_lines(
&self,
producer: Arc<TopicProducer>,
Expand Down Expand Up @@ -446,6 +453,8 @@ mod cmd {
let produce_output = if let Some(separator) = &self.key_separator {
self.produce_key_value(producer.clone(), line, separator)
.await?
} else if let Some(key) = &self.key {
Some(producer.send(RecordKey::from(key.as_bytes()), line).await?)
} else {
Some(producer.send(RecordKey::NULL, line).await?)
};
Expand Down
78 changes: 78 additions & 0 deletions tests/cli/fluvio_smoke_tests/e2e-file-with-separator.bats
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
#!/usr/bin/env bats

TEST_HELPER_DIR="$BATS_TEST_DIRNAME/../test_helper"
export TEST_HELPER_DIR

load "$TEST_HELPER_DIR"/tools_check.bash
load "$TEST_HELPER_DIR"/fluvio_dev.bash
load "$TEST_HELPER_DIR"/bats-support/load.bash
load "$TEST_HELPER_DIR"/bats-assert/load.bash

setup_file() {
TOPIC_NAME=$(random_string)
export TOPIC_NAME
debug_msg "Topic name: $TOPIC_NAME"

KEY1=$(random_string)
export KEY1
KEY2=$(random_string)
export KEY2
KEY3=$(random_string)
export KEY3
VAL1=$(random_string)
export VAL1
VAL2=$(random_string)
export VAL2
VAL3=$(random_string)
export VAL3

SEPARATOR='||'
export SEPARATOR

MULTI_LINE_FILE_CONTENTS=$KEY1$SEPARATOR$VAL1$'\n'$KEY2$SEPARATOR$VAL2$'\n'$KEY3$SEPARATOR$VAL3
export MULTI_LINE_FILE_CONTENTS

MULTI_LINE_FILE_NAME=$(random_string)
export MULTI_LINE_FILE_NAME

run bash -c 'echo "$MULTI_LINE_FILE_CONTENTS" > "$MULTI_LINE_FILE_NAME"'
}

teardown_file() {
run timeout 15s "$FLUVIO_BIN" topic delete "$TOPIC_NAME"
run rm $MULTI_LINE_FILE_NAME
}

# Create topic
@test "Create a topic for file message with separator" {
if [ "$FLUVIO_CLI_RELEASE_CHANNEL" == "stable" ]; then
skip "don't run on stable version"
fi

debug_msg "topic: $TOPIC_NAME"
run timeout 15s "$FLUVIO_BIN" topic create "$TOPIC_NAME"
assert_success
}

# Produce message
@test "Produce file message with separator" {
if [ "$FLUVIO_CLI_RELEASE_CHANNEL" == "stable" ]; then
skip "don't run on stable version"
fi

run bash -c 'timeout 15s "$FLUVIO_BIN" produce --file "$MULTI_LINE_FILE_NAME" --key-separator "$SEPARATOR" "$TOPIC_NAME"'
assert_success
}

# Consume message and compare message
# Warning: Adding anything extra to the `debug_msg` skews the message comparison
@test "Consume file message with separator" {
if [ "$FLUVIO_CLI_RELEASE_CHANNEL" == "stable" ]; then
skip "don't run on stable version"
fi

run timeout 15s "$FLUVIO_BIN" consume "$TOPIC_NAME" -B -d -F '{{key}}={{value}}'

assert_output $KEY1=$VAL1$'\n'$KEY2=$VAL2$'\n'$KEY3=$VAL3
assert_success
}

0 comments on commit 6263076

Please sign in to comment.