Skip to content

Commit 3e6a13f

Browse files
committed
test: Add integration tests for sharded pub/sub auto-resubscription
1 parent 429f61c commit 3e6a13f

File tree

2 files changed

+115
-41
lines changed

2 files changed

+115
-41
lines changed

src/test/java/io/lettuce/core/pubsub/PubSubCommandIntegrationTests.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -611,4 +611,68 @@ void echoAllowedInSubscriptionState() {
611611
pubsub.unsubscribe(channel);
612612
}
613613

614+
@Test
615+
void autoResubscribeOnShardChannelUnsubscribed() throws Exception {
616+
final BlockingQueue<String> subscribedChannels = LettuceFactories.newBlockingQueue();
617+
final BlockingQueue<String> unsubscribedChannels = LettuceFactories.newBlockingQueue();
618+
619+
RedisPubSubListener<String, String> listener = new RedisPubSubAdapter<String, String>() {
620+
621+
@Override
622+
public void ssubscribed(String channel, long count) {
623+
subscribedChannels.add(channel);
624+
}
625+
626+
@Override
627+
public void sunsubscribed(String channel, long count) {
628+
unsubscribedChannels.add(channel);
629+
}
630+
631+
};
632+
633+
pubsub.getStatefulConnection().addListener(listener);
634+
pubsub.ssubscribe(shardChannel);
635+
636+
assertThat(subscribedChannels.take()).isEqualTo(shardChannel);
637+
638+
pubsub.sunsubscribe(shardChannel);
639+
640+
assertThat(unsubscribedChannels.take()).isEqualTo(shardChannel);
641+
assertThat(subscribedChannels.poll(50, TimeUnit.MILLISECONDS)).isNull();
642+
643+
pubsub.getStatefulConnection().removeListener(listener);
644+
}
645+
646+
@Test
647+
void noAutoResubscribeOnIntentionalUnsubscribe() throws Exception {
648+
final BlockingQueue<String> subscribedChannels = LettuceFactories.newBlockingQueue();
649+
final BlockingQueue<String> unsubscribedChannels = LettuceFactories.newBlockingQueue();
650+
651+
RedisPubSubListener<String, String> listener = new RedisPubSubAdapter<String, String>() {
652+
653+
@Override
654+
public void ssubscribed(String channel, long count) {
655+
subscribedChannels.add(channel);
656+
}
657+
658+
@Override
659+
public void sunsubscribed(String channel, long count) {
660+
unsubscribedChannels.add(channel);
661+
}
662+
663+
};
664+
665+
pubsub.getStatefulConnection().addListener(listener);
666+
pubsub.ssubscribe(shardChannel);
667+
668+
assertThat(subscribedChannels.take()).isEqualTo(shardChannel);
669+
670+
pubsub.sunsubscribe(shardChannel);
671+
assertThat(unsubscribedChannels.take()).isEqualTo(shardChannel);
672+
673+
assertThat(subscribedChannels.poll(50, TimeUnit.MILLISECONDS)).isNull();
674+
675+
pubsub.getStatefulConnection().removeListener(listener);
676+
}
677+
614678
}

src/test/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImplUnitTests.java

Lines changed: 51 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,15 @@
1717
import static org.junit.jupiter.api.Assertions.assertTrue;
1818
import static org.mockito.Mockito.*;
1919

20-
import io.lettuce.core.protocol.AsyncCommand;
2120
import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;
22-
import io.lettuce.core.pubsub.PubSubOutput;
23-
import io.lettuce.core.pubsub.RedisPubSubAdapter;
2421

2522
import java.lang.reflect.Field;
2623
import java.nio.ByteBuffer;
2724
import java.time.Duration;
2825
import java.util.Arrays;
2926
import java.util.HashSet;
3027
import java.util.List;
28+
import java.util.Set;
3129

