-
Notifications
You must be signed in to change notification settings - Fork 93
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: update retry strategy for mutation calls to handle aborted transactions #1270
Open
aakashanandg
wants to merge
1
commit into
googleapis:main
Choose a base branch
from
aakashanandg:batch-retry
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+105
−10
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,13 +13,15 @@ | |
# limitations under the License. | ||
|
||
|
||
import time | ||
import unittest | ||
from tests._helpers import ( | ||
OpenTelemetryBase, | ||
StatusCode, | ||
enrich_with_otel_scope, | ||
) | ||
from google.cloud.spanner_v1 import RequestOptions | ||
from unittest.mock import patch | ||
|
||
TABLE_NAME = "citizens" | ||
COLUMNS = ["email", "first_name", "last_name", "age"] | ||
|
@@ -263,6 +265,71 @@ def test_commit_ok(self): | |
self.assertSpanAttributes( | ||
"CloudSpanner.Commit", attributes=dict(BASE_ATTRIBUTES, num_mutations=1) | ||
) | ||
|
||
|
||
def test_aborted_exception_on_commit(self): | ||
# Test case to verify that an Aborted exception is raised when | ||
# batch.commit() is called and the transaction is aborted internally. | ||
import datetime | ||
from google.cloud.spanner_v1 import CommitResponse | ||
from google.cloud.spanner_v1 import TransactionOptions | ||
from google.cloud._helpers import UTC | ||
from google.cloud._helpers import _datetime_to_pb_timestamp | ||
from google.api_core.exceptions import Aborted | ||
|
||
now = datetime.datetime.utcnow().replace(tzinfo=UTC) | ||
now_pb = _datetime_to_pb_timestamp(now) | ||
response = CommitResponse(commit_timestamp=now_pb) | ||
database = _Database() | ||
# Setup the spanner API which throws Aborted exception when calling commit API. | ||
api = database.spanner_api = _FauxSpannerAPI(_commit_response=response, _aborted_error = True) | ||
|
||
# Create mock session and batch objects | ||
session = _Session(database) | ||
batch = self._make_one(session) | ||
batch.insert(TABLE_NAME, COLUMNS, VALUES) | ||
|
||
# Assertion: Ensure that calling batch.commit() raises the Aborted exception | ||
with self.assertRaises(Aborted) as context: | ||
batch.commit() | ||
|
||
# Verify additional details about the exception | ||
self.assertEqual(str(context.exception), "409 Transaction was aborted") | ||
|
||
def test_aborted_exception_on_commit_with_retries(self): | ||
# Test case to verify that an Aborted exception is raised when | ||
# batch.commit() is invoked, the transaction is internally aborted, | ||
# and the Spanner commit API calls are retried. | ||
import datetime | ||
from google.cloud.spanner_v1 import CommitResponse | ||
from google.cloud.spanner_v1 import TransactionOptions | ||
from google.cloud._helpers import UTC | ||
from google.cloud._helpers import _datetime_to_pb_timestamp | ||
from google.api_core.exceptions import Aborted | ||
|
||
now = datetime.datetime.utcnow().replace(tzinfo=UTC) | ||
now_pb = _datetime_to_pb_timestamp(now) | ||
response = CommitResponse(commit_timestamp=now_pb) | ||
database = _Database() | ||
# Setup the spanner API which throws Aborted exception when calling commit API. | ||
api = database.spanner_api = _FauxSpannerAPI(_commit_response=response, _aborted_error = True) | ||
|
||
# We will patch the commit method of _FauxSpannerAPI to track how many times it's called | ||
with patch.object(api, 'commit', wraps=api.commit) as mock_commit: | ||
# Set up a mock session and batch | ||
session = _Session(database) | ||
batch = self._make_one(session) | ||
batch.insert(TABLE_NAME, COLUMNS, VALUES) | ||
|
||
# Try committing the batch, which should call api.commit() and raise an Aborted exception | ||
# The retry logic should call commit() again after handling the Aborted exception | ||
try: | ||
batch.commit() | ||
except Aborted: | ||
pass | ||
|
||
# Verify that commit was called more than once (due to retry) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You should verify the exact number. Based on my comment above, my guess is that you are getting 1 more commit than you would expect, as |
||
self.assertGreater(mock_commit.call_count, 1, "api.commit() was not called more than once on retry") | ||
|
||
def _test_commit_with_options( | ||
self, | ||
|
@@ -614,6 +681,33 @@ def __init__(self, database=None, name=TestBatch.SESSION_NAME): | |
@property | ||
def session_id(self): | ||
return self.name | ||
|
||
def run_in_transaction(self, fnc): | ||
""" | ||
Runs a function in a transaction, retrying if an exception occurs. | ||
:param fnc: The function to run in the transaction. | ||
:param max_retries: Maximum number of retry attempts. | ||
:param delay: Delay (in seconds) between retries. | ||
:return: The result of the function, or raises the exception after max retries. | ||
""" | ||
from google.api_core.exceptions import Aborted | ||
attempt = 0 | ||
max_retries = 3 | ||
delay = 1 | ||
while attempt < max_retries: | ||
try: | ||
result = fnc() | ||
return result | ||
except Aborted as exc: | ||
attempt += 1 | ||
if attempt < max_retries: | ||
print(f"Attempt {attempt} failed with Aborted. Retrying in {delay} seconds...") | ||
time.sleep(delay) # Wait before retrying | ||
else: | ||
raise exc # After max retries, raise the exception | ||
except Exception as exc: | ||
print(f"Unexpected exception occurred: {exc}") | ||
raise # Raise any other unexpected exception immediately | ||
|
||
|
||
class _Database(object): | ||
|
@@ -627,6 +721,7 @@ class _FauxSpannerAPI: | |
_committed = None | ||
_batch_request = None | ||
_rpc_error = False | ||
_aborted_error = False | ||
|
||
def __init__(self, **kwargs): | ||
self.__dict__.update(**kwargs) | ||
|
@@ -637,6 +732,7 @@ def commit( | |
metadata=None, | ||
): | ||
from google.api_core.exceptions import Unknown | ||
from google.api_core.exceptions import Aborted | ||
|
||
max_commit_delay = None | ||
if type(request).pb(request).HasField("max_commit_delay"): | ||
|
@@ -653,6 +749,8 @@ def commit( | |
) | ||
if self._rpc_error: | ||
raise Unknown("error") | ||
if self._aborted_error: | ||
raise Aborted("Transaction was aborted") | ||
return self._commit_response | ||
|
||
def batch_write( | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmmm.... Now that I see this, I realize that this cannot use
run_in_transaction
directly. The reason is that:batch.commit()
in its original form (so without the changes in this pull request) creates and commits a single-use read/write transaction.run_in_transaction
however creates a new read/write transaction and then executes the given method in the scope of that transaction.So what happens now is that you create two transactions:
run_in_transaction
creates a transaction that is not being used.method
that is passed in torun_in_transaction
executes aCommit
in a single-use read/write transaction.