Skip to content

Commit baf8d15

Browse files
committed
Refactor PublushFlowHanderl, so, now we have two clear distinct paths for Qos1 and Qos2, Qos1 is using a map while Qos2 is using the IncomingMessageFlowPersistence
1 parent c5b9932 commit baf8d15

File tree

3 files changed

+81
-79
lines changed

3 files changed

+81
-79
lines changed

src/main/java/com/hivemq/mqtt/handler/publish/PublishFlowHandler.java

Lines changed: 33 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public class PublishFlowHandler extends ChannelDuplexHandler {
7070
private final @NotNull IncomingPublishHandler incomingPublishHandler;
7171
private final @NotNull DropOutgoingPublishesHandler dropOutgoingPublishesHandler;
7272

73-
private final @NotNull Map<Integer, Boolean> qos1And2AlreadySentMap;
73+
private final @NotNull Map<Integer, Boolean> qos1AlreadySentMap;
7474

7575
@VisibleForTesting
7676
@Inject
@@ -83,7 +83,7 @@ public PublishFlowHandler(
8383
this.publishPollService = publishPollService;
8484
this.persistence = persistence;
8585
this.orderedTopicService = orderedTopicService;
86-
this.qos1And2AlreadySentMap = new HashMap<>();
86+
this.qos1AlreadySentMap = new HashMap<>();
8787
this.incomingPublishHandler = incomingPublishHandler;
8888
this.dropOutgoingPublishesHandler = dropOutgoingPublishesHandler;
8989
}
@@ -117,13 +117,17 @@ public void write(
117117

118118
if (msg instanceof PUBACK) {
119119
final PUBACK puback = (PUBACK) msg;
120-
final String client = ClientConnection.of(ctx.channel()).getClientId();
121120
final int messageId = puback.getPacketIdentifier();
122-
persistence.addOrReplace(client, messageId, puback);
123-
promise.addListener(new PUBLISHFlowCompleteListener(messageId,
124-
client,
125-
qos1And2AlreadySentMap,
126-
persistence));
121+
promise.addListener((ChannelFutureListener) future -> {
122+
if (future.isSuccess()) {
123+
qos1AlreadySentMap.remove(messageId);
124+
if (log.isTraceEnabled()) {
125+
log.trace("Client '{}' completed a PUBLISH flow with QoS 1 for packet identifier '{}'",
126+
ctx,
127+
messageId);
128+
}
129+
}
130+
});
127131
}
128132

129133
final boolean flowComplete = orderedTopicService.handlePublish(ctx.channel(), msg, promise);
@@ -183,22 +187,27 @@ private void handlePublish(
183187

184188
if (publish.getQoS() == QoS.AT_MOST_ONCE) {// do nothing
185189
incomingPublishHandler.interceptOrDelegate(ctx, publish, clientId);
186-
// QoS 1 or 2 duplicate delivery handling
187-
} else {
190+
// QoS 1 delivery handling
191+
} else if (publish.getQoS() == QoS.AT_LEAST_ONCE) {
188192
UNACKNOWLEDGED_PUBLISHES_COUNTER.incrementAndGet();
193+
if (publish.isDuplicateDelivery() && qos1AlreadySentMap.get(publish.getPacketIdentifier()) != null) {
194+
log.debug("Client {} sent a duplicate publish message with id {}. This message is ignored",
195+
clientId,
196+
publish.getPacketIdentifier());
197+
} else {
198+
final int packetId = publish.getPacketIdentifier();
199+
qos1AlreadySentMap.put(publish.getPacketIdentifier(), true);
200+
firstPublishForMessageIdReceived(ctx, publish, clientId, packetId);
201+
}
202+
// QoS 2 duplicate delivery handling
203+
} else {
189204
final int messageId = publish.getPacketIdentifier();
190205
final MessageWithID savedMessage = persistence.get(clientId, messageId);
191-
192-
//No PUBLISH message was found in persistence. This is the standard case since we don't know this message yet
193206
if (!(savedMessage instanceof PUBLISH)) {
207+
persistence.addOrReplace(clientId, messageId, publish);
194208
firstPublishForMessageIdReceived(ctx, publish, clientId, messageId);
195-
//The publish was resent with the DUP flag
196-
} else if (publish.isDuplicateDelivery()) {
197-
resentWithDUPFlag(ctx, publish, clientId);
198-
//The publish was resent without DUP flag!
199-
} else {
200-
resentWithoutDUPFlag(ctx, publish, clientId);
201-
}
209+
} else
210+
ctx.writeAndFlush(new PUBREC(messageId));
202211
}
203212
}
204213

@@ -207,47 +216,13 @@ private void firstPublishForMessageIdReceived(
207216
final @NotNull PUBLISH publish,
208217
final @NotNull String client,
209218
final int messageId) throws Exception {
210-
persistence.addOrReplace(client, messageId, publish);
211219
incomingPublishHandler.interceptOrDelegate(ctx, publish, client);
212-
qos1And2AlreadySentMap.put(messageId, true);
213220
log.trace(
214221
"Client {} sent a publish message with id {} which was not forwarded before. This message is processed normally",
215222
client,
216223
messageId);
217224
}
218225

219-
private void resentWithDUPFlag(
220-
final @NotNull ChannelHandlerContext ctx, final @NotNull PUBLISH publish, final @NotNull String client)
221-
throws Exception {
222-
final Boolean alreadySent = qos1And2AlreadySentMap.get(publish.getPacketIdentifier());
223-
if (alreadySent != null && alreadySent) {
224-
225-
log.debug("Client {} sent a duplicate publish message with id {}. This message is ignored",
226-
client,
227-
publish.getPacketIdentifier());
228-
} else {
229-
super.channelRead(ctx, publish);
230-
log.debug(
231-
"Client {} sent a duplicate publish message with id {} which was not forwarded before. This message is processed normally",
232-
client,
233-
publish.getPacketIdentifier());
234-
}
235-
qos1And2AlreadySentMap.put(publish.getPacketIdentifier(), true);
236-
}
237-
238-
private void resentWithoutDUPFlag(
239-
final @NotNull ChannelHandlerContext ctx, final @NotNull PUBLISH publish, final @NotNull String client)
240-
throws Exception {
241-
log.debug(
242-
"Client {} sent a new PUBLISH with QoS {} and a message identifier which is already in process ({}) by another flow! Starting new flow",
243-
client,
244-
publish.getQoS().getQosNumber(),
245-
publish.getPacketIdentifier());
246-
persistence.addOrReplace(client, publish.getPacketIdentifier(), publish);
247-
incomingPublishHandler.interceptOrDelegate(ctx, publish, client);
248-
qos1And2AlreadySentMap.put(publish.getPacketIdentifier(), true);
249-
}
250-
251226

252227
private void handlePuback(final @NotNull ChannelHandlerContext ctx, final PUBACK msg) {
253228

@@ -291,7 +266,7 @@ private void handlePubrel(final ChannelHandlerContext ctx, final PUBREL pubrel)
291266

292267
persistence.addOrReplace(client, messageId, pubrel);
293268
ctx.writeAndFlush(new PUBCOMP(messageId))
294-
.addListener(new PUBLISHFlowCompleteListener(messageId, client, qos1And2AlreadySentMap, persistence));
269+
.addListener(new PubcompSentListener(messageId, client, persistence));
295270
}
296271

297272
private void handlePubcomp(final @NotNull ChannelHandlerContext ctx, @NotNull final PUBCOMP msg) {
@@ -329,31 +304,27 @@ private void returnMessageId(
329304
}
330305

331306
@Immutable
332-
private static class PUBLISHFlowCompleteListener implements ChannelFutureListener {
307+
private static class PubcompSentListener implements ChannelFutureListener {
333308

334309
private final int messageId;
335310
private final @NotNull String client;
336-
private final @NotNull Map<Integer, Boolean> qos1And2AlreadySentMap;
337311
private final @NotNull IncomingMessageFlowPersistence persistence;
338312

339-
PUBLISHFlowCompleteListener(
313+
PubcompSentListener(
340314
final int messageId,
341315
final @NotNull String client,
342-
final @NotNull Map<Integer, Boolean> qos1And2AlreadySentMap,
343316
final @NotNull IncomingMessageFlowPersistence persistence) {
344317
this.messageId = messageId;
345318
this.client = client;
346-
this.qos1And2AlreadySentMap = qos1And2AlreadySentMap;
347319
this.persistence = persistence;
348320
}
349321

350322
@Override
351-
public void operationComplete(final ChannelFuture future) throws Exception {
323+
public void operationComplete(final ChannelFuture future) {
352324
if (future.isSuccess()) {
353325
UNACKNOWLEDGED_PUBLISHES_COUNTER.decrementAndGet();
354-
qos1And2AlreadySentMap.remove(messageId);
355326
persistence.remove(client, messageId);
356-
log.trace("Client '{}' completed a PUBLISH flow with QoS 1 or 2 for packet identifier '{}'",
327+
log.trace("Client '{}' completed a PUBLISH flow with QoS 2 for packet identifier '{}'",
357328
client,
358329
messageId);
359330
}

src/main/java/com/hivemq/persistence/qos/IncomingMessageFlowPersistenceImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public class IncomingMessageFlowPersistenceImpl implements IncomingMessageFlowPe
3232
private final @NotNull IncomingMessageFlowLocalPersistence localPersistence;
3333

3434
@Inject
35-
IncomingMessageFlowPersistenceImpl(final @NotNull IncomingMessageFlowLocalPersistence localPersistence) {
35+
public IncomingMessageFlowPersistenceImpl(final @NotNull IncomingMessageFlowLocalPersistence localPersistence) {
3636
this.localPersistence = localPersistence;
3737
}
3838

src/test/java/com/hivemq/mqtt/handler/publish/PublishFlowHandlerTest.java

Lines changed: 47 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,10 @@
3737
import com.hivemq.mqtt.message.reason.Mqtt5PubRecReasonCode;
3838
import com.hivemq.mqtt.message.reason.Mqtt5PubRelReasonCode;
3939
import com.hivemq.mqtt.services.PublishPollService;
40+
import com.hivemq.persistence.local.IncomingMessageFlowInMemoryLocalPersistence;
4041
import com.hivemq.persistence.qos.IncomingMessageFlowPersistence;
42+
import com.hivemq.persistence.qos.IncomingMessageFlowPersistenceImpl;
43+
import io.netty.channel.ChannelHandlerContext;
4144
import io.netty.channel.ChannelPromise;
4245
import io.netty.channel.embedded.EmbeddedChannel;
4346
import org.junit.After;
@@ -51,6 +54,7 @@
5154
import static org.junit.Assert.assertEquals;
5255
import static org.junit.Assert.assertNotNull;
5356
import static org.junit.Assert.assertTrue;
57+
import static org.mockito.ArgumentMatchers.any;
5458
import static org.mockito.ArgumentMatchers.eq;
5559
import static org.mockito.ArgumentMatchers.same;
5660
import static org.mockito.Mockito.anyInt;
@@ -213,7 +217,7 @@ public void test_qos_1_messages_is_dup_ignored() {
213217
}
214218

215219
@Test
216-
public void test_qos_1_messages_is_not_dup() {
220+
public void test_qos_1_messages_is_dup() {
217221

218222
final int messageid = 1;
219223

@@ -231,9 +235,6 @@ public void test_qos_1_messages_is_not_dup() {
231235
channel.writeInbound(publish);
232236

233237
assertEquals(true, channel.outboundMessages().isEmpty());
234-
verify(incomingMessageFlowPersistence, times(2)).addOrReplace(CLIENT_ID,
235-
publish.getPacketIdentifier(),
236-
publish);
237238
}
238239

239240
@Test
@@ -270,7 +271,7 @@ public void test_qos_2_messages_is_dup_not_forwarded() {
270271
channel.writeInbound(publish);
271272

272273
//pubcomp is here
273-
assertEquals(1, channel.outboundMessages().size());
274+
assertEquals(2, channel.outboundMessages().size());
274275
}
275276

276277
@Test
@@ -292,11 +293,11 @@ public void test_qos_2_messages_is_dup_ignored() {
292293
channel.writeInbound(publish);
293294
channel.writeInbound(publish);
294295

295-
assertEquals(true, channel.outboundMessages().isEmpty());
296+
assertEquals(false, channel.outboundMessages().isEmpty());
296297
}
297298

298299
@Test
299-
public void test_qos_2_messages_is_not_dup() {
300+
public void test_qos_2_messages_is_dup() {
300301

301302
final int messageid = 1;
302303

@@ -313,8 +314,8 @@ public void test_qos_2_messages_is_not_dup() {
313314
channel.writeInbound(publish);
314315
channel.writeInbound(publish);
315316

316-
assertEquals(true, channel.outboundMessages().isEmpty());
317-
verify(incomingMessageFlowPersistence, times(2)).addOrReplace(CLIENT_ID,
317+
assertEquals(false, channel.outboundMessages().isEmpty());
318+
verify(incomingMessageFlowPersistence, times(1)).addOrReplace(CLIENT_ID,
318319
publish.getPacketIdentifier(),
319320
publish);
320321
}
@@ -352,13 +353,6 @@ public void test_acknowledge_qos_1_message() {
352353

353354
assertNotNull(pubackOut);
354355
assertEquals(puback.getPacketIdentifier(), pubackOut.getPacketIdentifier());
355-
356-
verify(incomingMessageFlowPersistence).addOrReplace(eq("client"),
357-
eq(puback.getPacketIdentifier()),
358-
same(puback));
359-
360-
//We have to make sure that the client was actually deleted in the end
361-
verify(incomingMessageFlowPersistence).remove(eq("client"), eq(puback.getPacketIdentifier()));
362356
}
363357

364358
@Test
@@ -817,6 +811,43 @@ public void test_max_inflight_window() throws Exception {
817811
assertEquals(3, orderedTopicService.unacknowledgedMessages().size());
818812
}
819813

814+
@Test()
815+
public void test_Qos2AndQos1PublishDoNotInterfereWithEachOther() throws Exception {
816+
InternalConfigurations.MAX_INFLIGHT_WINDOW_SIZE_MESSAGES = 50;
817+
channel = new EmbeddedChannel(new PublishFlowHandler(publishPollService,
818+
new IncomingMessageFlowPersistenceImpl(new IncomingMessageFlowInMemoryLocalPersistence()),
819+
orderedTopicService,
820+
incomingPublishHandler,
821+
mock(DropOutgoingPublishesHandler.class)));
822+
final ClientConnection clientConnection = spy(new DummyClientConnection(channel, null));
823+
when(clientConnection.getFreePacketIdRanges()).thenReturn(freePacketIdRanges);
824+
channel.attr(ClientConnectionContext.CHANNEL_ATTRIBUTE_NAME).set(clientConnection);
825+
ClientConnection.of(channel).setClientId(CLIENT_ID);
826+
827+
828+
final PUBLISH publishQoS1 = createPublish("topic", 100, QoS.AT_LEAST_ONCE);
829+
final PUBLISH publishQoS2 = createPublish("topic", 100, QoS.EXACTLY_ONCE);
830+
831+
final PUBLISH publishQoS2_1000 = createPublish("topic", 100, QoS.EXACTLY_ONCE);
832+
833+
channel.pipeline().fireChannelRead(publishQoS2);
834+
channel.pipeline().fireChannelRead(publishQoS1);
835+
channel.pipeline().fireChannelRead(publishQoS2_1000);
836+
837+
assertEquals(0, orderedTopicService.queue.size());
838+
assertEquals(0, orderedTopicService.unacknowledgedMessages().size());
839+
verify(incomingPublishHandler, times(1)).interceptOrDelegate(any(ChannelHandlerContext.class),
840+
eq(publishQoS1),
841+
eq(CLIENT_ID));
842+
verify(incomingPublishHandler, times(1)).interceptOrDelegate(any(ChannelHandlerContext.class),
843+
eq(publishQoS2),
844+
eq(CLIENT_ID));
845+
verify(incomingPublishHandler, times(0)).interceptOrDelegate(any(ChannelHandlerContext.class),
846+
eq(publishQoS2_1000),
847+
eq(CLIENT_ID));
848+
}
849+
850+
820851
private PUBLISH createPublish(final String topic, final int messageId, final QoS qoS) {
821852
return createPublish(topic, messageId, qoS, false);
822853
}

0 commit comments

Comments
 (0)