Skip to content

Commit

Permalink
PublishMsg rename qosLevel to qos
Browse files Browse the repository at this point in the history
  • Loading branch information
dmytro-landiak committed Nov 18, 2024
1 parent fbfa6ee commit 7c701b9
Show file tree
Hide file tree
Showing 13 changed files with 25 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,10 @@ public void process(ClientSessionCtx ctx, MqttPublishMsg msg, TbActorRef actorRe

MqttMsgWrapper mqttMsgWrapper = null; // for QoS 0
try {
if (MqttQoS.EXACTLY_ONCE.value() == publishMsg.getQosLevel()) {
if (MqttQoS.EXACTLY_ONCE.value() == publishMsg.getQos()) {
mqttMsgWrapper = processExactlyOnce(ctx, msgId);
if (ensureMsgPersistedAwaitingPubRel(ctx, actorRef, mqttMsgWrapper)) return;
} else if (MqttQoS.AT_LEAST_ONCE.value() == publishMsg.getQosLevel()) {
} else if (MqttQoS.AT_LEAST_ONCE.value() == publishMsg.getQos()) {
mqttMsgWrapper = processAtLeastOnce(ctx, msgId);
}
} catch (FullMsgQueueException e) {
Expand Down Expand Up @@ -180,9 +180,9 @@ private void handleMsgPersistenceFailure(ClientSessionCtx ctx, PublishMsg publis

private void handleMqtt5ErrorResponse(ClientSessionCtx ctx, PublishMsg publishMsg,
MqttReasonCodes.PubRec pubRecCode, MqttReasonCodes.PubAck pubAckCode) {
if (publishMsg.getQosLevel() == 2) {
if (publishMsg.getQos() == 2) {
pushPubRecErrorResponseWithReasonCode(ctx, publishMsg, pubRecCode);
} else if (publishMsg.getQosLevel() == 1) {
} else if (publishMsg.getQos() == 1) {
pushPubAckErrorResponseWithReasonCode(ctx, publishMsg, pubAckCode);
} else {
// QoS=0 - do nothing
Expand Down Expand Up @@ -218,7 +218,7 @@ public void onSuccess(TbQueueMsgMetadata metadata) {
if (isTraceEnabled) {
log.trace("[{}][{}] Successfully acknowledged msg: {}", ctx.getClientId(), ctx.getSessionId(), publishMsg.getPacketId());
}
sendPubResponseEventToActor(actorRef, ctx.getSessionId(), mqttMsgWrapper, MqttQoS.valueOf(publishMsg.getQosLevel()));
sendPubResponseEventToActor(actorRef, ctx.getSessionId(), mqttMsgWrapper, MqttQoS.valueOf(publishMsg.getQos()));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@
import org.thingsboard.mqtt.broker.actors.client.messages.mqtt.MqttPublishMsg;
import org.thingsboard.mqtt.broker.actors.client.messages.mqtt.MqttSubscribeMsg;
import org.thingsboard.mqtt.broker.actors.client.messages.mqtt.MqttUnsubscribeMsg;
import org.thingsboard.mqtt.broker.common.data.BrokerConstants;
import org.thingsboard.mqtt.broker.common.data.subscription.SubscriptionOptions;
import org.thingsboard.mqtt.broker.common.data.subscription.TopicSubscription;
import org.thingsboard.mqtt.broker.common.data.BrokerConstants;
import org.thingsboard.mqtt.broker.service.mqtt.PublishMsg;
import org.thingsboard.mqtt.broker.session.ClientSessionCtx;
import org.thingsboard.mqtt.broker.session.DisconnectReason;
Expand Down Expand Up @@ -185,7 +185,7 @@ private static PublishMsg extractPublishMsg(MqttPublishMessage mqttPublishMessag
return PublishMsg.builder()
.packetId(mqttPublishMessage.variableHeader().packetId())
.topicName(mqttPublishMessage.variableHeader().topicName())
.qosLevel(mqttPublishMessage.fixedHeader().qosLevel().value())
.qos(mqttPublishMessage.fixedHeader().qosLevel().value())
.isRetained(mqttPublishMessage.fixedHeader().isRetain())
.isDup(mqttPublishMessage.fixedHeader().isDup())
.byteBuf(byteBuf)
Expand All @@ -200,7 +200,7 @@ private static PublishMsg extractLastWillPublishMsg(MqttConnectMessage msg) {
.payload(msg.payload().willMessageInBytes())
.isRetained(msg.variableHeader().isWillRetain())
.isDup(false)
.qosLevel(msg.variableHeader().willQos())
.qos(msg.variableHeader().willQos())
.properties(msg.payload().willProperties())
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public static QueueProtos.PublishMsgProto convertToPublishMsgProto(SessionInfo s
QueueProtos.PublishMsgProto.Builder builder = QueueProtos.PublishMsgProto.newBuilder()
.setPacketId(publishMsg.getPacketId())
.setTopicName(publishMsg.getTopicName())
.setQos(publishMsg.getQosLevel())
.setQos(publishMsg.getQos())
.setRetain(publishMsg.isRetained())
.addAllUserProperties(toUserPropertyProtos(userProperties))
.setClientId(sessionInfo.getClientInfo().getClientId());
Expand Down Expand Up @@ -155,7 +155,7 @@ public static PublishMsg convertToPublishMsg(QueueProtos.PublishMsgProto msg, in
.payload(msg.getPayload().toByteArray())
.properties(properties)
.packetId(packetId)
.qosLevel(qos)
.qos(qos)
.isDup(isDup)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ private void processPublish(MqttMessage msg) {
rateLimitBatchProcessor.addMessage(mqttPublishMsg,
mqttMsg -> clientMqttActorManager.processMqttMsg(clientId, mqttMsg),
mqttMsg -> {
processMsgOnRateLimits(mqttMsg.getPublishMsg().getPacketId(), mqttMsg.getPublishMsg().getQosLevel(), "Total rate limits detected");
processMsgOnRateLimits(mqttMsg.getPublishMsg().getPacketId(), mqttMsg.getPublishMsg().getQos(), "Total rate limits detected");
mqttMsg.release();
});
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public MqttMessage createPubCompMsg(int msgId, MqttReasonCodes.PubComp code) {

@Override
public MqttPublishMessage createPubMsg(PublishMsg pubMsg) {
return getMqttPublishMessage(pubMsg.isDup(), pubMsg.getQosLevel(), pubMsg.isRetained(),
return getMqttPublishMessage(pubMsg.isDup(), pubMsg.getQos(), pubMsg.isRetained(),
pubMsg.getTopicName(), pubMsg.getPacketId(), pubMsg.getPayload(), pubMsg.getProperties());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public void sendPublishMsgToClientWithoutFlush(ClientSessionCtx sessionCtx, Publ
pubMsg = sessionCtx.getTopicAliasCtx().createPublishMsgUsingTopicAlias(pubMsg, minTopicNameLengthForAliasReplacement);
MqttPublishMessage mqttPubMsg = mqttMessageGenerator.createPubMsg(pubMsg);
tbMessageStatsReportClient.reportStats(OUTGOING_MSGS);
tbMessageStatsReportClient.reportClientReceiveStats(sessionCtx.getClientId(), pubMsg.getQosLevel());
tbMessageStatsReportClient.reportClientReceiveStats(sessionCtx.getClientId(), pubMsg.getQos());
sendPublishMsgWithoutFlushToClient(sessionCtx, mqttPubMsg);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,17 @@ public class PublishMsg {
private final String topicName;
private final byte[] payload;
private final ByteBuf byteBuf;
private final int qosLevel;
private final int qos;
private final boolean isRetained;
private final boolean isDup;
private final MqttProperties properties;

public PublishMsg(int packetId, String topicName, byte[] payload, int qosLevel, boolean isRetained, boolean isDup) {
public PublishMsg(int packetId, String topicName, byte[] payload, int qos, boolean isRetained, boolean isDup) {
this.packetId = packetId;
this.topicName = topicName;
this.payload = payload;
this.byteBuf = Unpooled.wrappedBuffer(payload);
this.qosLevel = qosLevel;
this.qos = qos;
this.isRetained = isRetained;
this.isDup = isDup;
this.properties = MqttProperties.NO_PROPERTIES;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public ApplicationPackProcessingCtx(ApplicationSubmitStrategy submitStrategy, Ap
switch (persistedMsg.getPacketType()) {
case PUBLISH:
PersistedPublishMsg persistedPublishMsg = (PersistedPublishMsg) persistedMsg;
if (persistedPublishMsg.isSharedSubscriptionMsg() && persistedPublishMsg.getPublishMsg().getQosLevel() == 0) {
if (persistedPublishMsg.isSharedSubscriptionMsg() && persistedPublishMsg.getPublishMsg().getQos() == 0) {
break;
}
publishPendingMsgMap.put(persistedMsg.getPacketId(), persistedPublishMsg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ private boolean payloadIsEmpty(PublishMsg publishMsg) {

private RetainedMsg newRetainedMsg(PublishMsg publishMsg) {
MqttProperties properties = MqttPropertiesUtil.getMqttProperties(publishMsg);
return new RetainedMsg(publishMsg.getTopicName(), publishMsg.getPayload(), publishMsg.getQosLevel(), properties);
return new RetainedMsg(publishMsg.getTopicName(), publishMsg.getPayload(), publishMsg.getQos(), properties);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
import org.thingsboard.mqtt.broker.common.data.ClientType;
import org.thingsboard.mqtt.broker.common.data.MqttQoS;
import org.thingsboard.mqtt.broker.common.data.SessionInfo;
import org.thingsboard.mqtt.broker.common.data.util.StringUtils;
import org.thingsboard.mqtt.broker.common.data.subscription.SubscriptionOptions;
import org.thingsboard.mqtt.broker.common.data.util.StringUtils;
import org.thingsboard.mqtt.broker.common.stats.MessagesStats;
import org.thingsboard.mqtt.broker.gen.queue.QueueProtos.PublishMsgProto;
import org.thingsboard.mqtt.broker.queue.TbQueueCallback;
Expand Down Expand Up @@ -107,7 +107,7 @@ public void persistPublishMsg(SessionInfo sessionInfo, PublishMsg publishMsg, Tb
PublishMsgProto publishMsgProto = ProtoConverter.convertToPublishMsgProto(sessionInfo, publishMsg);
producerStats.incrementTotal();
tbMessageStatsReportClient.reportStats(INCOMING_MSGS);
tbMessageStatsReportClient.reportClientSendStats(sessionInfo.getClientId(), publishMsg.getQosLevel());
tbMessageStatsReportClient.reportClientSendStats(sessionInfo.getClientId(), publishMsg.getQos());
callback = statsManager.wrapTbQueueCallback(callback, producerStats);

DefaultTbQueueMsgHeaders headers = createHeaders(publishMsg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@
import org.thingsboard.mqtt.broker.actors.client.messages.mqtt.MqttPublishMsg;
import org.thingsboard.mqtt.broker.actors.client.messages.mqtt.MqttSubscribeMsg;
import org.thingsboard.mqtt.broker.actors.client.messages.mqtt.MqttUnsubscribeMsg;
import org.thingsboard.mqtt.broker.common.data.BrokerConstants;
import org.thingsboard.mqtt.broker.common.data.SessionInfo;
import org.thingsboard.mqtt.broker.common.data.subscription.SubscriptionOptions;
import org.thingsboard.mqtt.broker.common.data.subscription.TopicSubscription;
import org.thingsboard.mqtt.broker.common.data.BrokerConstants;
import org.thingsboard.mqtt.broker.service.mqtt.PublishMsg;
import org.thingsboard.mqtt.broker.session.ClientSessionCtx;
import org.thingsboard.mqtt.broker.session.DisconnectReasonType;
Expand Down Expand Up @@ -224,7 +224,7 @@ public void testCreateMqttPublishMsg() {
PublishMsg publishMsg = mqttPublishMsg.getPublishMsg();
Assert.assertEquals(1, publishMsg.getPacketId());
Assert.assertEquals("topic", publishMsg.getTopicName());
Assert.assertEquals(MqttQoS.AT_LEAST_ONCE.value(), publishMsg.getQosLevel());
Assert.assertEquals(MqttQoS.AT_LEAST_ONCE.value(), publishMsg.getQos());
Assert.assertFalse(publishMsg.isRetained());
Assert.assertFalse(publishMsg.isDup());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ public void givenPubMsg_whenExecuteConvertToPublishMsgProto_thenGetExpectedPubMs
PublishMsg publishMsg = PublishMsg.builder()
.topicName("topic")
.packetId(1)
.qosLevel(2)
.qos(2)
.properties(properties)
.payload("p".getBytes(StandardCharsets.UTF_8))
.build();
Expand All @@ -391,7 +391,7 @@ public void givenPubMsgProto_whenExecuteConvertToPublishMsg_thenGetExpectedPubMs

assertEquals("t", publishMsg.getTopicName());
assertEquals(1, publishMsg.getPacketId());
assertEquals(1, publishMsg.getQosLevel());
assertEquals(1, publishMsg.getQos());
assertNotNull(publishMsg.getProperties().getProperty(BrokerConstants.PAYLOAD_FORMAT_INDICATOR_PROP_ID));
assertNull(publishMsg.getProperties().getProperty(BrokerConstants.CONTENT_TYPE_PROP_ID));
assertNull(publishMsg.getProperties().getProperty(BrokerConstants.SUBSCRIPTION_IDENTIFIER_PROP_ID));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ private PublishMsg buildPubMsg() {
.packetId(1)
.topicName("topic")
.payload("data".getBytes())
.qosLevel(1)
.qos(1)
.isRetained(false)
.isDup(false)
.build();
Expand Down

0 comments on commit 7c701b9

Please sign in to comment.