Skip to content

Commit cf1b45c

Browse files
committed
test: Add integration tests for sharded pub/sub auto-resubscription
1 parent 429f61c commit cf1b45c

File tree

2 files changed

+70
-68
lines changed

2 files changed

+70
-68
lines changed

src/test/java/io/lettuce/core/pubsub/PubSubCommandIntegrationTests.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -611,4 +611,68 @@ void echoAllowedInSubscriptionState() {
611611
pubsub.unsubscribe(channel);
612612
}
613613

614+
@Test
615+
void autoResubscribeOnShardChannelUnsubscribed() throws Exception {
616+
final BlockingQueue<String> subscribedChannels = LettuceFactories.newBlockingQueue();
617+
final BlockingQueue<String> unsubscribedChannels = LettuceFactories.newBlockingQueue();
618+
619+
RedisPubSubListener<String, String> listener = new RedisPubSubAdapter<String, String>() {
620+
621+
@Override
622+
public void ssubscribed(String channel, long count) {
623+
subscribedChannels.add(channel);
624+
}
625+
626+
@Override
627+
public void sunsubscribed(String channel, long count) {
628+
unsubscribedChannels.add(channel);
629+
}
630+
631+
};
632+
633+
pubsub.getStatefulConnection().addListener(listener);
634+
pubsub.ssubscribe(shardChannel);
635+
636+
assertThat(subscribedChannels.take()).isEqualTo(shardChannel);
637+
638+
pubsub.sunsubscribe(shardChannel);
639+
640+
assertThat(unsubscribedChannels.take()).isEqualTo(shardChannel);
641+
assertThat(subscribedChannels.poll(50, TimeUnit.MILLISECONDS)).isNull();
642+
643+
pubsub.getStatefulConnection().removeListener(listener);
644+
}
645+
646+
@Test
647+
void noAutoResubscribeOnIntentionalUnsubscribe() throws Exception {
648+
final BlockingQueue<String> subscribedChannels = LettuceFactories.newBlockingQueue();
649+
final BlockingQueue<String> unsubscribedChannels = LettuceFactories.newBlockingQueue();
650+
651+
RedisPubSubListener<String, String> listener = new RedisPubSubAdapter<String, String>() {
652+
653+
@Override
654+
public void ssubscribed(String channel, long count) {
655+
subscribedChannels.add(channel);
656+
}
657+
658+
@Override
659+
public void sunsubscribed(String channel, long count) {
660+
unsubscribedChannels.add(channel);
661+
}
662+
663+
};
664+
665+
pubsub.getStatefulConnection().addListener(listener);
666+
pubsub.ssubscribe(shardChannel);
667+
668+
assertThat(subscribedChannels.take()).isEqualTo(shardChannel);
669+
670+
pubsub.sunsubscribe(shardChannel);
671+
assertThat(unsubscribedChannels.take()).isEqualTo(shardChannel);
672+
673+
assertThat(subscribedChannels.poll(50, TimeUnit.MILLISECONDS)).isNull();
674+
675+
pubsub.getStatefulConnection().removeListener(listener);
676+
}
677+
614678
}

src/test/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImplUnitTests.java

Lines changed: 6 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,7 @@
1717
import static org.junit.jupiter.api.Assertions.assertTrue;
1818
import static org.mockito.Mockito.*;
1919