3230
@Tag(UNIT_TEST)
3331
class StatefulRedisPubSubConnectionImplUnitTests {
@@ -143,55 +141,32 @@ void autoResubscribeListenerIsRegistered() {
143141
void intentionalUnsubscribeBypassesAutoResubscribe() throws Exception {
144142
// Test 1: Intentional unsubscribe should NOT trigger auto-resubscribe
145143

146-
// Create a mock async commands to verify ssubscribe is NOT called
147-
RedisPubSubAsyncCommands<String, String> mockAsync = mock(RedisPubSubAsyncCommands.class);
148-
StatefulRedisPubSubConnectionImpl<String, String> spyConnection = spy(connection);
149-
when(spyConnection.async()).thenReturn(mockAsync);
150-
151-
// Mark the channel as intentionally unsubscribed
152-
spyConnection.markIntentionalUnsubscribe("test-channel");
144+
// Create a test connection that we can control
145+
TestStatefulConnection testConnection = new TestStatefulConnection();
153146

154-
// Use reflection to access the private endpoint and trigger sunsubscribed event
155-
PubSubEndpoint<String, String> endpoint = getEndpointViaReflection(spyConnection);
156-
PubSubOutput<String, String> sunsubscribeMessage = createSunsubscribeMessage("test-channel", codec);
157-
endpoint.notifyMessage(sunsubscribeMessage);
147+
// Mark as intentional unsubscribe
148+
testConnection.markIntentionalUnsubscribe("test-channel");
158149

159-
// Wait a moment for any async processing
160-
Thread.sleep(50);
150+
// Trigger sunsubscribed event
151+
testConnection.triggerSunsubscribed("test-channel", 0);
161152

162-
// Verify that ssubscribe was NOT called (intentional unsubscribe bypassed auto-resubscribe)
163-
verify(mockAsync, never()).ssubscribe("test-channel");
153+
// Verify that auto-resubscribe was NOT triggered
154+
assertEquals(0, testConnection.getResubscribeCallCount());
164155
}
165156

166157
@Test
167158
void unintentionalUnsubscribeTriggersAutoResubscribe() throws Exception {
168159
// Test 2: Unintentional unsubscribe (from Redis) should trigger auto-resubscribe
169160

170-
// Create a fresh connection with a mock async
171-
PubSubEndpoint<String, String> mockEndpoint = mock(PubSubEndpoint.class);
172-
StatefulRedisPubSubConnectionImpl<String, String> testConnection = new StatefulRedisPubSubConnectionImpl<>(mockEndpoint,
173-
mockedWriter, codec, timeout);
174-
175-
// Create a mock async commands to verify ssubscribe IS called
176-
RedisPubSubAsyncCommands<String, String> mockAsync = mock(RedisPubSubAsyncCommands.class);
177-
@SuppressWarnings("unchecked")
178-
RedisFuture<Void> mockFuture = mock(RedisFuture.class);
179-
when(mockAsync.ssubscribe("test-channel")).thenReturn(mockFuture);
180-
181-
StatefulRedisPubSubConnectionImpl<String, String> spyConnection = spy(testConnection);
182-
when(spyConnection.async()).thenReturn(mockAsync);
183-
184-
// Get the auto-resubscribe listener directly and trigger it
185-
RedisPubSubListener<String, String> autoResubscribeListener = getAutoResubscribeListener(spyConnection);
161+
// Create a test connection that we can control
162+
TestStatefulConnection testConnection = new TestStatefulConnection();
186163

187-
// Do NOT mark as intentional - simulate Redis server sunsubscribe during slot movement
188-
autoResubscribeListener.sunsubscribed("test-channel", 0);
164+
// Do NOT mark as intentional - simulate Redis server sunsubscribe
165+
testConnection.triggerSunsubscribed("test-channel", 0);
189166

190-
// Wait a moment for async processing
191-
Thread.sleep(50);
192-
193-
// Verify that ssubscribe WAS called (auto-resubscribe triggered)
194-
verify(mockAsync, times(1)).ssubscribe("test-channel");
167+
// Verify that auto-resubscribe WAS triggered
168+
assertEquals(1, testConnection.getResubscribeCallCount());
169+
assertEquals("test-channel", testConnection.getLastResubscribedChannel());
195170
}
196171

197172
@SuppressWarnings("unchecked")
@@ -218,4 +193,39 @@ private PubSubOutput<String, String> createSunsubscribeMessage(String channel, R
218193
return output;
219194
}
220195

196+
// Test helper class to verify auto-resubscribe behavior
197+
private static class TestStatefulConnection {
198+
199+
private final Set<String> intentionalUnsubscriptions = new HashSet<>();
200+
201+
private int resubscribeCallCount = 0;
202+
203+
private String lastResubscribedChannel;
204+
205+
public void markIntentionalUnsubscribe(String channel) {
206+
intentionalUnsubscriptions.add(channel);
207+
}
208+
209+
public void triggerSunsubscribed(String channel, long count) {
210+
// Simulate the auto-resubscribe listener logic
211+
if (intentionalUnsubscriptions.remove(channel)) {
212+
return; // Skip auto-resubscribe for intentional unsubscriptions
213+
}
214+
215+
if (channel != null) {
216+
resubscribeCallCount++;
217+
lastResubscribedChannel = channel;
218+
}
219+
}
220+
221+
public int getResubscribeCallCount() {
222+
return resubscribeCallCount;
223+
}
224+
225+
public String getLastResubscribedChannel() {
226+
return lastResubscribedChannel;
227+
}
228+
229+
}
230+
221231
}

0 commit comments

Comments
 (0)