From 7c701b994b8d3f4ff1795e3cbd712bc0ce18bf80 Mon Sep 17 00:00:00 2001 From: dlandiak Date: Mon, 18 Nov 2024 17:35:24 +0200 Subject: [PATCH] PublishMsg rename qosLevel to qos --- .../client/service/handlers/MqttPublishHandler.java | 10 +++++----- .../mqtt/broker/adaptor/NettyMqttConverter.java | 6 +++--- .../mqtt/broker/adaptor/ProtoConverter.java | 4 ++-- .../mqtt/broker/server/MqttSessionHandler.java | 2 +- .../broker/service/mqtt/DefaultMqttMessageCreator.java | 2 +- .../service/mqtt/DefaultPublishMsgDeliveryService.java | 2 +- .../mqtt/broker/service/mqtt/PublishMsg.java | 6 +++--- .../processing/ApplicationPackProcessingCtx.java | 2 +- .../service/mqtt/retain/RetainedMsgProcessorImpl.java | 2 +- .../service/processing/MsgDispatcherServiceImpl.java | 4 ++-- .../mqtt/broker/adaptor/NettyMqttConverterTest.java | 4 ++-- .../mqtt/broker/adaptor/ProtoConverterTest.java | 4 ++-- .../ApplicationPersistedMsgCtxServiceImplTest.java | 2 +- 13 files changed, 25 insertions(+), 25 deletions(-) diff --git a/application/src/main/java/org/thingsboard/mqtt/broker/actors/client/service/handlers/MqttPublishHandler.java b/application/src/main/java/org/thingsboard/mqtt/broker/actors/client/service/handlers/MqttPublishHandler.java index cc686b4d3..031cec699 100644 --- a/application/src/main/java/org/thingsboard/mqtt/broker/actors/client/service/handlers/MqttPublishHandler.java +++ b/application/src/main/java/org/thingsboard/mqtt/broker/actors/client/service/handlers/MqttPublishHandler.java @@ -112,10 +112,10 @@ public void process(ClientSessionCtx ctx, MqttPublishMsg msg, TbActorRef actorRe MqttMsgWrapper mqttMsgWrapper = null; // for QoS 0 try { - if (MqttQoS.EXACTLY_ONCE.value() == publishMsg.getQosLevel()) { + if (MqttQoS.EXACTLY_ONCE.value() == publishMsg.getQos()) { mqttMsgWrapper = processExactlyOnce(ctx, msgId); if (ensureMsgPersistedAwaitingPubRel(ctx, actorRef, mqttMsgWrapper)) return; - } else if (MqttQoS.AT_LEAST_ONCE.value() == publishMsg.getQosLevel()) { + } else if (MqttQoS.AT_LEAST_ONCE.value() == publishMsg.getQos()) { mqttMsgWrapper = processAtLeastOnce(ctx, msgId); } } catch (FullMsgQueueException e) { @@ -180,9 +180,9 @@ private void handleMsgPersistenceFailure(ClientSessionCtx ctx, PublishMsg publis private void handleMqtt5ErrorResponse(ClientSessionCtx ctx, PublishMsg publishMsg, MqttReasonCodes.PubRec pubRecCode, MqttReasonCodes.PubAck pubAckCode) { - if (publishMsg.getQosLevel() == 2) { + if (publishMsg.getQos() == 2) { pushPubRecErrorResponseWithReasonCode(ctx, publishMsg, pubRecCode); - } else if (publishMsg.getQosLevel() == 1) { + } else if (publishMsg.getQos() == 1) { pushPubAckErrorResponseWithReasonCode(ctx, publishMsg, pubAckCode); } else { // QoS=0 - do nothing @@ -218,7 +218,7 @@ public void onSuccess(TbQueueMsgMetadata metadata) { if (isTraceEnabled) { log.trace("[{}][{}] Successfully acknowledged msg: {}", ctx.getClientId(), ctx.getSessionId(), publishMsg.getPacketId()); } - sendPubResponseEventToActor(actorRef, ctx.getSessionId(), mqttMsgWrapper, MqttQoS.valueOf(publishMsg.getQosLevel())); + sendPubResponseEventToActor(actorRef, ctx.getSessionId(), mqttMsgWrapper, MqttQoS.valueOf(publishMsg.getQos())); }); } diff --git a/application/src/main/java/org/thingsboard/mqtt/broker/adaptor/NettyMqttConverter.java b/application/src/main/java/org/thingsboard/mqtt/broker/adaptor/NettyMqttConverter.java index 3a1472996..40cd87923 100644 --- a/application/src/main/java/org/thingsboard/mqtt/broker/adaptor/NettyMqttConverter.java +++ b/application/src/main/java/org/thingsboard/mqtt/broker/adaptor/NettyMqttConverter.java @@ -39,9 +39,9 @@ import org.thingsboard.mqtt.broker.actors.client.messages.mqtt.MqttPublishMsg; import org.thingsboard.mqtt.broker.actors.client.messages.mqtt.MqttSubscribeMsg; import org.thingsboard.mqtt.broker.actors.client.messages.mqtt.MqttUnsubscribeMsg; +import org.thingsboard.mqtt.broker.common.data.BrokerConstants; import org.thingsboard.mqtt.broker.common.data.subscription.SubscriptionOptions; import org.thingsboard.mqtt.broker.common.data.subscription.TopicSubscription; -import org.thingsboard.mqtt.broker.common.data.BrokerConstants; import org.thingsboard.mqtt.broker.service.mqtt.PublishMsg; import org.thingsboard.mqtt.broker.session.ClientSessionCtx; import org.thingsboard.mqtt.broker.session.DisconnectReason; @@ -185,7 +185,7 @@ private static PublishMsg extractPublishMsg(MqttPublishMessage mqttPublishMessag return PublishMsg.builder() .packetId(mqttPublishMessage.variableHeader().packetId()) .topicName(mqttPublishMessage.variableHeader().topicName()) - .qosLevel(mqttPublishMessage.fixedHeader().qosLevel().value()) + .qos(mqttPublishMessage.fixedHeader().qosLevel().value()) .isRetained(mqttPublishMessage.fixedHeader().isRetain()) .isDup(mqttPublishMessage.fixedHeader().isDup()) .byteBuf(byteBuf) @@ -200,7 +200,7 @@ private static PublishMsg extractLastWillPublishMsg(MqttConnectMessage msg) { .payload(msg.payload().willMessageInBytes()) .isRetained(msg.variableHeader().isWillRetain()) .isDup(false) - .qosLevel(msg.variableHeader().willQos()) + .qos(msg.variableHeader().willQos()) .properties(msg.payload().willProperties()) .build(); } diff --git a/application/src/main/java/org/thingsboard/mqtt/broker/adaptor/ProtoConverter.java b/application/src/main/java/org/thingsboard/mqtt/broker/adaptor/ProtoConverter.java index afea21d36..79832b311 100644 --- a/application/src/main/java/org/thingsboard/mqtt/broker/adaptor/ProtoConverter.java +++ b/application/src/main/java/org/thingsboard/mqtt/broker/adaptor/ProtoConverter.java @@ -60,7 +60,7 @@ public static QueueProtos.PublishMsgProto convertToPublishMsgProto(SessionInfo s QueueProtos.PublishMsgProto.Builder builder = QueueProtos.PublishMsgProto.newBuilder() .setPacketId(publishMsg.getPacketId()) .setTopicName(publishMsg.getTopicName()) - .setQos(publishMsg.getQosLevel()) + .setQos(publishMsg.getQos()) .setRetain(publishMsg.isRetained()) .addAllUserProperties(toUserPropertyProtos(userProperties)) .setClientId(sessionInfo.getClientInfo().getClientId()); @@ -155,7 +155,7 @@ public static PublishMsg convertToPublishMsg(QueueProtos.PublishMsgProto msg, in .payload(msg.getPayload().toByteArray()) .properties(properties) .packetId(packetId) - .qosLevel(qos) + .qos(qos) .isDup(isDup) .build(); } diff --git a/application/src/main/java/org/thingsboard/mqtt/broker/server/MqttSessionHandler.java b/application/src/main/java/org/thingsboard/mqtt/broker/server/MqttSessionHandler.java index 3dcaa1c19..891824251 100644 --- a/application/src/main/java/org/thingsboard/mqtt/broker/server/MqttSessionHandler.java +++ b/application/src/main/java/org/thingsboard/mqtt/broker/server/MqttSessionHandler.java @@ -223,7 +223,7 @@ private void processPublish(MqttMessage msg) { rateLimitBatchProcessor.addMessage(mqttPublishMsg, mqttMsg -> clientMqttActorManager.processMqttMsg(clientId, mqttMsg), mqttMsg -> { - processMsgOnRateLimits(mqttMsg.getPublishMsg().getPacketId(), mqttMsg.getPublishMsg().getQosLevel(), "Total rate limits detected"); + processMsgOnRateLimits(mqttMsg.getPublishMsg().getPacketId(), mqttMsg.getPublishMsg().getQos(), "Total rate limits detected"); mqttMsg.release(); }); return; diff --git a/application/src/main/java/org/thingsboard/mqtt/broker/service/mqtt/DefaultMqttMessageCreator.java b/application/src/main/java/org/thingsboard/mqtt/broker/service/mqtt/DefaultMqttMessageCreator.java index 6fa61f6f3..77301c7ae 100644 --- a/application/src/main/java/org/thingsboard/mqtt/broker/service/mqtt/DefaultMqttMessageCreator.java +++ b/application/src/main/java/org/thingsboard/mqtt/broker/service/mqtt/DefaultMqttMessageCreator.java @@ -202,7 +202,7 @@ public MqttMessage createPubCompMsg(int msgId, MqttReasonCodes.PubComp code) { @Override public MqttPublishMessage createPubMsg(PublishMsg pubMsg) { - return getMqttPublishMessage(pubMsg.isDup(), pubMsg.getQosLevel(), pubMsg.isRetained(), + return getMqttPublishMessage(pubMsg.isDup(), pubMsg.getQos(), pubMsg.isRetained(), pubMsg.getTopicName(), pubMsg.getPacketId(), pubMsg.getPayload(), pubMsg.getProperties()); } diff --git a/application/src/main/java/org/thingsboard/mqtt/broker/service/mqtt/DefaultPublishMsgDeliveryService.java b/application/src/main/java/org/thingsboard/mqtt/broker/service/mqtt/DefaultPublishMsgDeliveryService.java index 022fd3250..0ff32555c 100644 --- a/application/src/main/java/org/thingsboard/mqtt/broker/service/mqtt/DefaultPublishMsgDeliveryService.java +++ b/application/src/main/java/org/thingsboard/mqtt/broker/service/mqtt/DefaultPublishMsgDeliveryService.java @@ -135,7 +135,7 @@ public void sendPublishMsgToClientWithoutFlush(ClientSessionCtx sessionCtx, Publ pubMsg = sessionCtx.getTopicAliasCtx().createPublishMsgUsingTopicAlias(pubMsg, minTopicNameLengthForAliasReplacement); MqttPublishMessage mqttPubMsg = mqttMessageGenerator.createPubMsg(pubMsg); tbMessageStatsReportClient.reportStats(OUTGOING_MSGS); - tbMessageStatsReportClient.reportClientReceiveStats(sessionCtx.getClientId(), pubMsg.getQosLevel()); + tbMessageStatsReportClient.reportClientReceiveStats(sessionCtx.getClientId(), pubMsg.getQos()); sendPublishMsgWithoutFlushToClient(sessionCtx, mqttPubMsg); } diff --git a/application/src/main/java/org/thingsboard/mqtt/broker/service/mqtt/PublishMsg.java b/application/src/main/java/org/thingsboard/mqtt/broker/service/mqtt/PublishMsg.java index 71219bfc6..532975a42 100644 --- a/application/src/main/java/org/thingsboard/mqtt/broker/service/mqtt/PublishMsg.java +++ b/application/src/main/java/org/thingsboard/mqtt/broker/service/mqtt/PublishMsg.java @@ -33,17 +33,17 @@ public class PublishMsg { private final String topicName; private final byte[] payload; private final ByteBuf byteBuf; - private final int qosLevel; + private final int qos; private final boolean isRetained; private final boolean isDup; private final MqttProperties properties; - public PublishMsg(int packetId, String topicName, byte[] payload, int qosLevel, boolean isRetained, boolean isDup) { + public PublishMsg(int packetId, String topicName, byte[] payload, int qos, boolean isRetained, boolean isDup) { this.packetId = packetId; this.topicName = topicName; this.payload = payload; this.byteBuf = Unpooled.wrappedBuffer(payload); - this.qosLevel = qosLevel; + this.qos = qos; this.isRetained = isRetained; this.isDup = isDup; this.properties = MqttProperties.NO_PROPERTIES; diff --git a/application/src/main/java/org/thingsboard/mqtt/broker/service/mqtt/persistence/application/processing/ApplicationPackProcessingCtx.java b/application/src/main/java/org/thingsboard/mqtt/broker/service/mqtt/persistence/application/processing/ApplicationPackProcessingCtx.java index d40acd4c8..58b89f596 100644 --- a/application/src/main/java/org/thingsboard/mqtt/broker/service/mqtt/persistence/application/processing/ApplicationPackProcessingCtx.java +++ b/application/src/main/java/org/thingsboard/mqtt/broker/service/mqtt/persistence/application/processing/ApplicationPackProcessingCtx.java @@ -61,7 +61,7 @@ public ApplicationPackProcessingCtx(ApplicationSubmitStrategy submitStrategy, Ap switch (persistedMsg.getPacketType()) { case PUBLISH: PersistedPublishMsg persistedPublishMsg = (PersistedPublishMsg) persistedMsg; - if (persistedPublishMsg.isSharedSubscriptionMsg() && persistedPublishMsg.getPublishMsg().getQosLevel() == 0) { + if (persistedPublishMsg.isSharedSubscriptionMsg() && persistedPublishMsg.getPublishMsg().getQos() == 0) { break; } publishPendingMsgMap.put(persistedMsg.getPacketId(), persistedPublishMsg); diff --git a/application/src/main/java/org/thingsboard/mqtt/broker/service/mqtt/retain/RetainedMsgProcessorImpl.java b/application/src/main/java/org/thingsboard/mqtt/broker/service/mqtt/retain/RetainedMsgProcessorImpl.java index 4bec14475..b2af6e2cd 100644 --- a/application/src/main/java/org/thingsboard/mqtt/broker/service/mqtt/retain/RetainedMsgProcessorImpl.java +++ b/application/src/main/java/org/thingsboard/mqtt/broker/service/mqtt/retain/RetainedMsgProcessorImpl.java @@ -54,7 +54,7 @@ private boolean payloadIsEmpty(PublishMsg publishMsg) { private RetainedMsg newRetainedMsg(PublishMsg publishMsg) { MqttProperties properties = MqttPropertiesUtil.getMqttProperties(publishMsg); - return new RetainedMsg(publishMsg.getTopicName(), publishMsg.getPayload(), publishMsg.getQosLevel(), properties); + return new RetainedMsg(publishMsg.getTopicName(), publishMsg.getPayload(), publishMsg.getQos(), properties); } } diff --git a/application/src/main/java/org/thingsboard/mqtt/broker/service/processing/MsgDispatcherServiceImpl.java b/application/src/main/java/org/thingsboard/mqtt/broker/service/processing/MsgDispatcherServiceImpl.java index 8f78a45fd..7bc7f03af 100644 --- a/application/src/main/java/org/thingsboard/mqtt/broker/service/processing/MsgDispatcherServiceImpl.java +++ b/application/src/main/java/org/thingsboard/mqtt/broker/service/processing/MsgDispatcherServiceImpl.java @@ -30,8 +30,8 @@ import org.thingsboard.mqtt.broker.common.data.ClientType; import org.thingsboard.mqtt.broker.common.data.MqttQoS; import org.thingsboard.mqtt.broker.common.data.SessionInfo; -import org.thingsboard.mqtt.broker.common.data.util.StringUtils; import org.thingsboard.mqtt.broker.common.data.subscription.SubscriptionOptions; +import org.thingsboard.mqtt.broker.common.data.util.StringUtils; import org.thingsboard.mqtt.broker.common.stats.MessagesStats; import org.thingsboard.mqtt.broker.gen.queue.QueueProtos.PublishMsgProto; import org.thingsboard.mqtt.broker.queue.TbQueueCallback; @@ -107,7 +107,7 @@ public void persistPublishMsg(SessionInfo sessionInfo, PublishMsg publishMsg, Tb PublishMsgProto publishMsgProto = ProtoConverter.convertToPublishMsgProto(sessionInfo, publishMsg); producerStats.incrementTotal(); tbMessageStatsReportClient.reportStats(INCOMING_MSGS); - tbMessageStatsReportClient.reportClientSendStats(sessionInfo.getClientId(), publishMsg.getQosLevel()); + tbMessageStatsReportClient.reportClientSendStats(sessionInfo.getClientId(), publishMsg.getQos()); callback = statsManager.wrapTbQueueCallback(callback, producerStats); DefaultTbQueueMsgHeaders headers = createHeaders(publishMsg); diff --git a/application/src/test/java/org/thingsboard/mqtt/broker/adaptor/NettyMqttConverterTest.java b/application/src/test/java/org/thingsboard/mqtt/broker/adaptor/NettyMqttConverterTest.java index 682d513d5..34dc8221f 100644 --- a/application/src/test/java/org/thingsboard/mqtt/broker/adaptor/NettyMqttConverterTest.java +++ b/application/src/test/java/org/thingsboard/mqtt/broker/adaptor/NettyMqttConverterTest.java @@ -50,10 +50,10 @@ import org.thingsboard.mqtt.broker.actors.client.messages.mqtt.MqttPublishMsg; import org.thingsboard.mqtt.broker.actors.client.messages.mqtt.MqttSubscribeMsg; import org.thingsboard.mqtt.broker.actors.client.messages.mqtt.MqttUnsubscribeMsg; +import org.thingsboard.mqtt.broker.common.data.BrokerConstants; import org.thingsboard.mqtt.broker.common.data.SessionInfo; import org.thingsboard.mqtt.broker.common.data.subscription.SubscriptionOptions; import org.thingsboard.mqtt.broker.common.data.subscription.TopicSubscription; -import org.thingsboard.mqtt.broker.common.data.BrokerConstants; import org.thingsboard.mqtt.broker.service.mqtt.PublishMsg; import org.thingsboard.mqtt.broker.session.ClientSessionCtx; import org.thingsboard.mqtt.broker.session.DisconnectReasonType; @@ -224,7 +224,7 @@ public void testCreateMqttPublishMsg() { PublishMsg publishMsg = mqttPublishMsg.getPublishMsg(); Assert.assertEquals(1, publishMsg.getPacketId()); Assert.assertEquals("topic", publishMsg.getTopicName()); - Assert.assertEquals(MqttQoS.AT_LEAST_ONCE.value(), publishMsg.getQosLevel()); + Assert.assertEquals(MqttQoS.AT_LEAST_ONCE.value(), publishMsg.getQos()); Assert.assertFalse(publishMsg.isRetained()); Assert.assertFalse(publishMsg.isDup()); } diff --git a/application/src/test/java/org/thingsboard/mqtt/broker/adaptor/ProtoConverterTest.java b/application/src/test/java/org/thingsboard/mqtt/broker/adaptor/ProtoConverterTest.java index c69bfc6c1..3dff6c2c3 100644 --- a/application/src/test/java/org/thingsboard/mqtt/broker/adaptor/ProtoConverterTest.java +++ b/application/src/test/java/org/thingsboard/mqtt/broker/adaptor/ProtoConverterTest.java @@ -367,7 +367,7 @@ public void givenPubMsg_whenExecuteConvertToPublishMsgProto_thenGetExpectedPubMs PublishMsg publishMsg = PublishMsg.builder() .topicName("topic") .packetId(1) - .qosLevel(2) + .qos(2) .properties(properties) .payload("p".getBytes(StandardCharsets.UTF_8)) .build(); @@ -391,7 +391,7 @@ public void givenPubMsgProto_whenExecuteConvertToPublishMsg_thenGetExpectedPubMs assertEquals("t", publishMsg.getTopicName()); assertEquals(1, publishMsg.getPacketId()); - assertEquals(1, publishMsg.getQosLevel()); + assertEquals(1, publishMsg.getQos()); assertNotNull(publishMsg.getProperties().getProperty(BrokerConstants.PAYLOAD_FORMAT_INDICATOR_PROP_ID)); assertNull(publishMsg.getProperties().getProperty(BrokerConstants.CONTENT_TYPE_PROP_ID)); assertNull(publishMsg.getProperties().getProperty(BrokerConstants.SUBSCRIPTION_IDENTIFIER_PROP_ID)); diff --git a/application/src/test/java/org/thingsboard/mqtt/broker/service/mqtt/persistence/application/processing/ApplicationPersistedMsgCtxServiceImplTest.java b/application/src/test/java/org/thingsboard/mqtt/broker/service/mqtt/persistence/application/processing/ApplicationPersistedMsgCtxServiceImplTest.java index 77c3a57ec..ab569da93 100644 --- a/application/src/test/java/org/thingsboard/mqtt/broker/service/mqtt/persistence/application/processing/ApplicationPersistedMsgCtxServiceImplTest.java +++ b/application/src/test/java/org/thingsboard/mqtt/broker/service/mqtt/persistence/application/processing/ApplicationPersistedMsgCtxServiceImplTest.java @@ -129,7 +129,7 @@ private PublishMsg buildPubMsg() { .packetId(1) .topicName("topic") .payload("data".getBytes()) - .qosLevel(1) + .qos(1) .isRetained(false) .isDup(false) .build();