20-
import io.lettuce.core.protocol.AsyncCommand;
21-
import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;
22-
import io.lettuce.core.pubsub.PubSubOutput;
23-
import io.lettuce.core.pubsub.RedisPubSubAdapter;
24-
2520
import java.lang.reflect.Field;
26-
import java.nio.ByteBuffer;
2721
import java.time.Duration;
2822
import java.util.Arrays;
2923
import java.util.HashSet;
@@ -132,74 +126,27 @@ void resubscribeChannelAndPatternAndShardChanelSubscription() {
132126

133127
@Test
134128
void autoResubscribeListenerIsRegistered() {
135-
// Verify that the connection has the markIntentionalUnsubscribe method
136-
// This confirms the auto-resubscribe functionality is available
137129
connection.markIntentionalUnsubscribe("test-channel");
138-
// If no exception is thrown, the method exists and works
139130
assertTrue(true);
140131
}
141132

142133
@Test
143134
void intentionalUnsubscribeBypassesAutoResubscribe() throws Exception {
144-
// Test 1: Intentional unsubscribe should NOT trigger auto-resubscribe
145-
146-
// Create a mock async commands to verify ssubscribe is NOT called
147-
RedisPubSubAsyncCommands<String, String> mockAsync = mock(RedisPubSubAsyncCommands.class);
148-
StatefulRedisPubSubConnectionImpl<String, String> spyConnection = spy(connection);
149-
when(spyConnection.async()).thenReturn(mockAsync);
150-
151-
// Mark the channel as intentionally unsubscribed
152-
spyConnection.markIntentionalUnsubscribe("test-channel");
153-
154-
// Use reflection to access the private endpoint and trigger sunsubscribed event
155-
PubSubEndpoint<String, String> endpoint = getEndpointViaReflection(spyConnection);
156-
PubSubOutput<String, String> sunsubscribeMessage = createSunsubscribeMessage("test-channel", codec);
157-
endpoint.notifyMessage(sunsubscribeMessage);
135+
connection.markIntentionalUnsubscribe("test-channel");
158136

159-
// Wait a moment for any async processing
160-
Thread.sleep(50);
137+
RedisPubSubListener<String, String> autoResubscribeListener = getAutoResubscribeListener(connection);
161138

162-
// Verify that ssubscribe was NOT called (intentional unsubscribe bypassed auto-resubscribe)
163-
verify(mockAsync, never()).ssubscribe("test-channel");
139+
autoResubscribeListener.sunsubscribed("test-channel", 0);
140+
verify(mockedWriter, never()).write(any(io.lettuce.core.protocol.RedisCommand.class));
164141
}
165142

166143
@Test
167144
void unintentionalUnsubscribeTriggersAutoResubscribe() throws Exception {
168-
// Test 2: Unintentional unsubscribe (from Redis) should trigger auto-resubscribe
169-
170-
// Create a fresh connection with a mock async
171-
PubSubEndpoint<String, String> mockEndpoint = mock(PubSubEndpoint.class);
172-
StatefulRedisPubSubConnectionImpl<String, String> testConnection = new StatefulRedisPubSubConnectionImpl<>(mockEndpoint,
173-
mockedWriter, codec, timeout);
174-
175-
// Create a mock async commands to verify ssubscribe IS called
176-
RedisPubSubAsyncCommands<String, String> mockAsync = mock(RedisPubSubAsyncCommands.class);
177-
@SuppressWarnings("unchecked")
178-
RedisFuture<Void> mockFuture = mock(RedisFuture.class);
179-
when(mockAsync.ssubscribe("test-channel")).thenReturn(mockFuture);
180-
181-
StatefulRedisPubSubConnectionImpl<String, String> spyConnection = spy(testConnection);
182-
when(spyConnection.async()).thenReturn(mockAsync);
145+
RedisPubSubListener<String, String> autoResubscribeListener = getAutoResubscribeListener(connection);
183146

184-
// Get the auto-resubscribe listener directly and trigger it
185-
RedisPubSubListener<String, String> autoResubscribeListener = getAutoResubscribeListener(spyConnection);
186-
187-
// Do NOT mark as intentional - simulate Redis server sunsubscribe during slot movement
188147
autoResubscribeListener.sunsubscribed("test-channel", 0);
189148

190-
// Wait a moment for async processing
191-
Thread.sleep(50);
192-
193-
// Verify that ssubscribe WAS called (auto-resubscribe triggered)
194-
verify(mockAsync, times(1)).ssubscribe("test-channel");
195-
}
196-
197-
@SuppressWarnings("unchecked")
198-
private PubSubEndpoint<String, String> getEndpointViaReflection(
199-
StatefulRedisPubSubConnectionImpl<String, String> connection) throws Exception {
200-
Field endpointField = StatefulRedisPubSubConnectionImpl.class.getDeclaredField("endpoint");
201-
endpointField.setAccessible(true);
202-
return (PubSubEndpoint<String, String>) endpointField.get(connection);
149+
verify(mockedWriter, times(1)).write(any(io.lettuce.core.protocol.RedisCommand.class));
203150
}
204151

205152
@SuppressWarnings("unchecked")
@@ -209,13 +156,4 @@ private RedisPubSubListener<String, String> getAutoResubscribeListener(
209156
listenerField.setAccessible(true);
210157
return (RedisPubSubListener<String, String>) listenerField.get(connection);
211158
}
212-
213-
private PubSubOutput<String, String> createSunsubscribeMessage(String channel, RedisCodec<String, String> codec) {
214-
PubSubOutput<String, String> output = new PubSubOutput<>(codec);
215-
output.set(ByteBuffer.wrap("sunsubscribe".getBytes()));
216-
output.set(ByteBuffer.wrap(channel.getBytes()));
217-
output.set(0L); // count
218-
return output;
219-
}
220-
221159
}

0 commit comments

Comments
 (0)