Skip to content

Commit

Permalink
fix(mqtt): forward messages to subs when message comes from wrong cli…
Browse files Browse the repository at this point in the history
…ent (#1334)
  • Loading branch information
MikeDombo committed Oct 13, 2022
1 parent 47690aa commit cc703b0
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 9 deletions.
40 changes: 33 additions & 7 deletions src/main/java/com/aws/greengrass/mqttclient/MqttClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.inject.Inject;
Expand Down Expand Up @@ -740,14 +741,39 @@ Consumer<MqttMessage> getMessageHandlerForClient(AwsIotMqttClient client) {
// multiple clients such as A/B and A/#. Without this, an update to A/B would
// trigger twice if those 2 subscriptions were in different clients because
// both will receive the message from the cloud and call this handler.
Set<SubscribeRequest> subs = subscriptions.entrySet().stream()
.filter(s -> s.getValue() == client && MqttTopic
.topicIsSupersetOf(s.getKey().getTopic(), message.getTopic())).map(Map.Entry::getKey)
Predicate<Map.Entry<SubscribeRequest, AwsIotMqttClient>> subscriptionsMatchingTopic =
s -> MqttTopic.topicIsSupersetOf(s.getKey().getTopic(), message.getTopic());

Set<SubscribeRequest> exactlyMatchingSubs = subscriptions.entrySet().stream()
.filter(s -> s.getValue() == client)
.filter(subscriptionsMatchingTopic)
.map(Map.Entry::getKey)
.collect(Collectors.toSet());
if (subs.isEmpty()) {
logger.atError().kv(TOPIC_KEY, message.getTopic()).kv(CLIENT_ID_KEY, client.getClientId())
.log("Somehow got message from topic that no one subscribed to");
return;
Set<SubscribeRequest> subs = exactlyMatchingSubs;
if (exactlyMatchingSubs.isEmpty()) {
// We found no exact matches which means that we received a message on the wrong client, or
// we had no subscribers at all for the topic. We will now check if there is some subscriber
// which was in a different client. This can happen for IoT Jobs because they send the update/accepted
// message back to the same client which sent the update request, and not to the client that has
// subscribed to the update/accepted topic.

subs = subscriptions.entrySet().stream()
.filter(subscriptionsMatchingTopic)
.map(Map.Entry::getKey)
.collect(Collectors.toSet());

if (subs.isEmpty()) {
// We found no subscribers at all, so we'll log out an error and exit.
logger.atError().kv(TOPIC_KEY, message.getTopic()).kv(CLIENT_ID_KEY, client.getClientId())
.log("Somehow got message from topic that no one subscribed to");
return;
} else {
// We did find at least one subscriber matching the topic, but it didn't match the client
// that we subscribed on. This is weird, but it can be expected for IoT Jobs as explained above.
logger.atWarn().kv(TOPIC_KEY, message.getTopic()).kv(CLIENT_ID_KEY, client.getClientId())
.log("Got a message from a topic on a different client than what we subscribed with."
+ " This is odd, but it isn't a problem");
}
}
subs.forEach((h) -> {
try {
Expand Down
44 changes: 42 additions & 2 deletions src/test/java/com/aws/greengrass/mqttclient/MqttClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,46 @@ void GIVEN_incoming_message_WHEN_received_THEN_subscribers_are_called()
abc.getLeft().get(0, TimeUnit.SECONDS);
}

@Test
void GIVEN_incoming_message_on_wrong_client_WHEN_received_THEN_subscribers_are_still_called()
throws ExecutionException, InterruptedException, TimeoutException {
MqttClient client = spy(new MqttClient(deviceConfiguration, (c) -> builder, ses, executorService));
AwsIotMqttClient mockClient1 = mock(AwsIotMqttClient.class);
AwsIotMqttClient mockClient2 = mock(AwsIotMqttClient.class);
when(mockClient1.subscribe(any(), any())).thenReturn(CompletableFuture.completedFuture(0));
// All subscriptions will go through mockClient1, but we're going to send the messages via mockClient2
when(client.getNewMqttClient()).thenReturn(mockClient1);
assertFalse(client.connected());

Pair<CompletableFuture<Void>, Consumer<MqttMessage>> abPlus = asyncAssertOnConsumer((m) -> {
assertThat(m.getTopic(), either(is("A/B/C")).or(is("A/B/D")));
}, 2);
client.subscribe(SubscribeRequest.builder().topic("A/B/+").callback(abPlus.getRight()).build());
Pair<CompletableFuture<Void>, Consumer<MqttMessage>> abc = asyncAssertOnConsumer((m) -> {
assertEquals("A/B/C", m.getTopic());
}, 2);
client.subscribe(SubscribeRequest.builder().topic("A/B/C").callback(abc.getRight()).build());
Pair<CompletableFuture<Void>, Consumer<MqttMessage>> abd = asyncAssertOnConsumer((m) -> {
assertEquals("A/B/D", m.getTopic());
});
client.subscribe(SubscribeRequest.builder().topic("A/B/D").callback(abd.getRight()).build());

Consumer<MqttMessage> handlerForClient2 = client.getMessageHandlerForClient(mockClient2);

handlerForClient2.accept(new MqttMessage("A/B/C", new byte[0]));
handlerForClient2.accept(new MqttMessage("A/B/D", new byte[0]));
handlerForClient2.accept(new MqttMessage("A/X/Y", new byte[0])); // No subscribers for this one

abPlus.getLeft().get(0, TimeUnit.SECONDS);
abd.getLeft().get(0, TimeUnit.SECONDS);

// Ensure, that even after removing the wildcard subscription, the other topics still get
// messages
client.unsubscribe(UnsubscribeRequest.builder().topic("A/B/+").callback(abPlus.getRight()).build());
handlerForClient2.accept(new MqttMessage("A/B/C", new byte[0]));
abc.getLeft().get(0, TimeUnit.SECONDS);
}

@Test
void GIVEN_3_connections_with_2_able_accept_new_WHEN_subscribe_THEN_closes_connection_with_no_subscribers()
throws ExecutionException, InterruptedException, TimeoutException {
Expand Down Expand Up @@ -404,11 +444,11 @@ void GIVEN_incoming_messages_to_2clients_WHEN_received_THEN_subscribers_are_call
client.subscribe(SubscribeRequest.builder().topic("A/B/C").callback(abc.getRight()).build());
Pair<CompletableFuture<Void>, Consumer<MqttMessage>> abd = asyncAssertOnConsumer((m) -> {
assertEquals("A/B/D", m.getTopic());
}, 1);
}, 2);
client.subscribe(SubscribeRequest.builder().topic("A/B/D").callback(abd.getRight()).build());
Pair<CompletableFuture<Void>, Consumer<MqttMessage>> abPlus = asyncAssertOnConsumer((m) -> {
assertThat(m.getTopic(), either(is("A/B/C")).or(is("A/B/D")).or(is("A/B/F")));
}, 3);
}, 5);
client.subscribe(SubscribeRequest.builder().topic("A/B/+").callback(abPlus.getRight()).build());

Consumer<MqttMessage> handler1 = client.getMessageHandlerForClient(mockIndividual1);
Expand Down

0 comments on commit cc703b0

Please sign in to comment.