Skip to content

Commit b19e38e

Browse files
authored
fix: don't use overlapping session ids in MqttFlowTest (#732)
While looking into #468, I noticed the two failing tests were sharing the same session id, which reminded of #456. While in this case the two tests aren't sharing the same session, and I haven't investigated the details of this codebase further, I'm curious to see if the problem remains when we use unique session ids.
1 parent e091106 commit b19e38e

File tree

1 file changed

+4
-2
lines changed

1 file changed

+4
-2
lines changed

mqtt-streaming/src/test/java/docs/javadsl/MqttFlowTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ public void establishClientBidirectionalConnectionAndSubscribeToATopic()
102102
throws InterruptedException, ExecutionException, TimeoutException {
103103
String clientId = "source-spec/flow";
104104
String topic = "source-spec/topic1";
105+
ByteString uniqueSessionId = ByteString.fromString("establishClientBidirectionalConnectionAndSubscribeToATopic-session");
105106

106107
// #create-streaming-flow
107108
MqttSessionSettings settings = MqttSessionSettings.create();
@@ -111,7 +112,7 @@ public void establishClientBidirectionalConnectionAndSubscribeToATopic()
111112
Tcp.get(system).outgoingConnection("localhost", 1883);
112113

113114
Flow<Command<Object>, DecodeErrorOrEvent<Object>, NotUsed> mqttFlow =
114-
Mqtt.clientSessionFlow(session, ByteString.fromString("1")).join(connection);
115+
Mqtt.clientSessionFlow(session, uniqueSessionId).join(connection);
115116
// #create-streaming-flow
116117

117118
// #run-streaming-flow
@@ -159,6 +160,7 @@ public void establishServerBidirectionalConnectionAndSubscribeToATopic()
159160
throws InterruptedException, ExecutionException, TimeoutException {
160161
String clientId = "flow-spec/flow";
161162
String topic = "source-spec/topic1";
163+
ByteString uniqueSessionId = ByteString.fromString("establishServerBidirectionalConnectionAndSubscribeToATopic-connection");
162164
String host = "localhost";
163165
int port = 9884;
164166

@@ -251,7 +253,7 @@ public void establishServerBidirectionalConnectionAndSubscribeToATopic()
251253
MqttClientSession clientSession = new ActorMqttClientSession(settings, system);
252254

253255
Flow<Command<Object>, DecodeErrorOrEvent<Object>, NotUsed> mqttFlow =
254-
Mqtt.clientSessionFlow(clientSession, ByteString.fromString("1")).join(connection);
256+
Mqtt.clientSessionFlow(clientSession, uniqueSessionId).join(connection);
255257

256258
Pair<SourceQueueWithComplete<Command<Object>>, CompletionStage<Publish>> run =
257259
Source.<Command<Object>>queue(3, OverflowStrategy.fail())

0 commit comments

Comments
 (0)