Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ranged IDs implementation for packet IDs #426

Merged
merged 6 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions src/main/java/com/hivemq/bootstrap/ClientConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@
import com.hivemq.mqtt.message.ProtocolVersion;
import com.hivemq.mqtt.message.connect.CONNECT;
import com.hivemq.mqtt.message.mqtt5.Mqtt5UserProperties;
import com.hivemq.mqtt.message.pool.MessageIDPool;
import com.hivemq.mqtt.message.pool.SequentialMessageIDPoolImpl;
import com.hivemq.mqtt.message.pool.FreePacketIdRanges;
import com.hivemq.security.auth.SslClientCertificate;
import io.netty.channel.Channel;

Expand All @@ -52,7 +51,7 @@ public class ClientConnection implements ClientConnectionContext {

private final @NotNull Channel channel;
private final @NotNull PublishFlushHandler publishFlushHandler;
private final @NotNull MessageIDPool messageIDPool = new SequentialMessageIDPoolImpl();
private final @NotNull FreePacketIdRanges freePacketIdRanges = new FreePacketIdRanges();
private final @NotNull Listener connectedListener;
private volatile @NotNull ClientState clientState;

Expand Down Expand Up @@ -375,8 +374,8 @@ public void setQueueSizeMaximum(final @Nullable Long queueSizeMaximum) {
this.queueSizeMaximum = queueSizeMaximum;
}

public @NotNull MessageIDPool getMessageIDPool() {
return messageIDPool;
public @NotNull FreePacketIdRanges getFreePacketIdRanges() {
return freePacketIdRanges;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import com.hivemq.bootstrap.ClientConnection;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.mqtt.handler.publish.PublishStatus;
import com.hivemq.mqtt.message.pool.MessageIDPool;
import com.hivemq.mqtt.message.pool.FreePacketIdRanges;
import com.hivemq.mqtt.message.publish.PUBLISH;
import com.hivemq.mqtt.services.PublishPollService;
import com.hivemq.persistence.util.FutureUtils;
Expand All @@ -38,7 +38,7 @@ public class PublishStatusFutureCallback implements FutureCallback<PublishStatus
private final boolean sharedSubscription;
private final @NotNull String queueId;
private final @NotNull PUBLISH publish;
private final @NotNull MessageIDPool messageIDPool;
private final @NotNull FreePacketIdRanges messageIDPool;
private final int packetIdentifier;
private final @NotNull Channel channel;
private final @NotNull String client;
Expand All @@ -48,7 +48,7 @@ public PublishStatusFutureCallback(
final boolean sharedSubscription,
final @NotNull String queueId,
final @NotNull PUBLISH publish,
final @NotNull MessageIDPool messageIDPool,
final @NotNull FreePacketIdRanges messageIDPool,
final @NotNull Channel channel,
final @NotNull String client) {
this.publishPollService = publishPollService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import com.hivemq.mqtt.event.PubrelDroppedEvent;
import com.hivemq.mqtt.message.MessageWithID;
import com.hivemq.mqtt.message.QoS;
import com.hivemq.mqtt.message.pool.MessageIDPool;
import com.hivemq.mqtt.message.pool.FreePacketIdRanges;
import com.hivemq.mqtt.message.puback.PUBACK;
import com.hivemq.mqtt.message.pubcomp.PUBCOMP;
import com.hivemq.mqtt.message.publish.PUBLISH;
Expand Down Expand Up @@ -316,8 +316,8 @@ private void returnMessageId(

//Such a message ID must never be zero, but better be safe than sorry
if (messageId > 0) {
final MessageIDPool messageIDPool = ClientConnection.of(channel).getMessageIDPool();
messageIDPool.returnId(messageId);
final FreePacketIdRanges freePacketIdRanges = ClientConnection.of(channel).getFreePacketIdRanges();
freePacketIdRanges.returnId(messageId);
if (log.isTraceEnabled()) {
log.trace("Returning Message ID {} for client {} because of a {} message was received",
messageId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import com.hivemq.mqtt.handler.publish.PublishStatus;
import com.hivemq.mqtt.handler.publish.PublishWriteFailedListener;
import com.hivemq.mqtt.message.QoS;
import com.hivemq.mqtt.message.pool.MessageIDPool;
import com.hivemq.mqtt.message.pool.FreePacketIdRanges;
import com.hivemq.mqtt.message.publish.PUBLISH;
import com.hivemq.mqtt.message.publish.PUBLISHFactory;
import com.hivemq.mqtt.message.publish.PublishWithFuture;
Expand Down Expand Up @@ -243,8 +243,8 @@ public void onSuccess(final @Nullable PublishStatus status) {
}

if (qos0Publish.getPacketIdentifier() != 0) {
final MessageIDPool messageIDPool = ClientConnection.of(channel).getMessageIDPool();
messageIDPool.returnId(qos0Publish.getPacketIdentifier());
final FreePacketIdRanges freePacketIdRanges = ClientConnection.of(channel).getFreePacketIdRanges();
freePacketIdRanges.returnId(qos0Publish.getPacketIdentifier());
}
}

Expand Down
185 changes: 185 additions & 0 deletions src/main/java/com/hivemq/mqtt/message/pool/FreePacketIdRanges.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/*
* Copyright 2019-present HiveMQ GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hivemq.mqtt.message.pool;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.extension.sdk.api.annotations.Nullable;
import com.hivemq.mqtt.message.pool.exception.NoMessageIdAvailableException;

/**
* The purpose of this class is to reduce packet IDs allocation time for each independent client
* and to reduce the memory footprint of keeping track of allocated IDs.
* <p>
* This is achieved by keeping a list of {@link Range} objects each of which represents a contiguous interval of
* integer ids that are NOT currently assigned to any object using this instance of {@link FreePacketIdRanges}.
* The lower end of the {@link Range} instance is included whereas the upper end is excluded from the interval.
* That is, the {@link Range} instance with start 42 and end 42 contains only one ID - 42.
* The {@link Range} instance with start 10 and end 12 contains only two IDs: 10 and 11.
* <p>
* Initially, there is only one contiguous {@link Range} of IDs. The IDs are assigned starting from the lowest one in
* that interval.
* Upon assignment, the interval's lower end is incremented (interval size reduces from below).
* When the ID is returned, it either joins one of the existing {@link Range} intervals in the list (if it is adjacent)
* or it forms a new {@link Range} that is added to the list.
* <p>
* This class is NOT thread-safe.
* <a
* href="https://github.com/hivemq/hivemq-mqtt-client/blob/master/src/main/java/com/hivemq/client/internal/util/Ranges.java">The
* original implementation in the HiveMQ Java Client.</a>
*/
public class FreePacketIdRanges {

private static final int MIN_ALLOWED_MQTT_PACKET_ID = 1;

@VisibleForTesting
public static final int MAX_ALLOWED_MQTT_PACKET_ID = 65_535;

private @NotNull Range rootRange;

public FreePacketIdRanges() {
rootRange = new Range(MIN_ALLOWED_MQTT_PACKET_ID, MAX_ALLOWED_MQTT_PACKET_ID + 1);
}

/**
* Provides a new ID that is not currently allocated.
*
* @return a new ID if available in any of the ranges or {@link NoMessageIdAvailableException} if ran out of IDs.
*/
public int takeNextId() throws NoMessageIdAvailableException {
if (rootRange.start == rootRange.end) {
throw new NoMessageIdAvailableException();
}

final int id = rootRange.start;
rootRange.start++;
if ((rootRange.start == rootRange.end) && (rootRange.next != null)) {
rootRange = rootRange.next;
}
return id;
}

/**
* Takes the requested ID from the range if it is available.
*
* @param id an ID that the caller attempts to take.
*/
public void takeIfAvailable(final int id) {
Preconditions.checkArgument(id >= MIN_ALLOWED_MQTT_PACKET_ID && id <= MAX_ALLOWED_MQTT_PACKET_ID,
"Attempting to take an ID %s that is outside the valid packet IDs range.",
id);

Range current = rootRange;
Range prev = null;

while (current != null) {
if (id < current.start) {
return; // since the ranges are traversed in increasing order of IDs, the given id will not be found
Remit marked this conversation as resolved.
Show resolved Hide resolved
}

if (id < current.end) { // the id is int the current range of free ids

final int prevCurStart = current.start;
current.start = id + 1;
final Range lowerRange = prevCurStart == id ? null : new Range(prevCurStart, id, current);

if (lowerRange != null) {
if (prev != null) {
prev.next = lowerRange;
} else {
rootRange = lowerRange;
}
}

return; // id found and taken
}

// consider next range
prev = current;
current = current.next;
}
}

/**
* Returns the {@param id} into one of the ranges of free IDs.
*
* @param id an ID that the caller attempts to return (to free).
*/
public void returnId(final int id) {
Preconditions.checkArgument(id >= MIN_ALLOWED_MQTT_PACKET_ID && id <= MAX_ALLOWED_MQTT_PACKET_ID,
"Attempting to return an ID %s that is outside the valid packet IDs range.",
id);

Range current = rootRange;
if (id < current.start - 1) { // at least one element is between the returned and the next range
rootRange = new Range(id, id + 1, current);
return;
}
Range prev = current;
current = returnId(current, id);
while (current != null) {
if (id < current.start - 1) {
prev.next = new Range(id, id + 1, current);
return;
}
prev = current;
current = returnId(current, id);
}
}

private @Nullable Range returnId(final @NotNull Range range, final int id) throws IllegalStateException {
if (id == range.start - 1) { // if the returned element is directly adjacent to the range (from below)
range.start = id;
return null;
}

if (id < range.end) { // the returned element is within the range, i.e. it has been freed already
return null;
}

final Range next = range.next;
Preconditions.checkState(next != null, "The id is greater than maxId. This must not happen and is a bug.");
if (id == range.end) {
range.end++;
if (range.end == next.start) {
range.end = next.end;
range.next = next.next;
}
return null;
}
return next;
}

private static class Range {

int start;
int end;
@Nullable Range next;

Range(final int start, final int end) {
this.start = start;
this.end = end;
}

Range(final int start, final int end, final @NotNull Range next) {
this.start = start;
this.end = end;
this.next = next;
}
}
}
77 changes: 0 additions & 77 deletions src/main/java/com/hivemq/mqtt/message/pool/MessageIDPool.java

This file was deleted.

Loading
Loading