Skip to content

Commit 60ed412

Browse files
removed CONNECT message from client connection
1 parent 8f8f6f7 commit 60ed412

File tree

6 files changed

+32
-38
lines changed

6 files changed

+32
-38
lines changed

src/main/java/com/hivemq/bootstrap/ClientConnection.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import com.hivemq.mqtt.handler.publish.PublishFlushHandler;
3232
import com.hivemq.mqtt.message.ProtocolVersion;
3333
import com.hivemq.mqtt.message.connect.CONNECT;
34+
import com.hivemq.mqtt.message.connect.MqttWillPublish;
3435
import com.hivemq.mqtt.message.mqtt5.Mqtt5UserProperties;
3536
import com.hivemq.mqtt.message.pool.FreePacketIdRanges;
3637
import com.hivemq.security.auth.SslClientCertificate;
@@ -60,7 +61,7 @@ public class ClientConnection implements ClientConnectionContext {
6061
private @NotNull String clientId;
6162
private boolean cleanStart;
6263
private @Nullable ModifiableDefaultPermissions authPermissions;
63-
private @Nullable CONNECT connectMessage;
64+
private @Nullable MqttWillPublish willPublish;
6465
private @Nullable AtomicInteger inFlightMessageCount;
6566
private @Nullable Integer clientReceiveMaximum;
6667
private @Nullable Integer connectKeepAlive;
@@ -131,7 +132,7 @@ public class ClientConnection implements ClientConnectionContext {
131132
context.cleanStart,
132133
context.authPermissions,
133134
context.connectedListener,
134-
context.connectMessage,
135+
context.willPublish,
135136
context.clientReceiveMaximum,
136137
context.connectKeepAlive,
137138
context.queueSizeMaximum,
@@ -178,7 +179,7 @@ public ClientConnection(
178179
final boolean cleanStart,
179180
final @Nullable ModifiableDefaultPermissions authPermissions,
180181
final @NotNull Listener connectedListener,
181-
final @Nullable CONNECT connectMessage,
182+
final @Nullable MqttWillPublish mqttWillPublish,
182183
final @Nullable Integer clientReceiveMaximum,
183184
final @Nullable Integer connectKeepAlive,
184185
final @Nullable Long queueSizeMaximum,
@@ -219,7 +220,7 @@ public ClientConnection(
219220
this.cleanStart = cleanStart;
220221
this.authPermissions = authPermissions;
221222
this.connectedListener = connectedListener;
222-
this.connectMessage = connectMessage;
223+
this.willPublish = mqttWillPublish;
223224
this.clientReceiveMaximum = clientReceiveMaximum;
224225
this.connectKeepAlive = connectKeepAlive;
225226
this.queueSizeMaximum = queueSizeMaximum;
@@ -327,13 +328,13 @@ public void setAuthPermissions(final @NotNull ModifiableDefaultPermissions authP
327328
return connectedListener;
328329
}
329330

330-
public @Nullable CONNECT getConnectMessage() {
331-
return connectMessage;
331+
public @Nullable MqttWillPublish getWillPublish() {
332+
return willPublish;
332333
}
333334

334335
@Override
335-
public void setConnectMessage(final @Nullable CONNECT connectMessage) {
336-
this.connectMessage = connectMessage;
336+
public void setWillPublish(final @Nullable MqttWillPublish willPublish) {
337+
this.willPublish = willPublish;
337338
}
338339

339340
/**

src/main/java/com/hivemq/bootstrap/ClientConnectionContext.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.hivemq.extensions.events.client.parameters.ClientEventListeners;
3030
import com.hivemq.mqtt.message.ProtocolVersion;
3131
import com.hivemq.mqtt.message.connect.CONNECT;
32+
import com.hivemq.mqtt.message.connect.MqttWillPublish;
3233
import com.hivemq.mqtt.message.mqtt5.Mqtt5UserProperties;
3334
import com.hivemq.security.auth.SslClientCertificate;
3435
import io.netty.channel.Channel;
@@ -136,7 +137,7 @@ public interface ClientConnectionContext {
136137

137138
void setRequestProblemInformation(boolean problemInformationRequested);
138139

139-
void setConnectMessage(@Nullable CONNECT msg);
140+
void setWillPublish(@Nullable MqttWillPublish willPublish);
140141

141142
@NotNull String @Nullable [] getTopicAliasMapping();
142143

src/main/java/com/hivemq/bootstrap/UndefinedClientConnection.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.hivemq.mqtt.handler.publish.PublishFlushHandler;
3131
import com.hivemq.mqtt.message.ProtocolVersion;
3232
import com.hivemq.mqtt.message.connect.CONNECT;
33+
import com.hivemq.mqtt.message.connect.MqttWillPublish;
3334
import com.hivemq.mqtt.message.mqtt5.Mqtt5UserProperties;
3435
import com.hivemq.security.auth.SslClientCertificate;
3536
import io.netty.channel.Channel;
@@ -53,7 +54,7 @@ public class UndefinedClientConnection implements ClientConnectionContext {
5354
@Nullable String clientId;
5455
boolean cleanStart;
5556
@Nullable ModifiableDefaultPermissions authPermissions;
56-
@Nullable CONNECT connectMessage;
57+
@Nullable MqttWillPublish willPublish;
5758
@Nullable Integer clientReceiveMaximum;
5859
@Nullable Integer connectKeepAlive;
5960
@Nullable Long queueSizeMaximum;
@@ -165,8 +166,8 @@ public void setAuthPermissions(final @NotNull ModifiableDefaultPermissions authP
165166
}
166167

167168
@Override
168-
public void setConnectMessage(final @Nullable CONNECT connectMessage) {
169-
this.connectMessage = connectMessage;
169+
public void setWillPublish(final @Nullable MqttWillPublish willPublish) {
170+
this.willPublish = willPublish;
170171
}
171172

172173
@Override

src/main/java/com/hivemq/extensions/handler/PluginInitializerHandler.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import com.hivemq.mqtt.handler.connack.MqttConnacker;
3838
import com.hivemq.mqtt.handler.publish.DefaultPermissionsEvaluator;
3939
import com.hivemq.mqtt.message.connack.CONNACK;
40-
import com.hivemq.mqtt.message.connect.CONNECT;
4140
import com.hivemq.mqtt.message.connect.MqttWillPublish;
4241
import com.hivemq.mqtt.message.mqtt5.Mqtt5UserProperties;
4342
import com.hivemq.mqtt.message.reason.Mqtt5ConnAckReasonCode;
@@ -124,9 +123,9 @@ private void fireInitialize(
124123
if (pluginInitializerMap.isEmpty() && msg != null) {
125124
clientConnection.setPreventLwt(false);
126125
ctx.writeAndFlush(msg, promise);
127-
// Prevent leaking the retained CONNECT message for any existing ClientConnection.
128-
// The CONNECT message would otherwise be owned by the plugin initialization below outside this scope.
129-
clientConnection.setConnectMessage(null);
126+
// Prevent leaking the retained WILL message for any existing ClientConnection.
127+
// The WILL message would otherwise be owned by the plugin initialization below outside this scope.
128+
clientConnection.setWillPublish(null);
130129
return;
131130
}
132131

@@ -175,14 +174,14 @@ private void fireInitialize(
175174
@Override
176175
public void onSuccess(@Nullable final Void result) {
177176
authenticateWill(ctx, msg, promise);
178-
clientConnection.setConnectMessage(null);
177+
clientConnection.setWillPublish(null);
179178
}
180179

181180
@Override
182181
public void onFailure(final @NotNull Throwable t) {
183182
Exceptions.rethrowError(t);
184183
log.error("Calling initializer failed", t);
185-
clientConnection.setConnectMessage(null);
184+
clientConnection.setWillPublish(null);
186185
ctx.writeAndFlush(msg, promise);
187186
}
188187
}, ctx.executor());
@@ -195,13 +194,12 @@ private void authenticateWill(
195194

196195
final ClientConnection clientConnection = ClientConnection.of(ctx.channel());
197196

198-
final CONNECT connect = clientConnection.getConnectMessage();
199-
if (connect == null || connect.getWillPublish() == null) {
197+
final MqttWillPublish willPublish = clientConnection.getWillPublish();
198+
if (willPublish == null) {
200199
ctx.writeAndFlush(msg, promise);
201200
return;
202201
}
203202

204-
final MqttWillPublish willPublish = connect.getWillPublish();
205203
final ModifiableDefaultPermissions permissions = clientConnection.getAuthPermissions();
206204
if (DefaultPermissionsEvaluator.checkWillPublish(permissions, willPublish)) {
207205
clientConnection.setPreventLwt(false); //clear prevent flag, Will is authorized
@@ -213,7 +211,7 @@ private void authenticateWill(
213211
clientConnection.setPreventLwt(true);
214212
//We have already added the will to the session, so we need to remove it again
215213
final ListenableFuture<Void> removeWillFuture =
216-
clientSessionPersistence.deleteWill(connect.getClientIdentifier());
214+
clientSessionPersistence.deleteWill(clientConnection.getClientId());
217215
Futures.addCallback(removeWillFuture, new FutureCallback<>() {
218216
@Override
219217
public void onSuccess(@Nullable final Void result) {

src/main/java/com/hivemq/mqtt/handler/connect/ConnectHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -606,8 +606,8 @@ private void sendConnackSuccess(
606606

607607
final ChannelFuture connackSent;
608608

609-
// We retain the CONNECT message in memory during the initialization progress, e.g. for plugin initialization.
610-
clientConnection.setConnectMessage(msg);
609+
// We retain the WILL message in memory during the initialization progress, e.g. for plugin initialization.
610+
clientConnection.setWillPublish(msg.getWillPublish());
611611

612612
if (msg.getProtocolVersion() == ProtocolVersion.MQTTv5) {
613613
final CONNACK connack = buildMqtt5Connack(clientConnection, msg, sessionPresent);

src/test/java/com/hivemq/extensions/handler/PluginInitializerHandlerTest.java

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
import com.hivemq.mqtt.message.ProtocolVersion;
4242
import com.hivemq.mqtt.message.QoS;
4343
import com.hivemq.mqtt.message.connack.CONNACK;
44-
import com.hivemq.mqtt.message.connect.CONNECT;
4544
import com.hivemq.mqtt.message.connect.MqttWillPublish;
4645
import com.hivemq.mqtt.message.mqtt5.Mqtt5UserProperties;
4746
import com.hivemq.mqtt.message.reason.Mqtt5ConnAckReasonCode;
@@ -112,7 +111,7 @@ public void setUp() throws Exception {
112111

113112
channel = new EmbeddedChannel();
114113
clientConnection = new DummyClientConnection(channel, publishFlushHandler);
115-
clientConnection.setConnectMessage(mock(CONNECT.class));
114+
clientConnection.setWillPublish(mock(MqttWillPublish.class));
116115
clientConnection.setClientId("test_client");
117116
clientConnection.setProtocolVersion(ProtocolVersion.MQTTv5);
118117

@@ -163,7 +162,7 @@ public void test_write_connack_no_initializer() throws Exception {
163162
verify(channelHandlerContext).writeAndFlush(any(Object.class), eq(channelPromise));
164163

165164
assertFalse(ClientConnection.of(channel).isPreventLwt());
166-
assertNull(clientConnection.getConnectMessage());
165+
assertNull(clientConnection.getWillPublish());
167166
}
168167

169168
@Test(timeout = 10000)
@@ -193,7 +192,7 @@ public void test_write_connack_fire_initialize() throws Exception {
193192
verify(initializers, timeout(5000).times(1)).getClientInitializerMap();
194193
verify(channelHandlerContext, timeout(5000)).writeAndFlush(any(Object.class), eq(channelPromise));
195194
verify(channelPipeline).remove(any(ChannelHandler.class));
196-
assertNull(clientConnection.getConnectMessage());
195+
assertNull(clientConnection.getWillPublish());
197196
}
198197

199198
@Test(timeout = 10000)
@@ -215,10 +214,7 @@ public void test_write_will_publish_not_authorized() throws Exception {
215214
.withPayload(new byte[]{1, 2, 3})
216215
.build();
217216

218-
final CONNECT connect =
219-
new CONNECT.Mqtt5Builder().withClientIdentifier("test-client").withWillPublish(willPublish).build();
220-
221-
ClientConnection.of(channel).setConnectMessage(connect);
217+
ClientConnection.of(channel).setWillPublish(willPublish);
222218

223219
final ModifiableDefaultPermissionsImpl permissions = new ModifiableDefaultPermissionsImpl();
224220
permissions.add(new TopicPermissionBuilderImpl(new TestConfigurationBootstrap().getFullConfigurationService()).topicFilter(
@@ -238,7 +234,7 @@ public void test_write_will_publish_not_authorized() throws Exception {
238234

239235
verify(channelPipeline).remove(any(ChannelHandler.class));
240236
assertTrue(ClientConnection.of(channel).isPreventLwt());
241-
assertNull(clientConnection.getConnectMessage());
237+
assertNull(clientConnection.getWillPublish());
242238
}
243239

244240
@Test(timeout = 10000)
@@ -251,10 +247,7 @@ public void test_write_will_publish_authorized() throws Exception {
251247
.withPayload(new byte[]{1, 2, 3})
252248
.build();
253249

254-
final CONNECT connect =
255-
new CONNECT.Mqtt5Builder().withClientIdentifier("test-client").withWillPublish(willPublish).build();
256-
257-
ClientConnection.of(channel).setConnectMessage(connect);
250+
ClientConnection.of(channel).setWillPublish(willPublish);
258251

259252
final ModifiableDefaultPermissionsImpl permissions = new ModifiableDefaultPermissionsImpl();
260253
permissions.add(new TopicPermissionBuilderImpl(new TestConfigurationBootstrap().getFullConfigurationService()).topicFilter(
@@ -271,7 +264,7 @@ public void test_write_will_publish_authorized() throws Exception {
271264

272265
verify(channelPipeline).remove(any(ChannelHandler.class));
273266
assertFalse(ClientConnection.of(channel).isPreventLwt());
274-
assertNull(clientConnection.getConnectMessage());
267+
assertNull(clientConnection.getWillPublish());
275268
}
276269

277270
private Map<String, ClientInitializer> createClientInitializerMap() throws Exception {

0 commit comments

Comments
 (0)