Skip to content

Commit a486000

Browse files
fix: prune subscriptions trie when topics are unsubscribed (#1671)
* fix: prune subscriptions trie when topics are unsubscribed * chore: remove and prune subscriptions recursively --------- Co-authored-by: Siddhant Srivastava <[email protected]> Co-authored-by: Siddhant Srivastava <[email protected]>
1 parent 72da2a0 commit a486000

File tree

2 files changed

+84
-3
lines changed

2 files changed

+84
-3
lines changed

src/main/java/com/aws/greengrass/builtin/services/pubsub/SubscriptionTrie.java

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,14 @@
55

66
package com.aws.greengrass.builtin.services.pubsub;
77

8+
import com.aws.greengrass.util.Utils;
9+
810
import java.util.Collections;
911
import java.util.HashSet;
1012
import java.util.Map;
1113
import java.util.Set;
1214
import java.util.concurrent.ConcurrentHashMap;
15+
import java.util.concurrent.atomic.AtomicBoolean;
1316

1417
/**
1518
* Trie to manage subscriptions.
@@ -69,11 +72,43 @@ public boolean remove(String topic, K cb) {
6972
* @return if changed after removal
7073
*/
7174
public boolean remove(String topic, Set<K> cbs) {
72-
SubscriptionTrie<K> sub = lookup(topic);
73-
if (sub == null) {
75+
if (Utils.isEmpty(topic) || Utils.isEmpty(cbs)) {
7476
return false;
7577
}
76-
return sub.subscriptionCallbacks.removeAll(cbs);
78+
79+
AtomicBoolean isSubscriptionRemoved = new AtomicBoolean(false);
80+
removeRecursively(topic.split(TOPIC_LEVEL_SEPARATOR), this, cbs, 0, isSubscriptionRemoved);
81+
return isSubscriptionRemoved.get();
82+
}
83+
84+
private boolean canRemove(SubscriptionTrie<K> node) {
85+
// returns false if the topic level is prefix of another topic or has callbacks registered
86+
return node.children.isEmpty() && node.subscriptionCallbacks.isEmpty();
87+
}
88+
89+
/*
90+
This method removes the requested callback from a topic and prunes the subscription trie recursively by
91+
1. navigating to the last node of the requested topic
92+
1.a. at this node, the method persists the result of the requested callback removal for the penultimate result
93+
1.b. returns true if requested callback was removed and if current topic node can be pruned
94+
95+
2. the previous topic node receives the result from 1.b.
96+
2.a. if true, remove child topic node and return if current node can be pruned
97+
2.b. if false, simply do nothing and return false implying current node cannot be pruned
98+
*/
99+
private boolean removeRecursively(String[] topicNodes, SubscriptionTrie<K> topicNode, Set<K> cbs, int index,
100+
AtomicBoolean subscriptionRemoved) {
101+
if (index == topicNodes.length) {
102+
subscriptionRemoved.set(topicNode.subscriptionCallbacks.removeAll(cbs));
103+
return subscriptionRemoved.get() && canRemove(topicNode);
104+
}
105+
106+
if (removeRecursively(topicNodes, topicNode.children.get(topicNodes[index]), cbs, index + 1,
107+
subscriptionRemoved)) {
108+
topicNode.children.remove(topicNodes[index]);
109+
return canRemove(topicNode);
110+
}
111+
return false;
77112
}
78113

79114
/**
@@ -183,3 +218,4 @@ public static boolean isWildcard(String topic) {
183218
}
184219

185220
}
221+

src/test/java/com/aws/greengrass/builtin/services/pubsub/SubscriptionTrieTest.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,51 @@ public void GIVEN_subscription_wildcard_WHEN_remove_topic_THEN_no_matches() {
137137
assertEquals(0, trie.size());
138138
}
139139

140+
@Test
141+
public void GIVEN_subscriptions_with_wildcards_WHEN_remove_topics_THEN_clean_up_trie() {
142+
assertEquals(0, trie.size());
143+
SubscriptionCallback cb1 = generateSubscriptionCallback();
144+
SubscriptionCallback cb2 = generateSubscriptionCallback();
145+
SubscriptionCallback cb3 = generateSubscriptionCallback();
146+
String topic = "foo/+/bar/#";
147+
trie.add("bar", cb1);
148+
trie.add(topic, cb1);
149+
trie.add(topic, cb2);
150+
// Topic is not registered with the callback, mark it as removed when requested to remove
151+
assertThat("remove topic", trie.remove(topic, cb3), is(false));
152+
assertThat(trie.get(topic), containsInAnyOrder(cb1, cb2));
153+
154+
trie.add("foo/#", cb3);
155+
trie.add("foo/+", cb2);
156+
157+
assertThat(trie.get(topic), containsInAnyOrder(cb1, cb2, cb3));
158+
assertEquals(5, trie.size());
159+
160+
assertThat("remove topic", trie.remove("foo/+", cb2), is(true));
161+
assertEquals(4, trie.size());
162+
assertThat(trie.get("foo/+"), containsInAnyOrder(cb3)); // foo/+ still matches with foo/#
163+
assertThat(trie.get(topic), containsInAnyOrder(cb1, cb2, cb3)); // foo/+/bar/# still exists
164+
165+
assertThat("remove topic", trie.remove("foo/#", cb3), is(true));
166+
assertFalse(trie.containsKey("foo/#"));
167+
assertThat(trie.get("foo/#"), is(empty()));
168+
assertThat(trie.get("foo/+"), is(empty())); // foo/+ doesn't match with any existing topic
169+
assertThat(trie.get(topic), containsInAnyOrder(cb1, cb2)); // foo/+/bar/# still exists
170+
assertEquals(3, trie.size());
171+
172+
assertThat("remove topic", trie.remove(topic, cb1), is(true));
173+
assertThat(trie.get(topic), contains(cb2));
174+
assertEquals(2, trie.size());
175+
assertTrue(trie.containsKey("foo/+"));
176+
assertTrue(trie.containsKey("foo/+/bar/#"));
177+
178+
assertThat("remove topic", trie.remove(topic, cb2), is(true));
179+
assertThat(trie.get(topic), is(empty()));
180+
assertEquals(1, trie.size());
181+
assertFalse(trie.containsKey("foo/+"));
182+
assertFalse(trie.containsKey("foo/+/bar/#"));
183+
}
184+
140185
@Test
141186
void GIVEN_topics_WHEN_isWildcard_THEN_returns_whether_it_uses_wildcard() {
142187
assertTrue(SubscriptionTrie.isWildcard("+"));

0 commit comments

Comments
 (0)