Multi exec on cluster #3611
base: master
Conversation
Force-pushed from dee69d2 to 9937904
@elena-kolevska this needs workflow approval, thanks
Adds support for transactions based on multi/watch/exec on clusters. Transactions in this mode are limited to a single hash slot. Contributed-by: Scopely <[email protected]>
Force-pushed from 9937904 to f9cc550
Please note:

- RedisCluster pipelines currently only support key-based commands.
- The pipeline gets its 'read_from_replicas' value from the
The new preferred approach is to configure load_balancing_strategy; starting from this release, read_from_replicas is deprecated.
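For illustration, a minimal sketch of the suggested configuration, assuming a LoadBalancingStrategy enum exported from redis.cluster; the concrete strategy value is only an example:

from redis.cluster import RedisCluster, LoadBalancingStrategy

# Replaces the deprecated read_from_replicas=True flag with an explicit
# replica-reading strategy (value chosen here purely as an example).
rc = RedisCluster(
    host="localhost",
    port=7000,
    load_balancing_strategy=LoadBalancingStrategy.ROUND_ROBIN_REPLICAS,
)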
@@ -1821,6 +1852,16 @@ def remap_host_port(self, host: str, port: int) -> Tuple[str, int]:
            return self.address_remap((host, port))
        return host, port

    def find_connection_owner(self, connection: Connection) -> Optional[Redis]:
        node_name = get_node_name(connection.host, connection.port)
        for node in self.nodes_cache.values():
This can throw a RuntimeError; the pattern is similar to the one reported in issue #3565. To avoid it you can use tuple(self.nodes_cache.values()).
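A before/after sketch of the suggested change (illustrative only, not the PR's exact code):

# Unsafe: a concurrent topology refresh that resizes self.nodes_cache while
# this loop runs can raise
# "RuntimeError: dictionary changed size during iteration".
for node in self.nodes_cache.values():
    ...

# Safe: tuple() takes a point-in-time snapshot of the values first.
for node in tuple(self.nodes_cache.values()):
    ...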
# so that we can read them all in parallel as they come back.
# we dont' multiplex on the sockets as they come available,
# but that shouldn't make too much difference.
node_commands = nodes.values()
Is this line needed? We have the same assignment as the first statement in the try block.
except (ConnectionError, TimeoutError):
    for n in nodes.values():
        n.connection_pool.release(n.connection)
    # Connection retries are being handled in the node's
This comment is no longer true: retries are not allowed on the node's connection.
# before reading anything
# this allows us to flush all the requests out across the
# network essentially in parallel
# so that we can read them all in parallel as they come back.
Since this is the sync client, this comment needs to be changed. We don't read the responses in parallel.
    for n in node_commands:
        n.read()
finally:
    # release all of the redis connections we allocated earlier
@vladvildanov can you please take a look at this comment? It sounds like it is explaining a problem with releasing the connections back to the pool in the finally block (although the goal was to describe why this change was added...)
self._watching = True

def _immediate_execute_command(self, *args, **options):
    retry = copy(self._pipe.retry)
Does the retry object contain the retryable cluster errors here? Since we haven't started using it actively in the cluster, I don't think all of those exceptions are added, e.g. SlotNotCoveredError, ClusterDownError, etc.
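A hedged illustration of the concern; the exception classes come from redis.exceptions, while the base retry below is only a stand-in for the pipeline's retry object:

from copy import copy

from redis.backoff import NoBackoff
from redis.exceptions import ClusterDownError, SlotNotCoveredError
from redis.retry import Retry

# Stand-in for self._pipe.retry: by default it only retries generic errors.
base_retry = Retry(NoBackoff(), retries=3)

# The cluster-specific errors have to be registered explicitly, otherwise
# the copied retry object will not retry on them.
retry = copy(base_retry)
retry.update_supported_errors((ClusterDownError, SlotNotCoveredError))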
""" | ||
Starts transactional context. | ||
|
||
See: ClusterPipeline.reset() |
See: ClusterPipeline.reset() --> See: ClusterPipeline.multi()
    self, stack, raise_on_error=True, allow_redirections=True
):
    """
    Wrapper for CLUSTERDOWN error handling.
In the docstring only the CLUSTERDOWN error is covered, while the flow actually reacts to several errors; the list is currently ConnectionError, TimeoutError, ClusterDownError, and SlotNotCoveredError.
IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"}
UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}
SLOT_REDIRECT_ERRORS = (AskError, MovedError)
CONNECTION_ERRORS = (ConnectionError, OSError, ClusterDownError)
I think that SlotNotCoveredError should also be handled as a type of connection error.
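A sketch of what that suggestion would look like (illustrative, not the PR's final code):

from redis.exceptions import ClusterDownError, ConnectionError, SlotNotCoveredError

# SlotNotCoveredError added to the connection-level failures, so it goes
# through the same retry/re-initialization path as the others.
CONNECTION_ERRORS = (ConnectionError, OSError, ClusterDownError, SlotNotCoveredError)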
def _execute_transaction_with_retries(
    self, stack: List["PipelineCommand"], raise_on_error: bool
):
    retry = copy(self._pipe.retry)
To avoid copying and updating the retry object possibly several times per transaction (if we have several commands for immediate execution), you could store it on the TransactionStrategy and update the supported errors once in the __init__ method, as sketched below.
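A sketch of the suggested refactor, with hypothetical attribute names; the idea is to build the widened retry object once per strategy instead of once per immediate command:

from copy import copy

from redis.exceptions import ClusterDownError, SlotNotCoveredError


class TransactionStrategy:
    def __init__(self, pipe):
        self._pipe = pipe
        # Copy and extend the retry object once, then reuse it for every
        # WATCH/UNWATCH and for the final transaction execution.
        self._retry = copy(self._pipe.retry)
        self._retry.update_supported_errors((ClusterDownError, SlotNotCoveredError))

    def _immediate_execute_command(self, *args, **options):
        retry = self._retry  # no per-call copy needed
        ...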
Pull Request check-list
Please make sure to review and check all of these items:
NOTE: these things are not required to open a PR and can be done afterwards / while the PR is open.
Description of change
This adds support for transactions in cluster clients. These transactions are based on multi/watch/exec, the same as for the standalone client, but are limited to a single hash slot.
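A minimal usage sketch of what this enables, assuming a transaction=True opt-in on the cluster pipeline and using a hash tag so that all keys map to the same slot:

from redis.cluster import RedisCluster

rc = RedisCluster(host="localhost", port=7000)

# All keys share the {user:1} hash tag, so they hash to a single slot.
with rc.pipeline(transaction=True) as pipe:
    pipe.watch("{user:1}:balance")       # WATCH executes immediately
    pipe.multi()                         # start buffering the transaction
    pipe.incrby("{user:1}:balance", 10)
    pipe.execute()                       # MULTI ... EXEC against the slot owner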