Multi exec on cluster #3611


Open
robertosantamaria-scopely wants to merge 19 commits into master from multi-exec-on-cluster

Conversation


@robertosantamaria-scopely commented Apr 21, 2025

Pull Request check-list

Please make sure to review and check all of these items:

  • Do tests and lints pass with this change?
  • Do the CI tests pass with this change (enable it first in your forked repo and wait for the github action build to finish)?
  • Is the new or changed code fully tested?
  • Is a documentation update included (if this change modifies existing APIs, or introduces new ones)?
  • Is there an example added to the examples folder (if applicable)?
  • Was the change added to CHANGES file?

NOTE: these things are not required to open a PR and can be done
afterwards / while the PR is open.

Description of change

This adds support for transactions in cluster clients. These transactions are based on multi/watch/exec, the same as for the standalone client, but are limited to a single hash slot.
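
A minimal usage sketch of what this enables. The transaction=True flag and the hash-tagged key names below are illustrative assumptions; the exact API surface is defined by this PR:

from redis.cluster import RedisCluster

rc = RedisCluster(host="localhost", port=7000)

# Hash tags ("{user:42}") keep every key in the same hash slot, which is the
# constraint this PR places on cluster transactions.
with rc.pipeline(transaction=True) as pipe:
    pipe.watch("{user:42}:balance")
    # After WATCH the pipeline executes commands immediately, so get() returns a value.
    balance = int(pipe.get("{user:42}:balance") or 0)
    pipe.multi()
    pipe.set("{user:42}:balance", balance - 10)
    pipe.incr("{user:42}:withdrawals")
    pipe.execute()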

@robertosantamaria-scopely force-pushed the multi-exec-on-cluster branch 2 times, most recently from dee69d2 to 9937904 on April 22, 2025 08:06
@robertosantamaria-scopely
Author

@elena-kolevska this needs workflow approval, thanks

Adds support for transactions based on multi/watch/exec on clusters.
Transactions in this mode are limited to a single hash slot.

Contributed-by: Scopely <[email protected]>
Please note:

- RedisCluster pipelines currently only support key-based commands.
- The pipeline gets its ‘read_from_replicas’ value from the

Collaborator

The newly preferred approach is to configure load_balancing_strategy; starting from this release, read_from_replicas is deprecated.
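
For illustration, a hedged sketch of that suggestion; the LoadBalancingStrategy enum and parameter name are taken from recent redis-py releases and may differ slightly:

from redis.cluster import LoadBalancingStrategy, RedisCluster

# Preferred over the deprecated read_from_replicas=True
rc = RedisCluster(
    host="localhost",
    port=7000,
    load_balancing_strategy=LoadBalancingStrategy.ROUND_ROBIN_REPLICAS,
)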

@petyaslavova added the feature label Apr 29, 2025
@vladvildanov marked this pull request as ready for review May 6, 2025 09:40
@@ -1821,6 +1852,16 @@ def remap_host_port(self, host: str, port: int) -> Tuple[str, int]:
return self.address_remap((host, port))
return host, port

def find_connection_owner(self, connection: Connection) -> Optional[Redis]:
node_name = get_node_name(connection.host, connection.port)
for node in self.nodes_cache.values():

Collaborator

This can raise a RuntimeError: the pattern is similar to the one reported in issue #3565. To avoid it you can use tuple(self.nodes_cache.values()).
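
A sketch of the suggested fix; the loop body is illustrative, the point is only the tuple() wrapping:

def find_connection_owner(self, connection: Connection) -> Optional[Redis]:
    node_name = get_node_name(connection.host, connection.port)
    # Snapshot the values so a concurrent topology refresh cannot raise
    # "RuntimeError: dictionary changed size during iteration".
    for node in tuple(self.nodes_cache.values()):
        if node.name == node_name:
            return node.redis_connection
    return None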

# so that we can read them all in parallel as they come back.
# we don't multiplex on the sockets as they come available,
# but that shouldn't make too much difference.
node_commands = nodes.values()

Collaborator

Is this line needed? We have the same assignment as the first statement in the try block.

except (ConnectionError, TimeoutError):
for n in nodes.values():
n.connection_pool.release(n.connection)
# Connection retries are being handled in the node's

Collaborator

This comment is no longer true - retries are not allowed on the node's connection.

# before reading anything
# this allows us to flush all the requests out across the
# network essentially in parallel
# so that we can read them all in parallel as they come back.

Collaborator

Since this is the sync client, this comment needs to be changed. We don't read the responses in parallel.

for n in node_commands:
n.read()
finally:
# release all of the redis connections we allocated earlier

Collaborator

@vladvildanov can you please take a look at this comment? It sounds like it is explaining a problem with releasing the connections back to the pool in the finally block (although the goal was to describe why this change is added...)

self._watching = True

def _immediate_execute_command(self, *args, **options):
retry = copy(self._pipe.retry)

Collaborator

Does the retry object contain the retryable cluster errors here? Since we haven't started to use it actively in the cluster, I don't think all the exceptions are added - like SlotNotCovered, ClusterDown, etc.

"""
Starts transactional context.

See: ClusterPipeline.reset()

Collaborator

See: ClusterPipeline.reset() --> See: ClusterPipeline.multi()

self, stack, raise_on_error=True, allow_redirections=True
):
"""
Wrapper for CLUSTERDOWN error handling.

Collaborator

In the comment, only the cluster down error is covered, while the flow actually reacts to several errors - the list currently is ConnectionError, TimeoutError, ClusterDownError, SlotNotCoveredError.

IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"}
UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}
SLOT_REDIRECT_ERRORS = (AskError, MovedError)
CONNECTION_ERRORS = (ConnectionError, OSError, ClusterDownError)

Collaborator

I think that SlotNotCoveredError should also be handled as a type of connection error.
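
For illustration, the tuple with that suggestion applied could look like:

from redis.exceptions import ClusterDownError, ConnectionError, SlotNotCoveredError

# SlotNotCoveredError treated like the other connection-level failures
CONNECTION_ERRORS = (ConnectionError, OSError, ClusterDownError, SlotNotCoveredError)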

def _execute_transaction_with_retries(
self, stack: List["PipelineCommand"], raise_on_error: bool
):
retry = copy(self._pipe.retry)

Collaborator

In order to avoid copying and updating the retry object possibly several times per transaction (if we have several commands for immediate execution), you may add it to the TransactionStrategy properties and update the supported errors once in the init method.
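
A hedged sketch of that refactor; the class and attribute names mirror the PR's TransactionStrategy but are assumptions here, and Retry.update_supported_errors is the redis-py helper for extending the retryable error list:

from copy import copy

from redis.exceptions import (
    ClusterDownError,
    ConnectionError,
    SlotNotCoveredError,
    TimeoutError,
)


class TransactionStrategy:
    def __init__(self, pipe):
        self._pipe = pipe
        # Copy the pipeline's retry object once and extend it with the
        # cluster-specific retryable errors, instead of repeating this in
        # every _immediate_execute_command / _execute_transaction_with_retries call.
        self._retry = copy(pipe.retry)
        self._retry.update_supported_errors(
            (ConnectionError, TimeoutError, ClusterDownError, SlotNotCoveredError)
        )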
