-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Search - request/response policies implementation with API overrides #3465
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Change sticky cursor implementation to propagate more lightweight Cursor object, instead the whole AggregationReply.
Get connection by nodeId
Add FT.ALTER, FT.DROPINDEX (and DD), FT.SYNDUMP, FT.SYNUPDATE
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR implements request/response policies for RediSearch in cluster environments, transitioning from cursor ID-based routing to cursor object-based routing with API overrides. The implementation enables keyless RediSearch commands to route randomly across cluster nodes while respecting ReadFrom policies and maintaining cursor stickiness.
Key changes:
- Replaced cursor ID parameters with
Cursor
objects containing both cursor ID and node ID for cluster routing - Added cluster-aware routing for keyless RediSearch commands that honor ReadFrom policies
- Updated cursor lifecycle to support sticky routing in cluster mode
Reviewed Changes
Copilot reviewed 23 out of 23 changed files in this pull request and generated 4 comments.
Show a summary per file
File | Description |
---|---|
AggregationReply.java |
Replaced cursor ID field with optional Cursor class containing cursor ID and node ID |
AbstractRedisAsyncCommands.java |
Updated cursor method signatures to accept Cursor objects instead of cursor IDs |
RedisAdvancedClusterAsyncCommandsImpl.java |
Added cluster routing implementation for keyless RediSearch commands |
PooledClusterConnectionProvider.java |
Added random connection selection method for keyless command routing |
Template and interface files | Updated API signatures across sync/async/reactive interfaces to use Cursor objects |
Test files | Updated integration tests to use new cursor API and verify cluster routing behavior |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
import io.lettuce.core.search.arguments.SynUpdateArgs; | ||
import reactor.core.publisher.Mono; | ||
|
||
import java.util.Optional; |
Copilot
AI
Oct 9, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The import is placed after other package imports. Move it to the appropriate alphabetical position with other java.util imports around line 83.
Copilot uses AI. Check for mistakes.
src/main/java/io/lettuce/core/cluster/RedisAdvancedClusterAsyncCommandsImpl.java
Show resolved
Hide resolved
// RediSearch may either omit the cursor on the final page, or return a non-zero cursor | ||
// that requires one more empty READ to return 0. Be tolerant across versions. | ||
long effective = nextResult.getCursor().map(AggregationReply.Cursor::getCursorId).orElse(0L); |
Copilot
AI
Oct 9, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] Consider moving this comment closer to the conditional logic on line 1354 where it's actually used, or make the comment more specific about which versions exhibit this behavior.
// RediSearch may either omit the cursor on the final page, or return a non-zero cursor | |
// that requires one more empty READ to return 0. Be tolerant across versions. | |
long effective = nextResult.getCursor().map(AggregationReply.Cursor::getCursorId).orElse(0L); | |
long effective = nextResult.getCursor().map(AggregationReply.Cursor::getCursorId).orElse(0L); | |
// RediSearch (prior to v2.6) may either omit the cursor on the final page, or return a non-zero cursor | |
// that requires one more empty READ to return 0. Be tolerant across versions. |
Copilot uses AI. Check for mistakes.
private ConnectionIntent getConnectionIntent(ProtocolKeyword commandType) { | ||
try { | ||
RedisCommand probe = new Command(commandType, null); | ||
boolean isReadOnly = getStatefulConnection().getOptions().getReadOnlyCommands().isReadOnly(probe); | ||
return isReadOnly ? ConnectionIntent.READ : ConnectionIntent.WRITE; | ||
} catch (Exception e) { | ||
logger.error("Error while determining connection intent for " + commandType, e); | ||
return ConnectionIntent.WRITE; | ||
} | ||
} |
Copilot
AI
Oct 9, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is duplicated in both RedisAdvancedClusterReactiveCommandsImpl
and RedisAdvancedClusterAsyncCommandsImpl
. Consider extracting this to a shared utility class or abstract base class to avoid code duplication.
Copilot uses AI. Check for mistakes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nits only except the connection intent
* Obtain a node-scoped connection for the given {@link ConnectionIntent} by first selecting a random upstream (master) | ||
* shard to ensure even distribution across partitions and then delegating to the slot-based selection logic. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typically, especially in an interface, you would not explain the implementation, but rather focus on the goal : in this case we do not care HOW we obtain a node, what we care is that we are obtaining a random node from the list of all nodes in the topology, by also considering any readfrom policies.
This is because the actual implementation might change, but the overall method contract / purpose could remain the same.
return new PipelinedRedisFuture<>(failed); | ||
} | ||
String nodeId = nodeIdOpt.get(); | ||
StatefulRedisConnection<K, V> byNode = getStatefulConnection().getConnection(nodeId, ConnectionIntent.WRITE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible that a cursor is created on a READ node? Would this logic work in this case?
E.g. FT.AGGREGATE -> route to random node that is READ -> connection intent is WRITE here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well it seems in this case intent doesn't really matter much. We are acquiring connection to a specific redis-server instance by nodeId. For better semantics i will change intent READ for ftCursorread and leave intent WRITE for ftCursordel
} | ||
|
||
@Override | ||
public CompletableFuture<StatefulRedisConnection<K, V>> getRandomConnectionAsync(ConnectionIntent connectionIntent) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add a unit test for this method
@Override | ||
public RedisFuture<AggregationReply<K, V>> ftCursorread(String index, Cursor cursor, int count) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add a unit test for this method (and reactive counterpart)
} | ||
|
||
@Override | ||
public RedisFuture<String> ftCursordel(String index, Cursor cursor) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add a unit test for this method (and reactive counterpart)
* @param <R> result type | ||
* @return RedisFuture wrapping the routed execution | ||
*/ | ||
<R> RedisFuture<R> routeKeyless(Supplier<RedisFuture<R>> superCall, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add a unit test for this method (and reactive counterpart)
* @param <R> result type | ||
* @return RedisFuture wrapping the routed execution | ||
*/ | ||
<R> RedisFuture<R> routeKeyless(Supplier<RedisFuture<R>> superCall, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add a unit test for this method (and reactive counterpart)
Implementing request/response policies for search - version with API overrides
Version with ClusterWriter routing implementation - #3409
Part of #3447