Skip to content

Commit

Permalink
Merge branch 'main' into ldc_exp
Browse files Browse the repository at this point in the history
  • Loading branch information
saranyailla authored Dec 6, 2024
2 parents 4b3ae05 + 2671dd1 commit edd199c
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 5 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@
When updating the version here, ensure you match the correct aws-crt version below.
Get the correct version from: https://github.com/aws/aws-iot-device-sdk-java-v2/blob/main/sdk/pom.xml#L45
!-->
<version>1.21.0</version>
<version>1.23.0</version>
<exclusions>
<exclusion>
<groupId>software.amazon.awssdk.crt</groupId>
Expand All @@ -249,7 +249,7 @@
<dependency>
<groupId>software.amazon.awssdk.crt</groupId>
<artifactId>aws-crt</artifactId>
<version>0.30.8</version>
<version>0.33.5</version>
<classifier>fips-where-available</classifier>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@

package com.aws.greengrass.builtin.services.pubsub;

import com.aws.greengrass.util.Utils;

import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Trie to manage subscriptions.
Expand Down Expand Up @@ -69,11 +72,43 @@ public boolean remove(String topic, K cb) {
* @return if changed after removal
*/
public boolean remove(String topic, Set<K> cbs) {
SubscriptionTrie<K> sub = lookup(topic);
if (sub == null) {
if (Utils.isEmpty(topic) || Utils.isEmpty(cbs)) {
return false;
}
return sub.subscriptionCallbacks.removeAll(cbs);

AtomicBoolean isSubscriptionRemoved = new AtomicBoolean(false);
removeRecursively(topic.split(TOPIC_LEVEL_SEPARATOR), this, cbs, 0, isSubscriptionRemoved);
return isSubscriptionRemoved.get();
}

private boolean canRemove(SubscriptionTrie<K> node) {
// returns false if the topic level is prefix of another topic or has callbacks registered
return node.children.isEmpty() && node.subscriptionCallbacks.isEmpty();
}

/*
This method removes the requested callback from a topic and prunes the subscription trie recursively by
1. navigating to the last node of the requested topic
1.a. at this node, the method persists the result of the requested callback removal for the penultimate result
1.b. returns true if requested callback was removed and if current topic node can be pruned
2. the previous topic node receives the result from 1.b.
2.a. if true, remove child topic node and return if current node can be pruned
2.b. if false, simply do nothing and return false implying current node cannot be pruned
*/
private boolean removeRecursively(String[] topicNodes, SubscriptionTrie<K> topicNode, Set<K> cbs, int index,
AtomicBoolean subscriptionRemoved) {
if (index == topicNodes.length) {
subscriptionRemoved.set(topicNode.subscriptionCallbacks.removeAll(cbs));
return subscriptionRemoved.get() && canRemove(topicNode);
}

if (removeRecursively(topicNodes, topicNode.children.get(topicNodes[index]), cbs, index + 1,
subscriptionRemoved)) {
topicNode.children.remove(topicNodes[index]);
return canRemove(topicNode);
}
return false;
}

/**
Expand Down Expand Up @@ -183,3 +218,4 @@ public static boolean isWildcard(String topic) {
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,51 @@ public void GIVEN_subscription_wildcard_WHEN_remove_topic_THEN_no_matches() {
assertEquals(0, trie.size());
}

@Test
public void GIVEN_subscriptions_with_wildcards_WHEN_remove_topics_THEN_clean_up_trie() {
assertEquals(0, trie.size());
SubscriptionCallback cb1 = generateSubscriptionCallback();
SubscriptionCallback cb2 = generateSubscriptionCallback();
SubscriptionCallback cb3 = generateSubscriptionCallback();
String topic = "foo/+/bar/#";
trie.add("bar", cb1);
trie.add(topic, cb1);
trie.add(topic, cb2);
// Topic is not registered with the callback, mark it as removed when requested to remove
assertThat("remove topic", trie.remove(topic, cb3), is(false));
assertThat(trie.get(topic), containsInAnyOrder(cb1, cb2));

trie.add("foo/#", cb3);
trie.add("foo/+", cb2);

assertThat(trie.get(topic), containsInAnyOrder(cb1, cb2, cb3));
assertEquals(5, trie.size());

assertThat("remove topic", trie.remove("foo/+", cb2), is(true));
assertEquals(4, trie.size());
assertThat(trie.get("foo/+"), containsInAnyOrder(cb3)); // foo/+ still matches with foo/#
assertThat(trie.get(topic), containsInAnyOrder(cb1, cb2, cb3)); // foo/+/bar/# still exists

assertThat("remove topic", trie.remove("foo/#", cb3), is(true));
assertFalse(trie.containsKey("foo/#"));
assertThat(trie.get("foo/#"), is(empty()));
assertThat(trie.get("foo/+"), is(empty())); // foo/+ doesn't match with any existing topic
assertThat(trie.get(topic), containsInAnyOrder(cb1, cb2)); // foo/+/bar/# still exists
assertEquals(3, trie.size());

assertThat("remove topic", trie.remove(topic, cb1), is(true));
assertThat(trie.get(topic), contains(cb2));
assertEquals(2, trie.size());
assertTrue(trie.containsKey("foo/+"));
assertTrue(trie.containsKey("foo/+/bar/#"));

assertThat("remove topic", trie.remove(topic, cb2), is(true));
assertThat(trie.get(topic), is(empty()));
assertEquals(1, trie.size());
assertFalse(trie.containsKey("foo/+"));
assertFalse(trie.containsKey("foo/+/bar/#"));
}

@Test
void GIVEN_topics_WHEN_isWildcard_THEN_returns_whether_it_uses_wildcard() {
assertTrue(SubscriptionTrie.isWildcard("+"));
Expand Down

0 comments on commit edd199c

Please sign in to comment.