feat(x-goog-spanner-request-id): implement Request-ID #1264
This change introduces AtomicCounter, a thread-safe counter to deal with the multi-threaded nature of variables. It permits the operations:
* atomic_counter += 1
* value = atomic_counter + 1
* atomic_counter.value

These will be paramount to bringing in the logic for x-goog-spanner-request-id in much reduced changelists.

Updates googleapis#1261
Carved out from PR googleapis#1264
* feat(x-goog-spanner-request-id): introduce AtomicCounter

This change introduces AtomicCounter, a thread-safe counter to deal with the multi-threaded nature of variables. It permits the operations:
* atomic_counter += 1
* value = atomic_counter + 1
* atomic_counter.value

These will be paramount to bringing in the logic for x-goog-spanner-request-id in much reduced changelists.

Updates #1261
Carved out from PR #1264

* Tests for with_request_id
* chore: remove sleep
* chore: remove unused import

---------

Co-authored-by: Knut Olav Løite <[email protected]>
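For readers who have not seen the carved-out change, a counter supporting the three listed operations could look roughly like the sketch below. This is an illustration under assumptions, not the code that landed: the lock-based design, the `increment` method, and the `hammer` helper are all hypothetical.

```python
import threading


class AtomicCounter:
    """Illustrative thread-safe counter (hypothetical, not the PR's code)."""

    def __init__(self, start_value=0):
        self._value = start_value
        self._lock = threading.Lock()

    @property
    def value(self):
        # Supports `atomic_counter.value`.
        with self._lock:
            return self._value

    def increment(self, n=1):
        # Read-modify-write happens entirely under the lock.
        with self._lock:
            self._value += n
            return self._value

    def __iadd__(self, n):
        # Supports `atomic_counter += n`, mutating in place.
        self.increment(n)
        return self

    def __add__(self, n):
        # Supports `value = atomic_counter + n`; the counter is unchanged.
        return self.value + n

    def __radd__(self, n):
        return self.__add__(n)


def hammer(counter, n_threads=8, n_increments=1000):
    """Increment the counter from several threads and return the total."""

    def work():
        for _ in range(n_increments):
            counter.increment()

    threads = [threading.Thread(target=work) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter.value
```

Note that `counter += 1` rebinds the name, so inside a function it only works on a local or explicitly `global` variable; the worker above calls `increment()` directly for that reason.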
channel_id = len(self.__transports_to_channel_id) + 1
self.__transports_to_channel_id[api._transport] = channel_id

return channel_id
I think that we can safely cache this value once it has been calculated. self.spanner_api is never going to change after it has been created, and the same is also true for api._transport.
It is indeed cached: it is saved in the Database static __transports_to_channel_id map. Or would you prefer that I name it in capital letters, as __TRANSPORTS_TO_CHANNEL_ID, so that it is easier to spot?
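The two positions in this thread (a shared transport-to-ID map versus caching the computed value on the object) can be combined. The sketch below is a hypothetical illustration only: `ChannelIdRegistry`, `FakeDatabase`, and the added lock are assumptions, and only the `len(...) + 1` assignment mirrors the diff above.

```python
import threading


class ChannelIdRegistry:
    """Hypothetical stand-in for the shared transport -> channel_id map."""

    def __init__(self):
        self._transports_to_channel_id = {}
        self._lock = threading.Lock()

    def channel_id_for(self, transport):
        with self._lock:
            channel_id = self._transports_to_channel_id.get(transport)
            if channel_id is None:
                # Same numbering rule as in the diff: next ID is map size + 1.
                channel_id = len(self._transports_to_channel_id) + 1
                self._transports_to_channel_id[transport] = channel_id
            return channel_id


class FakeDatabase:
    """Hypothetical object that memoizes its channel ID after first lookup."""

    def __init__(self, registry, transport):
        self._registry = registry
        self._transport = transport
        self._channel_id = None

    @property
    def channel_id(self):
        # The transport never changes, so the lookup only needs to run once.
        if self._channel_id is None:
            self._channel_id = self._registry.channel_id_for(self._transport)
        return self._channel_id
```

With the memoized property, repeated calls skip both the lock and the dictionary lookup, which is the saving the reviewer appears to be asking for.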
google/cloud/spanner_v1/database.py
Outdated
@@ -698,10 +728,20 @@ def execute_partitioned_dml(
_metadata_with_leader_aware_routing(self._route_to_leader_enabled)
)

# Attempt will be incremented inside _restart_on_unavailable.
I assume that this comment applies to partial_attempt, no? If so, can we move it down to that variable?
It applies to begin_txn_attempt. I've updated the comment and moved it to just before the variable itself. Thank you.
google/cloud/spanner_v1/database.py
Outdated
trace_name="CloudSpanner.ExecuteStreamingSql",
request=request,
transaction_selector=txn_selector,
observability_options=self.observability_options,
attempt=begin_txn_attempt,
I think that this should be partial_attempt. Retries of this RPC will not retry the BeginTransaction RPC; they will just continue to use the same transaction ID as the one that was returned by the original BeginTransaction RPC.
Oooh, okay, that's very interesting and thanks for letting me know, let me make that update.
I did however already pass in and increment partial_attempt inside wrapped_method. I'll instead remove that and increment them in the appropriate sites.
google/cloud/spanner_v1/instance.py
Outdated
@@ -501,6 +501,7 @@ def database(
proto_descriptors=proto_descriptors,
)
else:
print("enabled interceptors")
nit: remove?
google/cloud/spanner_v1/pool.py
Outdated
@@ -243,6 +243,7 @@ def bind(self, database):
"CloudSpanner.FixedPool.BatchCreateSessions",
observability_options=observability_options,
) as span:
attempt = 1
nit: I don't think we need this. We can just pass in 1 as a constant in the function below just as for the other functions.
tests/unit/test_request_id_header.py
Outdated
if n_finished == len(threads):
break

time.sleep(1)
nit: can we remove this? The call to thread.join() should ensure that the threads have finished.
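As a minimal sketch of that point: `Thread.join()` blocks until the thread has terminated, so no polling loop or `time.sleep` is needed before reading results. The `run_all` helper and its arguments are hypothetical and not taken from the test file.

```python
import threading


def run_all(workers):
    """Run each callable in its own thread and collect the return values."""
    results = []
    lock = threading.Lock()

    def wrap(fn):
        def inner():
            value = fn()
            with lock:
                results.append(value)

        return inner

    threads = [threading.Thread(target=wrap(w)) for w in workers]
    for t in threads:
        t.start()
    for t in threads:
        # join() returns only after the thread has finished running.
        t.join()
    # Every worker has completed here, so results is fully populated.
    return results
```

Results arrive in completion order, so a test comparing them to expected values would typically sort first.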
(1, REQ_RAND_PROCESS_ID, client_id, channel_id, 1, 1),
),
(
"/google.spanner.v1.Spanner/GetSession",
Note: This PR should get rid of all of these GetSession calls, so rebasing on main may cause this test to fail.
]
assert got_stream_segments == want_stream_segments

def test_database_run_in_transaction_retries_on_abort(self):
This test does not appear to actually verify anything (other than that it does not cause any errors). Can we either remove it, or add verifications for the scenario it should cover?
tests/unit/test_session.py
Outdated
("x-goog-spanner-route-to-leader", "true"),
(
"x-goog-spanner-request-id",
f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.2.2",
Is attempt set to 2 here because the transaction is retried? If so, then that is not correct. The attempt number should only be increased if the RPC itself is being retried due to a retryable error for that (single) RPC, e.g. if a ServiceUnavailable error is returned.
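To make the header layout under discussion concrete, the sketch below assembles a value in the six-segment shape seen in these tests: version, process ID, client ID, channel ID, a per-RPC sequence number, and the attempt. The function names and the name `nth_request` for the fifth segment are assumptions for illustration; only the segment order and the header key come from the diff.

```python
REQ_ID_VERSION = 1
REQ_ID_HEADER_KEY = "x-goog-spanner-request-id"


def build_request_id(process_id, client_id, channel_id, nth_request, attempt):
    """Join the six segments with dots, as in the expected test values."""
    return (
        f"{REQ_ID_VERSION}.{process_id}.{client_id}."
        f"{channel_id}.{nth_request}.{attempt}"
    )


def append_request_id(metadata, request_id):
    """Return a copy of the metadata list with the request-id header added.

    Hypothetical helper; the PR's own with_request_id may differ.
    """
    return list(metadata) + [(REQ_ID_HEADER_KEY, request_id)]


metadata = [("x-goog-spanner-route-to-leader", "true")]
request_id = build_request_id("abc123", 1, 1, 2, 1)
metadata = append_request_id(metadata, request_id)
```

Under the reviewer's rule, a first try of the second RPC on this channel would carry `1.abc123.1.1.2.1`, and the final segment would become 2 only if that same RPC were re-sent after a retryable error.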
tests/unit/test_session.py
Outdated
("x-goog-spanner-route-to-leader", "true"),
(
"x-goog-spanner-request-id",
f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.3.2",
Same here as for the BeginTransaction RPC: transaction retries should not increase the attempt count. Only retries of a single RPC due to UNAVAILABLE errors (and other retryable error codes) should.
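The rule described in these two comments can be sketched as a small retry loop in which the attempt number is local to one RPC and grows only when that RPC fails with a retryable error. Everything here is hypothetical scaffolding: `Unavailable` stands in for a retryable error such as ServiceUnavailable, and `call_with_attempts` and `make_flaky` are illustrative names rather than client-library functions.

```python
class Unavailable(Exception):
    """Stand-in for a retryable error (e.g. ServiceUnavailable)."""


def call_with_attempts(rpc, nth_request, max_attempts=3):
    """Invoke rpc(nth_request, attempt), retrying only on Unavailable."""
    attempt = 1
    while True:
        try:
            return rpc(nth_request, attempt)
        except Unavailable:
            if attempt >= max_attempts:
                raise
            # Same RPC, same nth_request; only the attempt number grows.
            attempt += 1


def make_flaky(failures):
    """Build a fake RPC that raises Unavailable `failures` times first."""
    remaining = [failures]

    def rpc(nth_request, attempt):
        if remaining[0] > 0:
            remaining[0] -= 1
            raise Unavailable()
        return (nth_request, attempt)

    return rpc
```

A transaction that is aborted and re-run would issue fresh RPCs, each calling `call_with_attempts` with a new `nth_request` and starting again at attempt 1, which is why the expected values above should end in `.1` rather than `.2`.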
…propagation

Generates a request_id that is then injected inside metadata that's sent over to the Cloud Spanner backend.

Officially inject the first set of x-goog-spanner-request-id values into header metadata
Add request-id interceptor to use in asserting tests
Wrap Snapshot methods with x-goog-request-id metadata injector
Setup scaffolding for XGoogRequestIdHeader checks
Wire up XGoogSpannerRequestIdInterceptor for TestDatabase checks
Inject header in more Session using spots plus more tests
Base for tests with retries on abort
More plumbing for Transaction and Database
Update unit tests for Transaction
Wrap more in Transaction + update tests
Update tests
Plumb in more tests
Update TestDatabase

Fixes googleapis#1261
Implements x-goog-spanner-request-id propagation for end-to-end debugging.
Fixes #1261