Skip to content

Commit

Permalink
Add debug information for acknowledgment messages for qos 1 and 2
Browse files Browse the repository at this point in the history
  • Loading branch information
LukasBrand committed Aug 25, 2023
1 parent 2a26368 commit 4d3361d
Showing 8 changed files with 182 additions and 40 deletions.
15 changes: 12 additions & 3 deletions src/main/java/com/hivemq/cli/mqtt/AbstractMqttClientExecutor.java
Original file line number Diff line number Diff line change
@@ -267,10 +267,19 @@ public boolean isConnected(final @NotNull ClientKey key) {
}

private @NotNull Mqtt5Client connectMqtt5Client(
final @NotNull ConnectOptions connectOptions, final @Nullable SubscribeOptions subscribeOptions)
throws Exception {
final @NotNull ConnectOptions connectOptions,
final @Nullable SubscribeOptions subscribeOptions) throws Exception {
final MqttClientBuilder clientBuilder = createBuilder(connectOptions);
final Mqtt5Client client = clientBuilder.useMqttVersion5().build();
final Mqtt5Client client = clientBuilder.useMqttVersion5()
.advancedConfig()
.interceptors()
.incomingQos1Interceptor(new Mqtt5DebugIncomingQos1Interceptor())
.outgoingQos1Interceptor(new Mqtt5DebugOutgoingQos1Interceptor())
.incomingQos2Interceptor(new Mqtt5DebugIncomingQos2Interceptor())
.outgoingQos2Interceptor(new Mqtt5DebugOutgoingQos2Interceptor())
.applyInterceptors()
.applyAdvancedConfig()
.build();
final Mqtt5Publish willPublish = createMqtt5WillPublish(connectOptions.getWillOptions());
final Mqtt5ConnectRestrictions connectRestrictions =
createMqtt5ConnectRestrictions(connectOptions.getConnectRestrictionOptions());
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.hivemq.cli.mqtt;

import com.hivemq.cli.utils.LoggerUtils;
import com.hivemq.client.internal.mqtt.message.publish.puback.MqttPubAckBuilder;
import com.hivemq.client.mqtt.mqtt5.Mqtt5ClientConfig;
import com.hivemq.client.mqtt.mqtt5.advanced.interceptor.qos1.Mqtt5IncomingQos1Interceptor;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import com.hivemq.client.mqtt.mqtt5.message.publish.puback.Mqtt5PubAckBuilder;
import org.jetbrains.annotations.NotNull;
import org.tinylog.Logger;

public class Mqtt5DebugIncomingQos1Interceptor implements Mqtt5IncomingQos1Interceptor {

@Override
public void onPublish(
final @NotNull Mqtt5ClientConfig clientConfig,
final @NotNull Mqtt5Publish publish,
final @NotNull Mqtt5PubAckBuilder pubAckBuilder) {
final String clientPrefix = LoggerUtils.getClientPrefix(clientConfig);
Logger.debug("{} sending PUBACK\n {}", clientPrefix, ((MqttPubAckBuilder) pubAckBuilder).build());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.hivemq.cli.mqtt;

import com.hivemq.cli.utils.LoggerUtils;
import com.hivemq.client.internal.mqtt.message.publish.pubcomp.MqttPubCompBuilder;
import com.hivemq.client.internal.mqtt.message.publish.pubrec.MqttPubRecBuilder;
import com.hivemq.client.mqtt.mqtt5.Mqtt5ClientConfig;
import com.hivemq.client.mqtt.mqtt5.advanced.interceptor.qos2.Mqtt5IncomingQos2Interceptor;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import com.hivemq.client.mqtt.mqtt5.message.publish.pubcomp.Mqtt5PubCompBuilder;
import com.hivemq.client.mqtt.mqtt5.message.publish.pubrec.Mqtt5PubRecBuilder;
import com.hivemq.client.mqtt.mqtt5.message.publish.pubrel.Mqtt5PubRel;
import org.jetbrains.annotations.NotNull;
import org.tinylog.Logger;

public class Mqtt5DebugIncomingQos2Interceptor implements Mqtt5IncomingQos2Interceptor {

@Override
public void onPublish(
@NotNull final Mqtt5ClientConfig clientConfig,
@NotNull final Mqtt5Publish publish,
@NotNull final Mqtt5PubRecBuilder pubRecBuilder) {
final String clientPrefix = LoggerUtils.getClientPrefix(clientConfig);
Logger.debug("{} sending PUBREC\n {}", clientPrefix, ((MqttPubRecBuilder) pubRecBuilder).build());
}

@Override
public void onPubRel(
@NotNull final Mqtt5ClientConfig clientConfig,
@NotNull final Mqtt5PubRel pubRel,
@NotNull final Mqtt5PubCompBuilder pubCompBuilder) {
final String clientPrefix = LoggerUtils.getClientPrefix(clientConfig);
Logger.debug("{} received PUBREL\n {}", clientPrefix, pubRel);
Logger.debug("{} sending PUBCOMP\n {}", clientPrefix, ((MqttPubCompBuilder) pubCompBuilder).build());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.hivemq.cli.mqtt;

import com.hivemq.cli.utils.LoggerUtils;
import com.hivemq.client.mqtt.mqtt5.Mqtt5ClientConfig;
import com.hivemq.client.mqtt.mqtt5.advanced.interceptor.qos1.Mqtt5OutgoingQos1Interceptor;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import com.hivemq.client.mqtt.mqtt5.message.publish.puback.Mqtt5PubAck;
import org.jetbrains.annotations.NotNull;
import org.tinylog.Logger;

public class Mqtt5DebugOutgoingQos1Interceptor implements Mqtt5OutgoingQos1Interceptor {

@Override
public void onPubAck(
final @NotNull Mqtt5ClientConfig clientConfig,
final @NotNull Mqtt5Publish publish,
final @NotNull Mqtt5PubAck pubAck) {
final String clientPrefix = LoggerUtils.getClientPrefix(clientConfig);
Logger.debug("{} received PUBACK\n {}", clientPrefix, pubAck);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.hivemq.cli.mqtt;

import com.hivemq.cli.utils.LoggerUtils;
import com.hivemq.client.internal.mqtt.message.publish.pubrel.MqttPubRelBuilder;
import com.hivemq.client.mqtt.mqtt5.Mqtt5ClientConfig;
import com.hivemq.client.mqtt.mqtt5.advanced.interceptor.qos2.Mqtt5OutgoingQos2Interceptor;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import com.hivemq.client.mqtt.mqtt5.message.publish.pubcomp.Mqtt5PubComp;
import com.hivemq.client.mqtt.mqtt5.message.publish.pubrec.Mqtt5PubRec;
import com.hivemq.client.mqtt.mqtt5.message.publish.pubrel.Mqtt5PubRel;
import com.hivemq.client.mqtt.mqtt5.message.publish.pubrel.Mqtt5PubRelBuilder;
import org.jetbrains.annotations.NotNull;
import org.tinylog.Logger;

public class Mqtt5DebugOutgoingQos2Interceptor implements Mqtt5OutgoingQos2Interceptor {

@Override
public void onPubRec(
final @NotNull Mqtt5ClientConfig clientConfig,
final @NotNull Mqtt5Publish publish,
final @NotNull Mqtt5PubRec pubRec,
final @NotNull Mqtt5PubRelBuilder pubRelBuilder) {
final String clientPrefix = LoggerUtils.getClientPrefix(clientConfig);
Logger.debug("{} received PUBREC\n {}", clientPrefix, pubRec);
Logger.debug("{} sending PUBREL\n {}", clientPrefix, ((MqttPubRelBuilder) pubRelBuilder).build());
}

@Override
public void onPubRecError(
final @NotNull Mqtt5ClientConfig clientConfig,
final @NotNull Mqtt5Publish publish,
final @NotNull Mqtt5PubRec pubRec) {
final String clientPrefix = LoggerUtils.getClientPrefix(clientConfig);
Logger.debug("{} received PUBREC\n {}", clientPrefix, pubRec);
}

@Override
public void onPubComp(
final @NotNull Mqtt5ClientConfig clientConfig,
final @NotNull Mqtt5PubRel pubRel,
final @NotNull Mqtt5PubComp pubComp) {
final String clientPrefix = LoggerUtils.getClientPrefix(clientConfig);
Logger.debug("{} received PUBCOMP\n {}", clientPrefix, pubComp);
}
}
34 changes: 17 additions & 17 deletions src/main/java/com/hivemq/cli/mqtt/MqttClientExecutor.java
Original file line number Diff line number Diff line change
@@ -68,7 +68,7 @@ void mqtt5Connect(
final @NotNull Mqtt5Client client, final @NotNull Mqtt5Connect connectMessage) {
final String clientLogPrefix = LoggerUtils.getClientPrefix(client.getConfig());

Logger.debug("{} sending CONNECT {}", clientLogPrefix, connectMessage);
Logger.debug("{} sending CONNECT\n {}", clientLogPrefix, connectMessage);

final Mqtt5ConnAck connAck;
try {
@@ -78,14 +78,14 @@ void mqtt5Connect(
throw mqtt5ConnAckException;
}

Logger.debug("{} received CONNACK {} ", clientLogPrefix, connAck);
Logger.debug("{} received CONNACK\n {} ", clientLogPrefix, connAck);
}

void mqtt3Connect(
final @NotNull Mqtt3Client client, final @NotNull Mqtt3Connect connectMessage) {
final String clientLogPrefix = LoggerUtils.getClientPrefix(client.getConfig());

Logger.debug("{} sending CONNECT {}", clientLogPrefix, connectMessage);
Logger.debug("{} sending CONNECT\n {}", clientLogPrefix, connectMessage);

final Mqtt3ConnAck connAck;
try {
@@ -94,7 +94,7 @@ void mqtt3Connect(
Logger.debug(mqtt3ConnAckException.getMqttMessage());
throw mqtt3ConnAckException;
}
Logger.debug("{} received CONNACK {} ", clientLogPrefix, connAck);
Logger.debug("{} received CONNACK\n {} ", clientLogPrefix, connAck);
}

void mqtt5Subscribe(
@@ -112,7 +112,7 @@ void mqtt5Subscribe(

final Mqtt5Subscribe subscribeMessage = builder.build();

Logger.debug("{} sending SUBSCRIBE {}", clientLogPrefix, subscribeMessage);
Logger.debug("{} sending SUBSCRIBE\n {}", clientLogPrefix, subscribeMessage);

client.toAsync()
.subscribe(subscribeMessage, new SubscribeMqtt5PublishCallback(subscribeOptions, client))
@@ -129,7 +129,7 @@ void mqtt5Subscribe(
} else {
final ClientKey clientKey = ClientKey.of(client);
getClientDataMap().get(clientKey).addSubscription(MqttTopicFilter.of(topic));
Logger.debug("{} received SUBACK {}", clientLogPrefix, subAck);
Logger.debug("{} received SUBACK\n {}", clientLogPrefix, subAck);
}
})
.join();
@@ -144,7 +144,7 @@ void mqtt3Subscribe(
final Mqtt3SubscribeBuilder.Start.Complete builder = Mqtt3Subscribe.builder().topicFilter(topic).qos(qos);
final Mqtt3Subscribe subscribeMessage = builder.build();

Logger.debug("{} sending SUBSCRIBE {}", clientLogPrefix, subscribeMessage);
Logger.debug("{} sending SUBSCRIBE\n {}", clientLogPrefix, subscribeMessage);

client.toAsync()
.subscribe(subscribeMessage, new SubscribeMqtt3PublishCallback(subscribeOptions, client))
@@ -161,7 +161,7 @@ void mqtt3Subscribe(
} else {
getClientDataMap().get(ClientKey.of(client)).addSubscription(MqttTopicFilter.of(topic));

Logger.debug("{} received SUBACK {}", clientLogPrefix, subAck);
Logger.debug("{} received SUBACK\n {}", clientLogPrefix, subAck);
}
})
.join();
@@ -198,7 +198,7 @@ void mqtt5Publish(

final Mqtt5Publish publishMessage = publishBuilder.build();

Logger.debug("{} sending PUBLISH ('{}') {}",
Logger.debug("{} sending PUBLISH ('{}')\n {}",
clientLogPrefix,
bufferToString(publishOptions.getMessage()),
publishMessage);
@@ -216,7 +216,7 @@ void mqtt5Publish(
}
Logger.trace(throwable);
} else {
Logger.debug("{} received PUBLISH acknowledgement {}", clientLogPrefix, publishResult);
Logger.debug("{} finish PUBLISH\n {}", clientLogPrefix, publishResult);
}
}).join();
}
@@ -238,7 +238,7 @@ void mqtt3Publish(

final Mqtt3Publish publishMessage = publishBuilder.build();

Logger.debug("{} sending PUBLISH ('{}') {}",
Logger.debug("{} sending PUBLISH ('{}')\n {}",
clientLogPrefix,
bufferToString(publishOptions.getMessage()),
publishMessage);
@@ -251,7 +251,7 @@ void mqtt3Publish(
Throwables.getRootCause(throwable).getMessage());
Logger.trace(throwable);
} else {
Logger.debug("{} received PUBLISH acknowledgement {}", clientLogPrefix, publishResult);
Logger.debug("{} finish PUBLISH\n {}", clientLogPrefix, publishResult);
}
}).join();
}
@@ -266,7 +266,7 @@ void mqtt5Unsubscribe(final @NotNull Mqtt5Client client, final @NotNull Unsubscr
.userProperties(unsubscribeOptions.getUserProperties())
.build();

Logger.debug("{} sending UNSUBSCRIBE {}", clientLogPrefix, unsubscribeMessage);
Logger.debug("{} sending UNSUBSCRIBE\n {}", clientLogPrefix, unsubscribeMessage);

client.toAsync().unsubscribe(unsubscribeMessage).whenComplete((unsubAck, throwable) -> {
if (throwable != null) {
@@ -281,7 +281,7 @@ void mqtt5Unsubscribe(final @NotNull Mqtt5Client client, final @NotNull Unsubscr
} else {
getClientDataMap().get(ClientKey.of(client)).removeSubscription(MqttTopicFilter.of(topic));

Logger.debug("{} received UNSUBACK {}", clientLogPrefix, unsubAck);
Logger.debug("{} received UNSUBACK\n {}", clientLogPrefix, unsubAck);
}
}).join();
}
@@ -294,7 +294,7 @@ void mqtt3Unsubscribe(final @NotNull Mqtt3Client client, final @NotNull Unsubscr
for (final String topic : unsubscribeOptions.getTopics()) {
final Mqtt3Unsubscribe unsubscribeMessage = Mqtt3Unsubscribe.builder().topicFilter(topic).build();

Logger.debug("{} sending UNSUBSCRIBE {}", clientLogPrefix, unsubscribeMessage);
Logger.debug("{} sending UNSUBSCRIBE\n {}", clientLogPrefix, unsubscribeMessage);

client.toAsync().unsubscribe(unsubscribeMessage).whenComplete((unsubAck, throwable) -> {
if (throwable != null) {
@@ -305,7 +305,7 @@ void mqtt3Unsubscribe(final @NotNull Mqtt3Client client, final @NotNull Unsubscr
Logger.trace(throwable);
} else {
getClientDataMap().get(ClientKey.of(client)).removeSubscription(MqttTopicFilter.of(topic));
Logger.debug("{} received UNSUBACK", clientLogPrefix);
Logger.debug("{} received UNSUBACK\n {}", clientLogPrefix, unsubAck);
}
}).join();
}
@@ -331,7 +331,7 @@ void mqtt5Disconnect(final @NotNull Mqtt5Client client, final @NotNull Disconnec

final Mqtt5Disconnect disconnectMessage = disconnectBuilder.build();

Logger.debug("{} sending DISCONNECT {}", clientLogPrefix, disconnectMessage);
Logger.debug("{} sending DISCONNECT\n {}", clientLogPrefix, disconnectMessage);

client.toBlocking().disconnect(disconnectMessage);
}
Original file line number Diff line number Diff line change
@@ -50,9 +50,8 @@ public class SubscribeMqtt3PublishCallback implements Consumer<Mqtt3Publish> {

@Override
public void accept(final @NotNull Mqtt3Publish mqtt3Publish) {
String message;
try {
String message;

if (isJsonOutput) {
message = new JsonMqttPublish(mqtt3Publish, isBase64).toString();
} else {
@@ -63,19 +62,25 @@ public void accept(final @NotNull Mqtt3Publish mqtt3Publish) {
message = mqtt3Publish.getTopic() + ": " + message;
}

if (outputFile != null) {
MqttPublishUtils.printToFile(outputFile, message);
}
if (printToStdout) {
System.out.println(message);
}

Logger.debug("{} received PUBLISH ('{}') {}",
Logger.debug("{} received PUBLISH ('{}')\n {}",
LoggerUtils.getClientPrefix(client.getConfig()),
new String(mqtt3Publish.getPayloadAsBytes(), StandardCharsets.UTF_8),
mqtt3Publish);
} catch (final Exception e) {
Logger.error("An error occurred while processing an incoming PUBLISH.", e);
return;
}

if (outputFile != null) {
MqttPublishUtils.printToFile(outputFile, message);
}

if (printToStdout) {
if (System.out.checkError()) {
//TODO: Handle SIGPIPE
//throw new RuntimeException("SIGNAL RECEIVED. PIPE CLOSED");
}
System.out.println(message);
}
}
}
Original file line number Diff line number Diff line change
@@ -50,9 +50,8 @@ public class SubscribeMqtt5PublishCallback implements Consumer<Mqtt5Publish> {

@Override
public void accept(final @NotNull Mqtt5Publish mqtt5Publish) {
String message;
try {
String message;

if (isJsonOutput) {
message = new JsonMqttPublish(mqtt5Publish, isBase64).toString();
} else {
@@ -63,19 +62,25 @@ public void accept(final @NotNull Mqtt5Publish mqtt5Publish) {
message = mqtt5Publish.getTopic() + ": " + message;
}

if (outputFile != null) {
MqttPublishUtils.printToFile(outputFile, message);
}
if (printToStdout) {
System.out.println(message);
}

Logger.debug("{} received PUBLISH ('{}') {}",
Logger.debug("{} received PUBLISH ('{}')\n {}",
LoggerUtils.getClientPrefix(client.getConfig()),
new String(mqtt5Publish.getPayloadAsBytes(), StandardCharsets.UTF_8),
mqtt5Publish);
} catch (final Exception e) {
Logger.error("An error occurred while processing an incoming PUBLISH.", e);
return;
}

if (outputFile != null) {
MqttPublishUtils.printToFile(outputFile, message);
}

if (printToStdout) {
if (System.out.checkError()) {
//TODO: Handle SIGPIPE
//throw new RuntimeException("SIGNAL RECEIVED. PIPE CLOSED");
}
System.out.println(message);
}
}
}

0 comments on commit 4d3361d

Please sign in to comment.