Skip to content

Commit

Permalink
[fix][cli] Fix Pulsar-Client CLI to print metadata of message includi…
Browse files Browse the repository at this point in the history
…ng encryption metadata (apache#23347)
  • Loading branch information
rdhabalia authored Sep 25, 2024
1 parent 31f27a0 commit b1c5d96
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
Expand All @@ -40,7 +41,9 @@
import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session;
Expand Down Expand Up @@ -87,7 +90,8 @@ public void updateConfig(ClientBuilder clientBuilder, Authentication authenticat
* Whether to display BytesMessages in hexdump style, ignored for simple text messages
* @return String representation of the message
*/
protected String interpretMessage(Message<?> message, boolean displayHex) throws IOException {
protected String interpretMessage(Message<?> message, boolean displayHex, boolean printMetadata)
throws IOException {
StringBuilder sb = new StringBuilder();

String properties = Arrays.toString(message.getProperties().entrySet().toArray());
Expand Down Expand Up @@ -122,6 +126,45 @@ protected String interpretMessage(Message<?> message, boolean displayHex) throws
}
sb.append("content:").append(data);

if (printMetadata) {
if (message.getEncryptionCtx().isPresent()) {
EncryptionContext encContext = message.getEncryptionCtx().get();
if (encContext.getKeys() != null && !encContext.getKeys().isEmpty()) {
sb.append(", ");
sb.append("encryption-keys:").append(", ");
encContext.getKeys().forEach((keyName, keyInfo) -> {
String metadata = Arrays.toString(keyInfo.getMetadata().entrySet().toArray());
sb.append("name:").append(keyName).append(", ").append("key-value:")
.append(Base64.getEncoder().encode(keyInfo.getKeyValue())).append(", ")
.append("metadata:").append(metadata).append(", ");

});
sb.append(", ").append("param:").append(Base64.getEncoder().encode(encContext.getParam()))
.append(", ").append("algorithm:").append(encContext.getAlgorithm()).append(", ")
.append("compression-type:").append(encContext.getCompressionType()).append(", ")
.append("uncompressed-size").append(encContext.getUncompressedMessageSize()).append(", ")
.append("batch-size")
.append(encContext.getBatchSize().isPresent() ? encContext.getBatchSize().get() : 1);
}
}
if (message.hasBrokerPublishTime()) {
sb.append(", ").append("publish-time:").append(DateFormatter.format(message.getPublishTime()));
}
sb.append(", ").append("event-time:").append(DateFormatter.format(message.getEventTime()));
sb.append(", ").append("message-id:").append(message.getMessageId());
sb.append(", ").append("producer-name:").append(message.getProducerName());
sb.append(", ").append("sequence-id:").append(message.getSequenceId());
sb.append(", ").append("replicated-from:").append(message.getReplicatedFrom());
sb.append(", ").append("redelivery-count:").append(message.getRedeliveryCount());
sb.append(", ").append("ordering-key:")
.append(message.getOrderingKey() != null ? new String(message.getOrderingKey()) : "");
sb.append(", ").append("schema-version:")
.append(message.getSchemaVersion() != null ? new String(message.getSchemaVersion()) : "");
if (message.hasIndex()) {
sb.append(", ").append("index:").append(message.getIndex());
}
}

return sb.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ public class CmdConsume extends AbstractCmdConsume {
@Option(names = { "-ca", "--crypto-failure-action" }, description = "Crypto Failure Action")
private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL;

@Option(names = { "-mp", "--print-metadata" }, description = "Message metadata")
private boolean printMetadata = false;

public CmdConsume() {
// Do nothing
super();
Expand Down Expand Up @@ -199,7 +202,7 @@ private int consume(String topic) {
numMessagesConsumed += 1;
if (!hideContent) {
System.out.println(MESSAGE_BOUNDARY);
String output = this.interpretMessage(msg, displayHex);
String output = this.interpretMessage(msg, displayHex, printMetadata);
System.out.println(output);
} else if (numMessagesConsumed % 1000 == 0) {
System.out.println("Received " + numMessagesConsumed + " messages");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ public class CmdRead extends AbstractCmdConsume {
@Option(names = { "-ca", "--crypto-failure-action" }, description = "Crypto Failure Action")
private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL;

@Option(names = { "-mp", "--print-metadata" }, description = "Message metadata")
private boolean printMetadata = false;

public CmdRead() {
// Do nothing
super();
Expand Down Expand Up @@ -178,7 +181,7 @@ private int read(String topic) {
numMessagesRead += 1;
if (!hideContent) {
System.out.println(MESSAGE_BOUNDARY);
String output = this.interpretMessage(msg, displayHex);
String output = this.interpretMessage(msg, displayHex, printMetadata);
System.out.println(output);
} else if (numMessagesRead % 1000 == 0) {
System.out.println("Received " + numMessagesRead + " messages");
Expand Down

0 comments on commit b1c5d96

Please sign in to comment.