diff --git a/hivemq-edge/src/main/java/com/hivemq/bootstrap/ClientConnection.java b/hivemq-edge/src/main/java/com/hivemq/bootstrap/ClientConnection.java index 02c01049ce..0246edc904 100644 --- a/hivemq-edge/src/main/java/com/hivemq/bootstrap/ClientConnection.java +++ b/hivemq-edge/src/main/java/com/hivemq/bootstrap/ClientConnection.java @@ -15,11 +15,8 @@ */ package com.hivemq.bootstrap; -import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.SettableFuture; import com.hivemq.configuration.service.entity.Listener; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; import com.hivemq.extension.sdk.api.client.parameter.ClientInformation; import com.hivemq.extension.sdk.api.client.parameter.ConnectionInformation; import com.hivemq.extension.sdk.api.packets.auth.ModifiableDefaultPermissions; @@ -37,6 +34,8 @@ import com.hivemq.security.auth.SslClientCertificate; import io.netty.channel.Channel; import io.netty.util.AttributeKey; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -47,21 +46,23 @@ import java.util.Optional; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; public class ClientConnection { /** * The name of the {@link Channel} attribute which the client connection information is stored in. */ - public static final AttributeKey CHANNEL_ATTRIBUTE_NAME = + public static final @NotNull AttributeKey CHANNEL_ATTRIBUTE_NAME = AttributeKey.valueOf("Client.Connection"); private final @NotNull Channel channel; private final @NotNull PublishFlushHandler publishFlushHandler; - private volatile @NotNull ClientState clientState = ClientState.CONNECTING; + private final @NotNull FreePacketIdRanges messageIDPool; + private final @NotNull AtomicReference clientState; + private final @NotNull HashMap additionalInformation; private @Nullable ProtocolVersion protocolVersion; private @Nullable String clientId; - private boolean cleanStart; private @Nullable ModifiableDefaultPermissions authPermissions; private @Nullable Listener connectedListener; private @Nullable MqttWillPublish willPublish; @@ -72,22 +73,19 @@ public class ClientConnection { private @Nullable Long clientSessionExpiryInterval; private @Nullable Long connectReceivedTimestamp; private @Nullable Long maxPacketSizeSend; - private @Nullable String[] topicAliasMapping; + private @Nullable String @Nullable [] topicAliasMapping; + private @Nullable Boolean requestProblemInformation; + private @Nullable SettableFuture disconnectFuture; + private @Nullable ConnectionAttributes connectionAttributes; private boolean noSharedSubscription; private boolean clientIdAssigned; private boolean incomingPublishesSkipRest; private boolean incomingPublishesDefaultFailedSkipRest; private boolean requestResponseInformation; - private @Nullable Boolean requestProblemInformation; - private @Nullable SettableFuture disconnectFuture; - private final @NotNull FreePacketIdRanges messageIDPool; - - private @Nullable ConnectionAttributes connectionAttributes; - - private boolean sendWill = true; + private boolean cleanStart; + private boolean sendWill; private boolean preventLwt; private boolean inFlightMessagesSent; - private @Nullable SslClientCertificate authCertificate; private @Nullable String authSniHostname; private @Nullable String authCipherSuite; @@ -100,20 +98,24 @@ public class ClientConnection { private @Nullable Mqtt5UserProperties authUserProperties; private @Nullable ScheduledFuture authFuture; private @Nullable Boolean clearPasswordAfterAuth; - private @Nullable ClientContextImpl extensionClientContext; private @Nullable ClientEventListeners extensionClientEventListeners; private @Nullable ClientAuthenticators extensionClientAuthenticators; private @Nullable ClientAuthorizers extensionClientAuthorizers; private @Nullable ClientInformation extensionClientInformation; private @Nullable ConnectionInformation extensionConnectionInformation; - private @NotNull HashMap additionalInformation; public ClientConnection(final @NotNull Channel channel, final @NotNull PublishFlushHandler publishFlushHandler) { this.channel = channel; this.publishFlushHandler = publishFlushHandler; - messageIDPool = new FreePacketIdRanges(); + this.messageIDPool = new FreePacketIdRanges(); this.additionalInformation = new HashMap<>(); + this.clientState = new AtomicReference<>(ClientState.CONNECTING); + this.sendWill = true; + } + + public static @NotNull ClientConnection fromChannel(final @NotNull Channel channel) { + return channel.attr(ClientConnection.CHANNEL_ATTRIBUTE_NAME).get(); } public @NotNull Channel getChannel() { @@ -125,20 +127,11 @@ public ClientConnection(final @NotNull Channel channel, final @NotNull PublishFl } public @NotNull ClientState getClientState() { - return clientState; + return clientState.get(); } - public void proposeClientState(final @NotNull ClientState clientState) { - if (!this.clientState.disconnected()) { - this.clientState = clientState; - } - } - - // ONLY VISIBLE FOR TESTING !!! - // DO NOT USE IN PROD !!! - @VisibleForTesting() - public void setClientStateUnsafe(final @NotNull ClientState clientState) { - this.clientState = clientState; + public void proposeClientState(final @NotNull ClientState proposed) { + clientState.updateAndGet(current -> current.disconnected() ? current : proposed); } public @Nullable ProtocolVersion getProtocolVersion() { @@ -255,7 +248,7 @@ public int incrementInFlightCount() { return inFlightMessageCount.incrementAndGet(); } - public int incrementInFlightCount(int count) { + public int incrementInFlightCount(final int count) { if (inFlightMessageCount == null) { inFlightMessageCount = new AtomicInteger(0); } @@ -292,11 +285,11 @@ public void setMaxPacketSizeSend(final @Nullable Long maxPacketSizeSend) { this.maxPacketSizeSend = maxPacketSizeSend; } - public @Nullable String[] getTopicAliasMapping() { + public @Nullable String @Nullable [] getTopicAliasMapping() { return topicAliasMapping; } - public void setTopicAliasMapping(final @Nullable String[] topicAliasMapping) { + public void setTopicAliasMapping(final @Nullable String @Nullable [] topicAliasMapping) { this.topicAliasMapping = topicAliasMapping; } @@ -592,8 +585,8 @@ public void setClearPasswordAfterAuth(final @Nullable Boolean clearPasswordAfter return Optional.ofNullable(clearPasswordAfterAuth); } - public void clearPassword(){ - if(authPassword == null) { + public void clearPassword() { + if (authPassword == null) { return; } Arrays.fill(authPassword, (byte) 0); @@ -603,8 +596,4 @@ public void clearPassword(){ public @NotNull HashMap getAdditionalInformation() { return additionalInformation; } - - public static @NotNull ClientConnection fromChannel(Channel channel) { - return channel.attr(ClientConnection.CHANNEL_ATTRIBUTE_NAME).get(); - } } diff --git a/hivemq-edge/src/main/java/com/hivemq/bootstrap/ClientState.java b/hivemq-edge/src/main/java/com/hivemq/bootstrap/ClientState.java index 4291e4a4d0..b0978999d8 100644 --- a/hivemq-edge/src/main/java/com/hivemq/bootstrap/ClientState.java +++ b/hivemq-edge/src/main/java/com/hivemq/bootstrap/ClientState.java @@ -19,9 +19,6 @@ import java.util.EnumSet; -/** - * @author Abdullah Imal - */ public enum ClientState { CONNECTING, diff --git a/hivemq-edge/src/main/java/com/hivemq/codec/decoder/MQTTMessageDecoder.java b/hivemq-edge/src/main/java/com/hivemq/codec/decoder/MQTTMessageDecoder.java index e8ee579d9e..6fa8ecd6d9 100644 --- a/hivemq-edge/src/main/java/com/hivemq/codec/decoder/MQTTMessageDecoder.java +++ b/hivemq-edge/src/main/java/com/hivemq/codec/decoder/MQTTMessageDecoder.java @@ -20,8 +20,6 @@ import com.hivemq.codec.decoder.mqtt.MqttConnectDecoder; import com.hivemq.codec.decoder.mqtt.MqttDecoders; import com.hivemq.configuration.service.MqttConfigurationService; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; import com.hivemq.metrics.handler.GlobalMQTTMessageCounter; import com.hivemq.mqtt.handler.connack.MqttConnacker; import com.hivemq.mqtt.handler.disconnect.MqttServerDisconnector; @@ -35,27 +33,27 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import java.util.List; import static com.hivemq.mqtt.message.MessageType.CONNECT; -/** - * @author Dominik Obermaier - */ public class MQTTMessageDecoder extends ByteToMessageDecoder { private static final int MAX_REMAINING_LENGTH_MULTIPLIER = 0x80 * 0x80 * 0x80; private static final int NOT_ENOUGH_BYTES_READABLE = -2; private static final int MALFORMED_REMAINING_LENGTH = -1; private static final int MIN_FIXED_HEADER_LENGTH = 2; + private static final int MIN_CONNECT_VAR_HEADER_LENGTH = 7; private final @NotNull MqttConnectDecoder connectDecoder; private final @NotNull MqttConnacker mqttConnacker; - private final @NotNull MqttConfigurationService mqttConfig; private final @NotNull MqttDecoders mqttDecoders; private final @NotNull MqttServerDisconnector mqttServerDisconnector; private final @NotNull GlobalMQTTMessageCounter globalMQTTMessageCounter; + private final @NotNull MqttConfigurationService mqttConfig; public MQTTMessageDecoder( final @NotNull MqttConnectDecoder connectDecoder, @@ -66,10 +64,10 @@ public MQTTMessageDecoder( final @NotNull GlobalMQTTMessageCounter globalMQTTMessageCounter) { this.connectDecoder = connectDecoder; this.mqttConnacker = mqttConnacker; - this.mqttConfig = mqttConfig; this.mqttDecoders = mqttDecoders; this.mqttServerDisconnector = mqttServerDisconnector; this.globalMQTTMessageCounter = globalMQTTMessageCounter; + this.mqttConfig = mqttConfig; } public MQTTMessageDecoder(final ChannelDependencies channelDependencies) { @@ -81,9 +79,59 @@ public MQTTMessageDecoder(final ChannelDependencies channelDependencies) { channelDependencies.getGlobalMQTTMessageCounter()); } + private static int getFixedHeaderSize(final int remainingLength) { + // 2 = 1 byte fixed header + 1 byte first byte of remaining length + int remainingLengthSize = 2; + + if (remainingLength > 127) { + remainingLengthSize++; + } + if (remainingLength > 16383) { + remainingLengthSize++; + } + if (remainingLength > 2097151) { + remainingLengthSize++; + } + return remainingLengthSize; + } + + /** + * Calculates the remaining length according to the MQTT spec. + * + * @param buf the message buffer + * @return the remaining length, -1 if the remaining length is malformed or -2 if not enough bytes are available + */ + private static int calculateRemainingLength(final @NotNull ByteBuf buf) { + int remainingLength = 0; + int multiplier = 1; + byte encodedByte; + do { + if (multiplier > MAX_REMAINING_LENGTH_MULTIPLIER) { + buf.skipBytes(buf.readableBytes()); + //This means the remaining length is malformed! + return MALFORMED_REMAINING_LENGTH; + } + if (!buf.isReadable()) { + return NOT_ENOUGH_BYTES_READABLE; + } + + encodedByte = buf.readByte(); + remainingLength += ((encodedByte & (byte) 0x7f) * multiplier); + multiplier *= 0x80; + } while ((encodedByte & 0x80) != 0); + + return remainingLength; + } + + private static @NotNull MessageType getMessageType(final byte fixedHeader) { + return MessageType.valueOf((fixedHeader & 0b1111_0000) >> 4); + } + @Override protected void decode( - final @NotNull ChannelHandlerContext ctx, final @NotNull ByteBuf buf, final @NotNull List out) { + final @NotNull ChannelHandlerContext ctx, + final @NotNull ByteBuf buf, + final @NotNull List out) { final int readableBytes = buf.readableBytes(); if (readableBytes < MIN_FIXED_HEADER_LENGTH) { @@ -91,11 +139,8 @@ protected void decode( } buf.markReaderIndex(); - final byte fixedHeader = buf.readByte(); - final int remainingLength = calculateRemainingLength(buf); - if (remainingLength == NOT_ENOUGH_BYTES_READABLE) { buf.resetReaderIndex(); return; @@ -116,21 +161,28 @@ protected void decode( return; } + final int fixedHeaderSize = getFixedHeaderSize(remainingLength); + final int packetSize = fixedHeaderSize + remainingLength; + + final MessageType messageType = getMessageType(fixedHeader); + + //this is the message size HiveMQ allows for incoming messages + + if (packetSize > mqttConfig.maxPacketSize()) { + handlePacketTooLarge(buf, messageType, clientConnection); + return; + } + if (buf.readableBytes() < remainingLength) { buf.resetReaderIndex(); return; } - final int fixedHeaderSize = getFixedHeaderSize(remainingLength); - final int packetSize = fixedHeaderSize + remainingLength; - - final MessageType messageType = getMessageType(fixedHeader); final Message message; if (messageType == CONNECT) { message = handleConnect(buf, clientConnection, fixedHeader, packetSize, remainingLength); } else { - message = - handleMessage(buf, clientConnection, fixedHeader, messageType, packetSize, remainingLength); + message = handleMessage(buf, clientConnection, fixedHeader, messageType, packetSize, remainingLength); } if (message == null) { buf.clear(); @@ -140,6 +192,43 @@ protected void decode( out.add(message); } + private void handlePacketTooLarge( + final @NotNull ByteBuf buf, + final @NotNull MessageType messageType, + final @NotNull ClientConnection clientConnectionContext) { + //connack with PACKET_TOO_LARGE for Mqtt5 + if (messageType == MessageType.CONNECT) { + // Theoretically, remaining length could be too short to read the protocol version. + // But in this case we can never reach a "packet too large" as the minimum configurable max packet size is 15 + // while the broker needs only 7 bytes to read the version. Therefore, broker ignores this case and continues reading until + // it has at least 7 bytes of the variable CONNECT header to determine the protocol version, + // see https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901035:~:text=identify%20MQTT%20traffic.-,3.1.2.2%20Protocol%20Version,-Figure%203%E2%80%913 + if (buf.readableBytes() < MIN_CONNECT_VAR_HEADER_LENGTH) { + buf.resetReaderIndex(); + return; + } + connectDecoder.decodeProtocolVersion(clientConnectionContext, buf); + mqttConnacker.connackError(clientConnectionContext.getChannel(), + "A client (ID: {}, IP: {}) connect packet exceeded the maximum permissible size.", + "Sent CONNECT exceeded the maximum permissible size", + Mqtt5ConnAckReasonCode.PACKET_TOO_LARGE, + ReasonStrings.CONNACK_PACKET_TOO_LARGE); + } else { + final ProtocolVersion protocolVersion = clientConnectionContext.getProtocolVersion(); + //force channel close for Mqtt3.1, Mqtt3.1.1 and null (before connect) + final boolean forceClose = protocolVersion != ProtocolVersion.MQTTv5; + mqttServerDisconnector.disconnect(clientConnectionContext.getChannel(), + "A client (ID: {}, IP: {}) sent a message, that was bigger than the maximum message size. Disconnecting client.", + "Sent a message that was bigger than the maximum size", + Mqtt5DisconnectReasonCode.PACKET_TOO_LARGE, + ReasonStrings.DISCONNECT_PACKET_TOO_LARGE_MESSAGE, + Mqtt5UserProperties.NO_USER_PROPERTIES, + false, + forceClose); + } + buf.clear(); + } + private @Nullable Message handleConnect( final @NotNull ByteBuf buf, final @NotNull ClientConnection clientConnection, @@ -147,18 +236,6 @@ protected void decode( final int packetSize, final int remainingLength) { - //this is the message size HiveMQ allows for incoming messages - if (packetSize > mqttConfig.maxPacketSize()) { - connectDecoder.decodeProtocolVersion(clientConnection, buf); - mqttConnacker.connackError(clientConnection.getChannel(), - "A client (IP: {}) connect packet exceeded the maximum permissible size.", - "Sent CONNECT exceeded the maximum permissible size", - Mqtt5ConnAckReasonCode.PACKET_TOO_LARGE, - ReasonStrings.CONNACK_PACKET_TOO_LARGE); - - return null; - } - // Check if the client is already connected if (clientConnection.getProtocolVersion() != null) { mqttServerDisconnector.disconnect(clientConnection.getChannel(), @@ -194,22 +271,6 @@ protected void decode( final ProtocolVersion protocolVersion = clientConnection.getProtocolVersion(); - //this is the message size HiveMQ allows for incoming messages - if (packetSize > mqttConfig.maxPacketSize()) { - - //force channel close for Mqtt3.1, Mqtt3.1.1 and null (before connect) - final boolean forceClose = protocolVersion != ProtocolVersion.MQTTv5; - mqttServerDisconnector.disconnect(clientConnection.getChannel(), - "A client (IP: {}) sent a message, that was bigger than the maximum message size. Disconnecting client.", - "Sent a message that was bigger than the maximum size", - Mqtt5DisconnectReasonCode.PACKET_TOO_LARGE, - ReasonStrings.DISCONNECT_PACKET_TOO_LARGE_MESSAGE, - Mqtt5UserProperties.NO_USER_PROPERTIES, - false, - forceClose); - return null; - } - // Check if client is connected if (protocolVersion == null) { mqttServerDisconnector.logAndClose(clientConnection.getChannel(), @@ -228,113 +289,45 @@ protected void decode( final MqttDecoder decoder = mqttDecoders.decoder(messageType, protocolVersion); if (decoder != null) { return decoder.decode(clientConnection, messageBuffer, fixedHeader); - } else { - switch (messageType) { - case RESERVED_ZERO: - mqttServerDisconnector.disconnect(clientConnection.getChannel(), - "A client (IP: {}) sent a message with an invalid message type '0'. This message type is reserved. Disconnecting client.", - "Sent a message with message type '0'", - Mqtt5DisconnectReasonCode.PROTOCOL_ERROR, - ReasonStrings.DISCONNECT_MESSAGE_TYPE_ZERO); - return null; - case CONNACK: - mqttServerDisconnector.disconnect(clientConnection.getChannel(), - "A client (IP: {}) sent a CONNACK message. This is invalid because clients are not allowed to send CONNACKs. Disconnecting client.", - "Sent a CONNACK message", - Mqtt5DisconnectReasonCode.PROTOCOL_ERROR, - ReasonStrings.DISCONNECT_CONNACK_RECEIVED); - return null; - case SUBACK: - mqttServerDisconnector.disconnect(clientConnection.getChannel(), - "A client (IP: {}) sent a SUBACK message. This is invalid because clients are not allowed to send SUBACKs. Disconnecting client.", - "Sent a SUBACK message", - Mqtt5DisconnectReasonCode.PROTOCOL_ERROR, - ReasonStrings.DISCONNECT_SUBACK_RECEIVED); - return null; - case UNSUBACK: - mqttServerDisconnector.disconnect(clientConnection.getChannel(), - "A client (IP: {}) sent a UNSUBACK message. This is invalid because clients are not allowed to send UNSUBACKs. Disconnecting client.", - "Sent a UNSUBACK message", - Mqtt5DisconnectReasonCode.PROTOCOL_ERROR, - ReasonStrings.DISCONNECT_UNSUBACK_RECEIVED); - return null; - case PINGRESP: - mqttServerDisconnector.disconnect(clientConnection.getChannel(), - "A client (IP: {}) sent a PINGRESP message. This is invalid because clients are not allowed to send PINGRESPs. Disconnecting client.", - "Sent a PINGRESP message", - Mqtt5DisconnectReasonCode.PROTOCOL_ERROR, - ReasonStrings.DISCONNECT_PINGRESP_RECEIVED); - return null; - case AUTH: - mqttServerDisconnector.disconnect(clientConnection.getChannel(), - "A client (IP: {}) sent a message with an invalid message type '15'. This message type is reserved. Disconnecting client.", - "Sent a message with message type '15'", - Mqtt5DisconnectReasonCode.PROTOCOL_ERROR, - ReasonStrings.DISCONNECT_MESSAGE_TYPE_FIFTEEN); - return null; - default: - mqttServerDisconnector.disconnect(clientConnection.getChannel(), - "A client (IP: {}) connected but the message type could not get determined. Disconnecting client.", - "Sent a message with invalid message type", - Mqtt5DisconnectReasonCode.PROTOCOL_ERROR, - ReasonStrings.DISCONNECT_MESSAGE_TYPE_INVALID); - return null; - } } - } - - private static int getFixedHeaderSize(final int remainingLength) { - - // 2 = 1 byte fixed header + 1 byte first byte of remaining length - int remainingLengthSize = 2; - if (remainingLength > 127) { - remainingLengthSize++; - } - if (remainingLength > 16383) { - remainingLengthSize++; - } - if (remainingLength > 2097151) { - remainingLengthSize++; + switch (messageType) { + case RESERVED_ZERO -> mqttServerDisconnector.disconnect(clientConnection.getChannel(), + "A client (IP: {}) sent a message with an invalid message type '0'. This message type is reserved. Disconnecting client.", + "Sent a message with message type '0'", + Mqtt5DisconnectReasonCode.PROTOCOL_ERROR, + ReasonStrings.DISCONNECT_MESSAGE_TYPE_ZERO); + case CONNACK -> mqttServerDisconnector.disconnect(clientConnection.getChannel(), + "A client (IP: {}) sent a CONNACK message. This is invalid because clients are not allowed to send CONNACKs. Disconnecting client.", + "Sent a CONNACK message", + Mqtt5DisconnectReasonCode.PROTOCOL_ERROR, + ReasonStrings.DISCONNECT_CONNACK_RECEIVED); + case SUBACK -> mqttServerDisconnector.disconnect(clientConnection.getChannel(), + "A client (IP: {}) sent a SUBACK message. This is invalid because clients are not allowed to send SUBACKs. Disconnecting client.", + "Sent a SUBACK message", + Mqtt5DisconnectReasonCode.PROTOCOL_ERROR, + ReasonStrings.DISCONNECT_SUBACK_RECEIVED); + case UNSUBACK -> mqttServerDisconnector.disconnect(clientConnection.getChannel(), + "A client (IP: {}) sent a UNSUBACK message. This is invalid because clients are not allowed to send UNSUBACKs. Disconnecting client.", + "Sent a UNSUBACK message", + Mqtt5DisconnectReasonCode.PROTOCOL_ERROR, + ReasonStrings.DISCONNECT_UNSUBACK_RECEIVED); + case PINGRESP -> mqttServerDisconnector.disconnect(clientConnection.getChannel(), + "A client (IP: {}) sent a PINGRESP message. This is invalid because clients are not allowed to send PINGRESPs. Disconnecting client.", + "Sent a PINGRESP message", + Mqtt5DisconnectReasonCode.PROTOCOL_ERROR, + ReasonStrings.DISCONNECT_PINGRESP_RECEIVED); + case AUTH -> mqttServerDisconnector.disconnect(clientConnection.getChannel(), + "A client (IP: {}) sent a message with an invalid message type '15'. This message type is reserved. Disconnecting client.", + "Sent a message with message type '15'", + Mqtt5DisconnectReasonCode.PROTOCOL_ERROR, + ReasonStrings.DISCONNECT_MESSAGE_TYPE_FIFTEEN); + default -> mqttServerDisconnector.disconnect(clientConnection.getChannel(), + "A client (IP: {}) connected but the message type could not get determined. Disconnecting client.", + "Sent a message with invalid message type", + Mqtt5DisconnectReasonCode.PROTOCOL_ERROR, + ReasonStrings.DISCONNECT_MESSAGE_TYPE_INVALID); } - return remainingLengthSize; - } - - /** - * Calculates the remaining length according to the MQTT spec. - * - * @param buf the message buffer - * @return the remaining length, -1 if the remaining length is malformed or -2 if not enough bytes are available - */ - private static int calculateRemainingLength(final @NotNull ByteBuf buf) { - - int remainingLength = 0; - int multiplier = 1; - byte encodedByte; - - do { - if (multiplier > MAX_REMAINING_LENGTH_MULTIPLIER) { - buf.skipBytes(buf.readableBytes()); - //This means the remaining length is malformed! - return MALFORMED_REMAINING_LENGTH; - } - - if (!buf.isReadable()) { - return NOT_ENOUGH_BYTES_READABLE; - } - - encodedByte = buf.readByte(); - - remainingLength += ((encodedByte & (byte) 0x7f) * multiplier); - multiplier *= 0x80; - - } while ((encodedByte & 0x80) != 0); - - return remainingLength; - } - - - private static @NotNull MessageType getMessageType(final byte fixedHeader) { - return MessageType.valueOf((fixedHeader & 0b1111_0000) >> 4); + return null; } } diff --git a/hivemq-edge/src/main/java/com/hivemq/configuration/entity/mqtt/PacketsConfigEntity.java b/hivemq-edge/src/main/java/com/hivemq/configuration/entity/mqtt/PacketsConfigEntity.java index 67f91ceead..db0213d864 100644 --- a/hivemq-edge/src/main/java/com/hivemq/configuration/entity/mqtt/PacketsConfigEntity.java +++ b/hivemq-edge/src/main/java/com/hivemq/configuration/entity/mqtt/PacketsConfigEntity.java @@ -19,15 +19,10 @@ import jakarta.xml.bind.annotation.XmlAccessorType; import jakarta.xml.bind.annotation.XmlElement; import jakarta.xml.bind.annotation.XmlRootElement; - -import java.util.Objects; +import org.jetbrains.annotations.Nullable; import static com.hivemq.mqtt.message.connack.Mqtt5CONNACK.DEFAULT_MAXIMUM_PACKET_SIZE_NO_LIMIT; -/** - * @author Florian Limpöck - * @since 4.0.0 - */ @XmlRootElement(name = "packets") @XmlAccessorType(XmlAccessType.NONE) @SuppressWarnings({"FieldMayBeFinal", "FieldCanBeLocal"}) @@ -41,15 +36,15 @@ public int getMaxPacketSize() { } @Override - public boolean equals(final Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - final PacketsConfigEntity that = (PacketsConfigEntity) o; - return getMaxPacketSize() == that.getMaxPacketSize(); + public boolean equals(final @Nullable Object o) { + if (this == o) { + return true; + } + return o instanceof final PacketsConfigEntity that && maxPacketSize == that.maxPacketSize; } @Override public int hashCode() { - return Objects.hashCode(getMaxPacketSize()); + return Integer.hashCode(maxPacketSize); } } diff --git a/hivemq-edge/src/test/java/com/hivemq/bootstrap/netty/initializer/TlsTcpChannelInitializerTest.java b/hivemq-edge/src/test/java/com/hivemq/bootstrap/netty/initializer/TlsTcpChannelInitializerTest.java index e256b4a559..af55f9a045 100644 --- a/hivemq-edge/src/test/java/com/hivemq/bootstrap/netty/initializer/TlsTcpChannelInitializerTest.java +++ b/hivemq-edge/src/test/java/com/hivemq/bootstrap/netty/initializer/TlsTcpChannelInitializerTest.java @@ -25,6 +25,7 @@ import com.hivemq.logging.EventLog; import com.hivemq.mqtt.handler.disconnect.MqttServerDisconnector; import com.hivemq.mqtt.handler.disconnect.MqttServerDisconnectorImpl; +import com.hivemq.configuration.service.impl.MqttConfigurationServiceImpl; import com.hivemq.security.ssl.SslFactory; import io.netty.channel.Channel; import io.netty.channel.ChannelPipeline; @@ -100,6 +101,7 @@ public void before() throws Exception { when(socketChannel.attr(any(AttributeKey.class))).thenReturn(attribute); when(socketChannel.isActive()).thenReturn(true); when(channelDependencies.getConfigurationService()).thenReturn(fullConfigurationService); + when(fullConfigurationService.mqttConfiguration()).thenReturn(new MqttConfigurationServiceImpl()); when(channelDependencies.getRestrictionsConfigurationService()).thenReturn(restrictionsConfigurationService); when(restrictionsConfigurationService.incomingLimit()).thenReturn(0L); diff --git a/hivemq-edge/src/test/java/com/hivemq/bootstrap/netty/initializer/TlsWebsocketChannelInitializerTest.java b/hivemq-edge/src/test/java/com/hivemq/bootstrap/netty/initializer/TlsWebsocketChannelInitializerTest.java index c3751545bd..bdd573736b 100644 --- a/hivemq-edge/src/test/java/com/hivemq/bootstrap/netty/initializer/TlsWebsocketChannelInitializerTest.java +++ b/hivemq-edge/src/test/java/com/hivemq/bootstrap/netty/initializer/TlsWebsocketChannelInitializerTest.java @@ -25,6 +25,7 @@ import com.hivemq.logging.EventLog; import com.hivemq.mqtt.handler.disconnect.MqttServerDisconnector; import com.hivemq.mqtt.handler.disconnect.MqttServerDisconnectorImpl; +import com.hivemq.configuration.service.impl.MqttConfigurationServiceImpl; import com.hivemq.security.ssl.SslFactory; import io.netty.channel.Channel; import io.netty.channel.ChannelPipeline; @@ -109,6 +110,7 @@ public void before() { when(sslFactory.getSslContext(any(Tls.class))).thenReturn(sslContext); when(sslFactory.getSslHandler(any(SocketChannel.class), any(Tls.class), any(SslContext.class))).thenReturn(sslHandler); when(channelDependencies.getConfigurationService()).thenReturn(fullConfigurationService); + when(fullConfigurationService.mqttConfiguration()).thenReturn(new MqttConfigurationServiceImpl()); when(mockListener.getTls()).thenReturn(tls); when(channelDependencies.getConfigurationService()).thenReturn(fullConfigurationService); when(channelDependencies.getRestrictionsConfigurationService()).thenReturn(restrictionsConfigurationService); diff --git a/hivemq-edge/src/test/java/com/hivemq/codec/decoder/MQTTMessageDecoderTest.java b/hivemq-edge/src/test/java/com/hivemq/codec/decoder/MQTTMessageDecoderTest.java index a4cce2ad97..10ad17ab60 100644 --- a/hivemq-edge/src/test/java/com/hivemq/codec/decoder/MQTTMessageDecoderTest.java +++ b/hivemq-edge/src/test/java/com/hivemq/codec/decoder/MQTTMessageDecoderTest.java @@ -17,7 +17,6 @@ import com.hivemq.bootstrap.ClientConnection; import com.hivemq.configuration.service.ConfigurationService; -import org.jetbrains.annotations.NotNull; import com.hivemq.mqtt.message.ProtocolVersion; import com.hivemq.mqtt.message.connack.CONNACK; import com.hivemq.mqtt.message.reason.Mqtt5ConnAckReasonCode; @@ -25,13 +24,17 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.embedded.EmbeddedChannel; +import org.jetbrains.annotations.NotNull; import org.junit.Before; import org.junit.Test; -import org.mockito.MockitoAnnotations; import util.TestConfigurationBootstrap; import util.TestMqttDecoder; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; public class MQTTMessageDecoderTest { @@ -40,7 +43,6 @@ public class MQTTMessageDecoderTest { @Before public void setUp() throws Exception { - MockitoAnnotations.initMocks(this); channel = new EmbeddedChannel(TestMqttDecoder.create()); clientConnection = new ClientConnection(channel, null); //setting version to fake "connected" state @@ -159,8 +161,7 @@ public void decode_whenReceivesSecondCONNECT_thenConnectionIsClosed() { 0, // payload // client identifier - 0, 4, 't', 'e', 's', 't' - }; + 0, 4, 't', 'e', 's', 't'}; final ByteBuf buf = Unpooled.buffer(); buf.writeBytes(connect); @@ -253,12 +254,12 @@ public void decode_whenReceives31CONNECTTooLarge_thenConnectionIsClosedAndCONNAC // type, reserved 0b0001_0000, // remaining length - 17, + 19, // variable header // protocol name - 0, 6, 'M', 'Q', 'T', 'T', + 0, 6, 'M', 'Q', 'I', 's', 'd', 'p', // protocol version - 3, 1, + 4, // connect flags (byte) 0b0000_0000, // keep alive @@ -295,15 +296,212 @@ public void decode_whenReceivesMqtt31PUBLISHTooLarge_thenConnectionIsClosed() { testPublishPacketSizeTooLarge(ProtocolVersion.MQTTv3_1); } - private void testPublishPacketSizeTooLarge(final @NotNull ProtocolVersion protocolVersion) { + @Test + public void decode_whenReceivesPartialMqtt5CONNECTTooLarge_nextMessageIsReadThenConnectionIsClosedAndCONNACKIsReceived() { + final ConfigurationService fullConfig = new TestConfigurationBootstrap().getConfigurationService(); + fullConfig.mqttConfiguration().setMaxPacketSize(15); + channel = new EmbeddedChannel(TestMqttDecoder.create(fullConfig)); + clientConnection = new ClientConnection(channel, null); + clientConnection.setProtocolVersion(null); + channel.attr(ClientConnection.CHANNEL_ATTRIBUTE_NAME).set(clientConnection); + + final ByteBuf buf1 = Unpooled.buffer(); + final ByteBuf buf2 = Unpooled.buffer(); + final byte[] mqtt5ConnectPart1 = { + // fixed header + // type, reserved + 0b0001_0000, + // remaining length + 15}; + final byte[] mqtt5ConnectPart2 = { + // variable header + // protocol name + 0, 4, 'M', 'Q', 'T', 'T', + // protocol version + 5}; + buf1.writeBytes(mqtt5ConnectPart1); + buf2.writeBytes(mqtt5ConnectPart2); + channel.writeInbound(buf1); + CONNACK connack = channel.readOutbound(); + assertNull(connack); + + channel.writeInbound(buf2); + connack = channel.readOutbound(); + assertNotNull(connack); + + //verify that the client was disconnected and it received the proper CONNACK + assertFalse(channel.isOpen()); + assertEquals(Mqtt5ConnAckReasonCode.PACKET_TOO_LARGE, connack.getReasonCode()); + assertEquals(ReasonStrings.CONNACK_PACKET_TOO_LARGE, connack.getReasonString()); + } + + @Test + public void decode_whenReceivesPartialMqtt31CONNECTTooLarge_nextMessageIsReadThenConnectionIsClosedAndCONNACKIsReceived() { + final ConfigurationService fullConfig = new TestConfigurationBootstrap().getConfigurationService(); + fullConfig.mqttConfiguration().setMaxPacketSize(15); + channel = new EmbeddedChannel(TestMqttDecoder.create(fullConfig)); + clientConnection = new ClientConnection(channel, null); + clientConnection.setProtocolVersion(null); + channel.attr(ClientConnection.CHANNEL_ATTRIBUTE_NAME).set(clientConnection); + + final ByteBuf buf1 = Unpooled.buffer(); + final ByteBuf buf2 = Unpooled.buffer(); + final byte[] mqtt31ConnectPart1 = { + // fixed header + // type, reserved + 0b0001_0000, + // remaining length + 15}; + final byte[] mqtt31ConnectPart2 = { + // variable header + // protocol name + 0, 6, 'M', 'Q', 'I', 's', 'd', 'p', + // protocol version + 4}; + buf1.writeBytes(mqtt31ConnectPart1); + buf2.writeBytes(mqtt31ConnectPart2); + channel.writeInbound(buf1); + CONNACK connack = channel.readOutbound(); + assertNull(connack); + + channel.writeInbound(buf2); + connack = channel.readOutbound(); + assertNotNull(connack); + + //verify that the client was disconnected and it received the proper CONNACK + assertFalse(channel.isOpen()); + assertEquals(Mqtt5ConnAckReasonCode.NOT_AUTHORIZED, connack.getReasonCode()); + } + + @Test + public void decode_whenReceivesPartialMqtt311CONNECTTooLarge_nextMessageIsReadThenConnectionIsClosedAndCONNACKIsReceived() { + final ConfigurationService fullConfig = new TestConfigurationBootstrap().getConfigurationService(); + fullConfig.mqttConfiguration().setMaxPacketSize(15); + channel = new EmbeddedChannel(TestMqttDecoder.create(fullConfig)); + clientConnection = new ClientConnection(channel, null); + clientConnection.setProtocolVersion(null); + channel.attr(ClientConnection.CHANNEL_ATTRIBUTE_NAME).set(clientConnection); + + final ByteBuf buf1 = Unpooled.buffer(); + final ByteBuf buf2 = Unpooled.buffer(); + final byte[] mqtt311ConnectPart1 = { + // fixed header + // type, reserved + 0b0001_0000, + // remaining length + 15}; + final byte[] mqtt311ConnectPart2 = { + // variable header + // protocol name + 0, 4, 'M', 'Q', 'T', 'T', + // protocol version + 4}; + buf1.writeBytes(mqtt311ConnectPart1); + buf2.writeBytes(mqtt311ConnectPart2); + channel.writeInbound(buf1); + CONNACK connack = channel.readOutbound(); + assertNull(connack); + + channel.writeInbound(buf2); + connack = channel.readOutbound(); + assertNotNull(connack); + + //verify that the client was disconnected and it received the proper CONNACK + assertFalse(channel.isOpen()); + assertEquals(Mqtt5ConnAckReasonCode.NOT_AUTHORIZED, connack.getReasonCode()); + } + + @Test + public void decode_whenReceivesMinimumPUBLISHTooLarge_thenConnectionIsClosed_3_1() { + final ProtocolVersion protocolVersion = ProtocolVersion.MQTTv3_1; + + final ConfigurationService fullConfig = new TestConfigurationBootstrap().getConfigurationService(); + fullConfig.mqttConfiguration().setMaxPacketSize(15); + channel = new EmbeddedChannel(TestMqttDecoder.create(fullConfig)); + clientConnection = new ClientConnection(channel, null); + //setting version to fake "connected" state + clientConnection.setProtocolVersion(protocolVersion); + channel.attr(ClientConnection.CHANNEL_ATTRIBUTE_NAME).set(clientConnection); + + final byte[] publish = { + // fixed header + // type, flags + 0b0011_0000, + // remaining length + 22}; + + final ByteBuf buf = Unpooled.buffer(); + buf.writeBytes(publish); + channel.writeInbound(buf); + + //verify that the client was disconnected + assertFalse(channel.isOpen()); + } + + @Test + public void decode_whenReceivesMinimumPUBLISHTooLarge_thenConnectionIsClosed_3_1_1() { + final ProtocolVersion protocolVersion = ProtocolVersion.MQTTv3_1_1; + final ConfigurationService fullConfig = new TestConfigurationBootstrap().getConfigurationService(); - fullConfig.mqttConfiguration().setMaxPacketSize(10); + fullConfig.mqttConfiguration().setMaxPacketSize(15); channel = new EmbeddedChannel(TestMqttDecoder.create(fullConfig)); clientConnection = new ClientConnection(channel, null); //setting version to fake "connected" state clientConnection.setProtocolVersion(protocolVersion); channel.attr(ClientConnection.CHANNEL_ATTRIBUTE_NAME).set(clientConnection); + final byte[] publish = { + // fixed header + // type, flags + 0b0011_0000, + // remaining length + 22}; + + final ByteBuf buf = Unpooled.buffer(); + buf.writeBytes(publish); + channel.writeInbound(buf); + + //verify that the client was disconnected + assertFalse(channel.isOpen()); + } + + @Test + public void decode_whenReceivesMinimumPUBLISHTooLarge_thenConnectionIsClosed_5() { + final ProtocolVersion protocolVersion = ProtocolVersion.MQTTv5; + + final ConfigurationService fullConfig = new TestConfigurationBootstrap().getConfigurationService(); + fullConfig.mqttConfiguration().setMaxPacketSize(15); + channel = new EmbeddedChannel(TestMqttDecoder.create(fullConfig)); + clientConnection = new ClientConnection(channel, null); + //setting version to fake "connected" state + clientConnection.setProtocolVersion(protocolVersion); + channel.attr(ClientConnection.CHANNEL_ATTRIBUTE_NAME).set(clientConnection); + + final byte[] publish = { + // fixed header + // type, flags + 0b0011_0000, + // remaining length + 22}; + + final ByteBuf buf = Unpooled.buffer(); + buf.writeBytes(publish); + channel.writeInbound(buf); + + //verify that the client was disconnected + assertFalse(channel.isOpen()); + } + + private void testPublishPacketSizeTooLarge(final @NotNull ProtocolVersion protocolVersion) { + final ConfigurationService config = new TestConfigurationBootstrap().getConfigurationService(); + config.mqttConfiguration().setMaxPacketSize(10); + channel = new EmbeddedChannel(TestMqttDecoder.create(config)); + clientConnection = new ClientConnection(channel, null); + + //setting version to fake "connected" state + clientConnection.setProtocolVersion(protocolVersion); + channel.attr(ClientConnection.CHANNEL_ATTRIBUTE_NAME).set(clientConnection); + final byte[] publish = { // fixed header // type, flags @@ -316,9 +514,7 @@ private void testPublishPacketSizeTooLarge(final @NotNull ProtocolVersion protoc // properties 14, // user properties - 0x26, 0, 4, 't', 'e', 's', 't', 0, 5, 'v', 'a', 'l', 'u', 'e' - - }; + 0x26, 0, 4, 't', 'e', 's', 't', 0, 5, 'v', 'a', 'l', 'u', 'e'}; final ByteBuf buf = Unpooled.buffer(); buf.writeBytes(publish); @@ -328,10 +524,10 @@ private void testPublishPacketSizeTooLarge(final @NotNull ProtocolVersion protoc assertFalse(channel.isOpen()); } - private void testConnectPacketSizeTooLarge(byte[] connect) { - final ConfigurationService fullConfig = new TestConfigurationBootstrap().getConfigurationService(); - fullConfig.mqttConfiguration().setMaxPacketSize(10); - channel = new EmbeddedChannel(TestMqttDecoder.create(fullConfig)); + private void testConnectPacketSizeTooLarge(final byte @NotNull [] connect) { + final ConfigurationService config = new TestConfigurationBootstrap().getConfigurationService(); + config.mqttConfiguration().setMaxPacketSize(10); + channel = new EmbeddedChannel(TestMqttDecoder.create(config)); clientConnection = new ClientConnection(channel, null); clientConnection.setProtocolVersion(null); channel.attr(ClientConnection.CHANNEL_ATTRIBUTE_NAME).set(clientConnection); diff --git a/hivemq-edge/src/test/java/util/TestMqttDecoder.java b/hivemq-edge/src/test/java/util/TestMqttDecoder.java index a2d75aec84..52d6485c3d 100644 --- a/hivemq-edge/src/test/java/util/TestMqttDecoder.java +++ b/hivemq-edge/src/test/java/util/TestMqttDecoder.java @@ -17,14 +17,28 @@ import com.codahale.metrics.MetricRegistry; import com.hivemq.codec.decoder.MQTTMessageDecoder; +import com.hivemq.codec.decoder.MqttPingreqDecoder; import com.hivemq.codec.decoder.mqtt.MqttConnectDecoder; import com.hivemq.codec.decoder.mqtt.MqttDecoders; -import com.hivemq.codec.decoder.MqttPingreqDecoder; -import com.hivemq.codec.decoder.mqtt.mqtt3.*; -import com.hivemq.codec.decoder.mqtt.mqtt5.*; +import com.hivemq.codec.decoder.mqtt.mqtt3.Mqtt3DisconnectDecoder; +import com.hivemq.codec.decoder.mqtt.mqtt3.Mqtt3PubackDecoder; +import com.hivemq.codec.decoder.mqtt.mqtt3.Mqtt3PubcompDecoder; +import com.hivemq.codec.decoder.mqtt.mqtt3.Mqtt3PublishDecoder; +import com.hivemq.codec.decoder.mqtt.mqtt3.Mqtt3PubrecDecoder; +import com.hivemq.codec.decoder.mqtt.mqtt3.Mqtt3PubrelDecoder; +import com.hivemq.codec.decoder.mqtt.mqtt3.Mqtt3SubscribeDecoder; +import com.hivemq.codec.decoder.mqtt.mqtt3.Mqtt3UnsubscribeDecoder; +import com.hivemq.codec.decoder.mqtt.mqtt5.Mqtt5AuthDecoder; +import com.hivemq.codec.decoder.mqtt.mqtt5.Mqtt5DisconnectDecoder; +import com.hivemq.codec.decoder.mqtt.mqtt5.Mqtt5PubackDecoder; +import com.hivemq.codec.decoder.mqtt.mqtt5.Mqtt5PubcompDecoder; +import com.hivemq.codec.decoder.mqtt.mqtt5.Mqtt5PublishDecoder; +import com.hivemq.codec.decoder.mqtt.mqtt5.Mqtt5PubrecDecoder; +import com.hivemq.codec.decoder.mqtt.mqtt5.Mqtt5PubrelDecoder; +import com.hivemq.codec.decoder.mqtt.mqtt5.Mqtt5SubscribeDecoder; +import com.hivemq.codec.decoder.mqtt.mqtt5.Mqtt5UnsubscribeDecoder; import com.hivemq.configuration.HivemqId; import com.hivemq.configuration.service.ConfigurationService; -import org.jetbrains.annotations.NotNull; import com.hivemq.limitation.TopicAliasLimiterImpl; import com.hivemq.logging.EventLog; import com.hivemq.metrics.MetricsHolder; @@ -34,13 +48,11 @@ import com.hivemq.mqtt.handler.disconnect.MqttServerDisconnector; import com.hivemq.mqtt.handler.disconnect.MqttServerDisconnectorImpl; import com.hivemq.util.ClientIds; +import org.jetbrains.annotations.NotNull; import static com.hivemq.mqtt.message.connect.Mqtt5CONNECT.SESSION_EXPIRY_MAX; import static com.hivemq.mqtt.message.publish.PUBLISH.MESSAGE_EXPIRY_INTERVAL_MAX; -/** - * @author Christoph Schäbel - */ public class TestMqttDecoder { public static MQTTMessageDecoder create() { @@ -59,16 +71,13 @@ public static MQTTMessageDecoder create(final @NotNull ConfigurationService full final MqttConnacker mqttConnacker = new MqttConnackerImpl(eventLog); final MetricsHolder metricsHolder = new MetricsHolder(new MetricRegistry()); - final MqttConnectDecoder mqttConnectDecoder = new MqttConnectDecoder(mqttConnacker, - fullConfigurationService, - hiveMQId, - new ClientIds(hiveMQId)); + final MqttConnectDecoder mqttConnectDecoder = + new MqttConnectDecoder(mqttConnacker, fullConfigurationService, hiveMQId, new ClientIds(hiveMQId)); - return new MQTTMessageDecoder( - mqttConnectDecoder, - mqttConnacker, fullConfigurationService.mqttConfiguration(), - new MqttDecoders( - new Mqtt3PublishDecoder(hiveMQId, disconnector, fullConfigurationService), + return new MQTTMessageDecoder(mqttConnectDecoder, + mqttConnacker, + fullConfigurationService.mqttConfiguration(), + new MqttDecoders(new Mqtt3PublishDecoder(hiveMQId, disconnector, fullConfigurationService), new Mqtt3PubackDecoder(disconnector, fullConfigurationService), new Mqtt3PubrecDecoder(disconnector, fullConfigurationService), new Mqtt3PubcompDecoder(disconnector, fullConfigurationService), @@ -77,7 +86,10 @@ public static MQTTMessageDecoder create(final @NotNull ConfigurationService full new Mqtt3SubscribeDecoder(disconnector, fullConfigurationService), new Mqtt3UnsubscribeDecoder(disconnector, fullConfigurationService), new MqttPingreqDecoder(disconnector), - new Mqtt5PublishDecoder(disconnector, hiveMQId, fullConfigurationService, new TopicAliasLimiterImpl()), + new Mqtt5PublishDecoder(disconnector, + hiveMQId, + fullConfigurationService, + new TopicAliasLimiterImpl()), new Mqtt5DisconnectDecoder(disconnector, fullConfigurationService), new Mqtt5SubscribeDecoder(disconnector, fullConfigurationService), new Mqtt5PubackDecoder(disconnector, fullConfigurationService), @@ -85,8 +97,8 @@ public static MQTTMessageDecoder create(final @NotNull ConfigurationService full new Mqtt5PubrelDecoder(disconnector, fullConfigurationService), new Mqtt5PubcompDecoder(disconnector, fullConfigurationService), new Mqtt5AuthDecoder(disconnector, fullConfigurationService), - new Mqtt5UnsubscribeDecoder(disconnector, fullConfigurationService)) - , disconnector, + new Mqtt5UnsubscribeDecoder(disconnector, fullConfigurationService)), + disconnector, new GlobalMQTTMessageCounter(metricsHolder)); } }