Skip to content

Commit

Permalink
CR: renaming of pool to ranges for consistency
Browse files Browse the repository at this point in the history
  • Loading branch information
Remit committed Oct 13, 2023
1 parent 82ee03c commit cef11f8
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 46 deletions.
6 changes: 3 additions & 3 deletions src/main/java/com/hivemq/bootstrap/ClientConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class ClientConnection implements ClientConnectionContext {

private final @NotNull Channel channel;
private final @NotNull PublishFlushHandler publishFlushHandler;
private final @NotNull FreePacketIdRanges messageIDPool = new FreePacketIdRanges();
private final @NotNull FreePacketIdRanges freePacketIdRanges = new FreePacketIdRanges();
private final @NotNull Listener connectedListener;
private volatile @NotNull ClientState clientState;

Expand Down Expand Up @@ -374,8 +374,8 @@ public void setQueueSizeMaximum(final @Nullable Long queueSizeMaximum) {
this.queueSizeMaximum = queueSizeMaximum;
}

public @NotNull FreePacketIdRanges getMessageIDPool() {
return messageIDPool;
public @NotNull FreePacketIdRanges getFreePacketIdRanges() {
return freePacketIdRanges;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,8 +316,8 @@ private void returnMessageId(

//Such a message ID must never be zero, but better be safe than sorry
if (messageId > 0) {
final FreePacketIdRanges messageIDPool = ClientConnection.of(channel).getMessageIDPool();
messageIDPool.returnId(messageId);
final FreePacketIdRanges freePacketIdRanges = ClientConnection.of(channel).getFreePacketIdRanges();
freePacketIdRanges.returnId(messageId);
if (log.isTraceEnabled()) {
log.trace("Returning Message ID {} for client {} because of a {} message was received",
messageId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,8 @@ public void onSuccess(final @Nullable PublishStatus status) {
}

if (qos0Publish.getPacketIdentifier() != 0) {
final FreePacketIdRanges messageIDPool = ClientConnection.of(channel).getMessageIDPool();
messageIDPool.returnId(qos0Publish.getPacketIdentifier());
final FreePacketIdRanges freePacketIdRanges = ClientConnection.of(channel).getFreePacketIdRanges();
freePacketIdRanges.returnId(qos0Publish.getPacketIdentifier());
}
}

Expand Down
22 changes: 11 additions & 11 deletions src/main/java/com/hivemq/mqtt/services/PublishPollServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,10 @@ public void pollNewMessages(final @NotNull String client) {

@Override
public void pollNewMessages(final @NotNull String client, final @NotNull Channel channel) {
final FreePacketIdRanges messageIDPool = ClientConnection.of(channel).getMessageIDPool();
final FreePacketIdRanges freePacketIdRanges = ClientConnection.of(channel).getFreePacketIdRanges();
final ImmutableIntArray messageIds;
try {
messageIds = createMessageIds(messageIDPool, pollMessageLimit(channel));
messageIds = createMessageIds(freePacketIdRanges, pollMessageLimit(channel));
} catch (final NoMessageIdAvailableException e) {
// This should never happen if the limit for the poll message limit is set correctly
log.error("No message id available for client {}", client, e);
Expand All @@ -164,7 +164,7 @@ public void onSuccess(final ImmutableList<PUBLISH> publishes) {
}
}
for (int i = usedIds; i < messageIds.length(); i++) {
messageIDPool.returnId(messageIds.get(i));
freePacketIdRanges.returnId(messageIds.get(i));
}
final List<PublishWithFuture> publishesToSend = new ArrayList<>(publishes.size());
final AtomicInteger inFlightMessageCount = inFlightMessageCount(channel);
Expand All @@ -179,7 +179,7 @@ public void onSuccess(final ImmutableList<PUBLISH> publishes) {
false,
client,
publish,
messageIDPool,
freePacketIdRanges,
channel,
client),
MoreExecutors.directExecutor());
Expand Down Expand Up @@ -221,8 +221,8 @@ public void onSuccess(final ImmutableList<MessageWithID> messages) {
inFlightMessageCount.addAndGet(messages.size());
for (int i = 0, messagesSize = messages.size(); i < messagesSize; i++) {
final MessageWithID message = messages.get(i);
final FreePacketIdRanges messageIDPool = clientConnection.getMessageIDPool();
messageIDPool.takeIfAvailable(message.getPacketIdentifier());
final FreePacketIdRanges freePacketIdRanges = clientConnection.getFreePacketIdRanges();
freePacketIdRanges.takeIfAvailable(message.getPacketIdentifier());

if (message instanceof PUBLISH) {
final PUBLISH publish = (PUBLISH) message;
Expand All @@ -232,7 +232,7 @@ public void onSuccess(final ImmutableList<MessageWithID> messages) {
false,
client,
publish,
messageIDPool,
freePacketIdRanges,
channel,
client),
MoreExecutors.directExecutor());
Expand All @@ -245,7 +245,7 @@ public void onSuccess(final ImmutableList<MessageWithID> messages) {
final SettableFuture<PublishStatus> settableFuture = SettableFuture.create();
channel.writeAndFlush(new PubrelWithFuture((PUBREL) message, settableFuture));
Futures.addCallback(settableFuture,
new PubrelResendCallback(client, message, messageIDPool, channel),
new PubrelResendCallback(client, message, freePacketIdRanges, channel),
MoreExecutors.directExecutor());
}
}
Expand Down Expand Up @@ -318,7 +318,7 @@ public void onSuccess(final @NotNull ImmutableList<PUBLISH> publishes) {
if (publishes.isEmpty()) {
return;
}
final FreePacketIdRanges messageIDPool = clientConnection.getMessageIDPool();
final FreePacketIdRanges freePacketIdRanges = clientConnection.getFreePacketIdRanges();
final List<PublishWithFuture> publishesToSend = new ArrayList<>(publishes.size());
final AtomicInteger inFlightMessageCount = inFlightMessageCount(channel);
// Add all messages to the in-flight count before sending them out.
Expand All @@ -339,7 +339,7 @@ public void onSuccess(final @NotNull ImmutableList<PUBLISH> publishes) {
int packetId = 0;
try {
if (checkNotNull(minQos).getQosNumber() > 0) {
packetId = messageIDPool.takeNextId();
packetId = freePacketIdRanges.takeNextId();
}
} catch (final NoMessageIdAvailableException e) {
// This should never happen if the limit for the poll message limit is set correctly
Expand Down Expand Up @@ -367,7 +367,7 @@ public void onSuccess(final @NotNull ImmutableList<PUBLISH> publishes) {
true,
sharedSubscription,
publishToSend,
messageIDPool,
freePacketIdRanges,
channel,
client),
MoreExecutors.directExecutor());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public class PublishFlowHandlerTest {
private PublishPollService publishPollService;

@Mock
private FreePacketIdRanges pool;
private FreePacketIdRanges freePacketIdRanges;

@Mock
private IncomingPublishHandler incomingPublishHandler;
Expand All @@ -87,15 +87,15 @@ public class PublishFlowHandlerTest {
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
InternalConfigurations.MAX_INFLIGHT_WINDOW_SIZE_MESSAGES = 5;
when(pool.takeNextId()).thenReturn(100);
when(freePacketIdRanges.takeNextId()).thenReturn(100);
orderedTopicService = new OrderedTopicService();
channel = new EmbeddedChannel(new PublishFlowHandler(publishPollService,
incomingMessageFlowPersistence,
orderedTopicService,
incomingPublishHandler,
mock(DropOutgoingPublishesHandler.class)));
final ClientConnection clientConnection = spy(new DummyClientConnection(channel, null));
when(clientConnection.getMessageIDPool()).thenReturn(pool);
when(clientConnection.getFreePacketIdRanges()).thenReturn(freePacketIdRanges);
channel.attr(ClientConnectionContext.CHANNEL_ATTRIBUTE_NAME).set(clientConnection);
ClientConnection.of(channel).setClientId(CLIENT_ID);
}
Expand All @@ -108,29 +108,29 @@ public void tearDown() throws Exception {
@Test
public void test_return_qos_1_message_id() throws Exception {

final PUBACK puback = new PUBACK(pool.takeNextId());
final PUBACK puback = new PUBACK(freePacketIdRanges.takeNextId());
channel.writeInbound(puback);

verify(pool).returnId(eq(100));
verify(freePacketIdRanges).returnId(eq(100));

}

@Test
public void test_return_qos_2_message_id() throws Exception {

final PUBCOMP pubcomp = new PUBCOMP(pool.takeNextId());
final PUBCOMP pubcomp = new PUBCOMP(freePacketIdRanges.takeNextId());
channel.writeInbound(pubcomp);

verify(pool).returnId(eq(100));
verify(freePacketIdRanges).returnId(eq(100));
}

@Test
public void test_dont_return_message_id() throws Exception {

final PUBREL pubrel = new PUBREL(pool.takeNextId());
final PUBREL pubrel = new PUBREL(freePacketIdRanges.takeNextId());
channel.writeInbound(pubrel);

verify(pool, never()).returnId(anyInt());
verify(freePacketIdRanges, never()).returnId(anyInt());
}

@Test
Expand All @@ -139,7 +139,7 @@ public void test_dont_return_invalid_message_id() {
final PUBACK puback = new PUBACK(-1);
channel.writeInbound(puback);

verify(pool, never()).returnId(anyInt());
verify(freePacketIdRanges, never()).returnId(anyInt());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
@SuppressWarnings("unchecked")
public class PublishPollServiceImplTest {

private @NotNull FreePacketIdRanges messageIDPool;
private @NotNull FreePacketIdRanges freePacketIdRanges;
private @NotNull ClientQueuePersistence clientQueuePersistence;
private @NotNull ConnectionPersistence connectionPersistence;
private @NotNull Channel channel;
Expand All @@ -87,7 +87,7 @@ public class PublishPollServiceImplTest {

@Before
public void setUp() throws Exception {
messageIDPool = mock(FreePacketIdRanges.class);
freePacketIdRanges = mock(FreePacketIdRanges.class);
clientQueuePersistence = mock(ClientQueuePersistence.class);
connectionPersistence = mock(ConnectionPersistence.class);
channel = mock(Channel.class);
Expand All @@ -102,7 +102,7 @@ public void setUp() throws Exception {

clientConnection = spy(new DummyClientConnection(channel, publishFlushHandler));
clientConnection.proposeClientState(ClientState.AUTHENTICATED);
when(clientConnection.getMessageIDPool()).thenReturn(messageIDPool);
when(clientConnection.getFreePacketIdRanges()).thenReturn(freePacketIdRanges);

when(connectionPersistence.get(anyString())).thenReturn(clientConnection);

Expand Down Expand Up @@ -133,7 +133,7 @@ public void tearDown() throws Exception {
@Test
public void test_new_messages() throws NoMessageIdAvailableException {

when(messageIDPool.takeNextId()).thenReturn(1);
when(freePacketIdRanges.takeNextId()).thenReturn(1);
when(clientQueuePersistence.readNew(eq("client"),
eq(false),
any(ImmutableIntArray.class),
Expand All @@ -143,7 +143,7 @@ public void test_new_messages() throws NoMessageIdAvailableException {

publishPollService.pollNewMessages("client");

verify(messageIDPool, times(48)).returnId(anyInt());
verify(freePacketIdRanges, times(48)).returnId(anyInt());
verify(publishFlushHandler, times(1)).sendPublishes(any(List.class));
}

Expand All @@ -155,7 +155,7 @@ public void test_new_messages_inflight_batch_size() throws NoMessageIdAvailableE

clientConnection.setClientReceiveMaximum(10);

when(messageIDPool.takeNextId()).thenReturn(1);
when(freePacketIdRanges.takeNextId()).thenReturn(1);
when(clientQueuePersistence.readNew(eq("client"),
eq(false),
any(ImmutableIntArray.class),
Expand All @@ -167,15 +167,15 @@ public void test_new_messages_inflight_batch_size() throws NoMessageIdAvailableE

publishPollService.pollNewMessages("client");

verify(messageIDPool,
verify(freePacketIdRanges,
times(9)).returnId(anyInt()); // 10 messages are polled because the client receive max is 10
verify(publishFlushHandler, times(1)).sendPublishes(any(List.class));
}

@Test
public void test_new_messages_channel_inactive() throws NoMessageIdAvailableException {

when(messageIDPool.takeNextId()).thenReturn(1);
when(freePacketIdRanges.takeNextId()).thenReturn(1);
when(clientQueuePersistence.readNew(eq("client"),
eq(false),
any(ImmutableIntArray.class),
Expand All @@ -189,7 +189,7 @@ public void test_new_messages_channel_inactive() throws NoMessageIdAvailableExce

verify(publishFlushHandler, times(1)).sendPublishes(argumentCaptor.capture());
argumentCaptor.getValue().get(0).getFuture().set(PublishStatus.NOT_CONNECTED);
verify(messageIDPool, times(50)).returnId(anyInt()); // The id must be returned
verify(freePacketIdRanges, times(50)).returnId(anyInt()); // The id must be returned
}

@Test
Expand All @@ -203,7 +203,7 @@ public void test_inflight_messages() {

publishPollService.pollInflightMessages("client", channel);

verify(messageIDPool, times(2)).takeIfAvailable(anyInt());
verify(freePacketIdRanges, times(2)).takeIfAvailable(anyInt());
verify(publishFlushHandler, times(1)).sendPublishes(any(List.class));
verify(channel).writeAndFlush(any(PubrelWithFuture.class));
}
Expand All @@ -218,9 +218,9 @@ public void test_inflight_messages_packet_id_not_available() {

publishPollService.pollInflightMessages("client", channel);

verify(messageIDPool, times(1)).takeIfAvailable(anyInt());
verify(freePacketIdRanges, times(1)).takeIfAvailable(anyInt());
verify(publishFlushHandler, times(1)).sendPublishes(any(List.class));
verify(messageIDPool).returnId(2);
verify(freePacketIdRanges).returnId(2);
}

@Test
Expand All @@ -231,7 +231,7 @@ public void test_inflight_messages_empty() {
ImmutableList.of()));
publishPollService.pollInflightMessages("client", channel);

verify(messageIDPool, never()).takeIfAvailable(anyInt());
verify(freePacketIdRanges, never()).takeIfAvailable(anyInt());
}

@Test
Expand All @@ -252,7 +252,7 @@ public void test_poll_shared_publishes() throws NoMessageIdAvailableException {
createPublish(),
TestMessageUtil.createMqtt3Publish(QoS.AT_MOST_ONCE))));

when(messageIDPool.takeNextId()).thenReturn(2).thenReturn(3);
when(freePacketIdRanges.takeNextId()).thenReturn(2).thenReturn(3);
when(channel.isActive()).thenReturn(true);
final AtomicInteger inFlightCount = new AtomicInteger(0);
clientConnection.setInFlightMessageCount(inFlightCount);
Expand All @@ -264,7 +264,7 @@ public void test_poll_shared_publishes() throws NoMessageIdAvailableException {

final ArgumentCaptor<List<PublishWithFuture>> captor = ArgumentCaptor.forClass(List.class);
verify(publishFlushHandler, times(1)).sendPublishes(captor.capture());
verify(messageIDPool, times(2)).takeNextId();
verify(freePacketIdRanges, times(2)).takeNextId();

final List<PublishWithFuture> values = captor.getValue();
assertEquals(2, values.get(0).getPacketIdentifier());
Expand All @@ -287,7 +287,7 @@ public void test_poll_shared_publishes_messages_in_flight() throws NoMessageIdAv
1)));
when(connectionPersistence.get("client1")).thenReturn(clientConnection);

when(messageIDPool.takeNextId()).thenReturn(2).thenReturn(3);
when(freePacketIdRanges.takeNextId()).thenReturn(2).thenReturn(3);
when(channel.isActive()).thenReturn(true);
clientConnection.setInFlightMessageCount(new AtomicInteger(1));
clientConnection.setInFlightMessagesSent(true);
Expand All @@ -308,7 +308,7 @@ public void test_poll_shared_publishes_messages_qos0_in_flight() throws NoMessag
1)));
when(connectionPersistence.get("client1")).thenReturn(clientConnection);

when(messageIDPool.takeNextId()).thenReturn(2).thenReturn(3);
when(freePacketIdRanges.takeNextId()).thenReturn(2).thenReturn(3);
when(channel.isActive()).thenReturn(true);

when(pipeline.get(PublishFlowHandler.class)).thenReturn(pubflishFlowHandler);
Expand All @@ -334,7 +334,7 @@ public void test_remove_shared_qos0_downgrade() throws NoMessageIdAvailableExcep
anyInt(),
anyLong())).thenReturn(Futures.immediateFuture(ImmutableList.of(publish)));

when(messageIDPool.takeNextId()).thenReturn(1);
when(freePacketIdRanges.takeNextId()).thenReturn(1);

publishPollService.pollSharedPublishesForClient("client", "group/topic", 0, false, null, channel);

Expand Down

0 comments on commit cef11f8

Please sign in to comment.