diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java index 71c172b633713..46adc45e2ea4d 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java @@ -31,6 +31,7 @@ import org.apache.pulsar.client.api.AuthenticationDataProvider; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; @@ -111,6 +112,9 @@ public class CmdConsume extends AbstractCmdConsume { @Option(names = {"-rs", "--replicated" }, description = "Whether the subscription status should be replicated") private boolean replicateSubscriptionState = false; + @Option(names = { "-ca", "--crypto-failure-action" }, description = "Crypto Failure Action") + private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL; + public CmdConsume() { // Do nothing super(); @@ -174,6 +178,7 @@ private int consume(String topic) { } builder.autoAckOldestChunkedMessageOnQueueFull(this.autoAckOldestChunkedMessageOnQueueFull); + builder.cryptoFailureAction(cryptoFailureAction); if (isNotBlank(this.encKeyValue)) { builder.defaultCryptoKeyReader(this.encKeyValue); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java index daab436499219..51bf2d6898b6b 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java @@ -30,6 +30,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.pulsar.client.api.AuthenticationDataProvider; +import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; @@ -101,6 +102,9 @@ public class CmdRead extends AbstractCmdConsume { @Option(names = { "-pm", "--pool-messages" }, description = "Use the pooled message", arity = "1") private boolean poolMessages = true; + @Option(names = { "-ca", "--crypto-failure-action" }, description = "Crypto Failure Action") + private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL; + public CmdRead() { // Do nothing super(); @@ -153,6 +157,7 @@ private int read(String topic) { } builder.autoAckOldestChunkedMessageOnQueueFull(this.autoAckOldestChunkedMessageOnQueueFull); + builder.cryptoFailureAction(cryptoFailureAction); if (isNotBlank(this.encKeyValue)) { builder.defaultCryptoKeyReader(this.encKeyValue);