diff --git a/google/cloud/spanner_v1/_helpers.py b/google/cloud/spanner_v1/_helpers.py index 27e53200ed..7d471b1bec 100644 --- a/google/cloud/spanner_v1/_helpers.py +++ b/google/cloud/spanner_v1/_helpers.py @@ -32,10 +32,11 @@ from google.cloud.spanner_v1 import TypeCode from google.cloud.spanner_v1 import ExecuteSqlRequest from google.cloud.spanner_v1 import JsonObject -from google.cloud.spanner_v1.request_id_header import with_request_id +from google.cloud.spanner_v1.request_id_header import REQ_ID_HEADER_KEY, with_request_id from google.rpc.error_details_pb2 import RetryInfo import random +from typing import Callable # Validation error messages NUMERIC_MAX_SCALE_ERR_MSG = ( @@ -641,6 +642,85 @@ def __radd__(self, n): """ return self.__add__(n) + def reset(self): + with self.__lock: + self.__value = 0 + def _metadata_with_request_id(*args, **kwargs): return with_request_id(*args, **kwargs) + + +patched = {} + + +def inject_retry_header_control(api): + # For each method, add an _attempt value that'll then be + # retrieved for each retry. + # 1. Patch the __getattribute__ method to match items in our manifest. + target = type(api) + hex_id = hex(id(target)) + if patched.get(hex_id, None) is not None: + return + + orig_getattribute = getattr(target, "__getattribute__") + + def patched_getattribute(obj, key, *args, **kwargs): + if key.startswith("_"): + return orig_getattribute(obj, key, *args, **kwargs) + + attr = orig_getattribute(obj, key, *args, **kwargs) + print("args", args, "attr.dir", dir(attr)) + + # 0. If we already patched it, we can return immediately. + if getattr(attr, "_patched", None) is not None: + return attr + + # 1. Skip over non-methods. + if not callable(attr): + return attr + + # 2. Skip modifying private and mangled methods. + mangled_or_private = attr.__name__.startswith("_") + if mangled_or_private: + return attr + + print("\033[35mattr", attr, "hex_id", hex(id(attr)), "\033[00m") + + # 3. 
Wrap the callable attribute and then capture its metadata keyed argument. + def wrapped_attr(*args, **kwargs): + metadata = kwargs.get("metadata", []) + if not metadata: + # Increment the reinvocation count. + print("not metatadata", attr.__name__) + wrapped_attr._attempt += 1 + return attr(*args, **kwargs) + + # 4. Find all the headers that match the target header key. + all_metadata = [] + for key, value in metadata: + if key is REQ_ID_HEADER_KEY: + print("key", key, "value", value, "attempt", wrapped_attr._attempt) + # 5. Increment the original_attempt with that of our re-invocation count. + splits = value.split(".") + hdr_attempt_plus_reinvocation = ( + int(splits[-1]) + wrapped_attr._attempt + ) + splits[-1] = str(hdr_attempt_plus_reinvocation) + value = ".".join(splits) + + all_metadata.append((key, value)) + + # Increment the reinvocation count. + wrapped_attr._attempt += 1 + + kwargs["metadata"] = all_metadata + print("\033[34mwrap_callable", hex(id(attr)), attr.__name__, "\033[00m") + return attr(*args, **kwargs) + + wrapped_attr._attempt = 0 + wrapped_attr._patched = True + return wrapped_attr + + setattr(target, "__getattribute__", patched_getattribute) + patched[hex_id] = True diff --git a/google/cloud/spanner_v1/batch.py b/google/cloud/spanner_v1/batch.py index 6a9f1f48f5..9845350645 100644 --- a/google/cloud/spanner_v1/batch.py +++ b/google/cloud/spanner_v1/batch.py @@ -230,7 +230,11 @@ def commit( method = functools.partial( api.commit, request=request, - metadata=metadata, + metadata=database.metadata_with_request_id( + database._next_nth_request, + 1, + metadata, + ), ) deadline = time.time() + kwargs.get( "timeout_secs", DEFAULT_RETRY_TIMEOUT_SECS @@ -352,7 +356,11 @@ def batch_write(self, request_options=None, exclude_txn_from_change_streams=Fals method = functools.partial( api.batch_write, request=request, - metadata=metadata, + metadata=database.metadata_with_request_id( + database._next_nth_request, + 1, + metadata, + ), ) response = _retry( 
method, diff --git a/google/cloud/spanner_v1/client.py b/google/cloud/spanner_v1/client.py index afe6264717..58c2e1f552 100644 --- a/google/cloud/spanner_v1/client.py +++ b/google/cloud/spanner_v1/client.py @@ -48,6 +48,7 @@ from google.cloud.spanner_v1._helpers import _merge_query_options from google.cloud.spanner_v1._helpers import _metadata_with_prefix from google.cloud.spanner_v1.instance import Instance +from google.cloud.spanner_v1._helpers import AtomicCounter _CLIENT_INFO = client_info.ClientInfo(client_library_version=__version__) EMULATOR_ENV_VAR = "SPANNER_EMULATOR_HOST" @@ -147,6 +148,8 @@ class Client(ClientWithProject): SCOPE = (SPANNER_ADMIN_SCOPE,) """The scopes required for Google Cloud Spanner.""" + NTH_CLIENT = AtomicCounter() + def __init__( self, project=None, @@ -199,6 +202,12 @@ def __init__( self._route_to_leader_enabled = route_to_leader_enabled self._directed_read_options = directed_read_options self._observability_options = observability_options + self._nth_client_id = Client.NTH_CLIENT.increment() + self._nth_request = AtomicCounter(0) + + @property + def _next_nth_request(self): + return self._nth_request.increment() @property def credentials(self): diff --git a/google/cloud/spanner_v1/database.py b/google/cloud/spanner_v1/database.py index 963debdab8..b03268e0d8 100644 --- a/google/cloud/spanner_v1/database.py +++ b/google/cloud/spanner_v1/database.py @@ -50,8 +50,11 @@ from google.cloud.spanner_v1 import SpannerClient from google.cloud.spanner_v1._helpers import _merge_query_options from google.cloud.spanner_v1._helpers import ( + AtomicCounter, _metadata_with_prefix, _metadata_with_leader_aware_routing, + _metadata_with_request_id, + inject_retry_header_control, ) from google.cloud.spanner_v1.batch import Batch from google.cloud.spanner_v1.batch import MutationGroups @@ -149,6 +152,9 @@ class Database(object): _spanner_api: SpannerClient = None + __transport_lock = threading.Lock() + __transports_to_channel_id = dict() + def 
__init__( self, database_id, @@ -183,6 +189,7 @@ def __init__( self._reconciling = False self._directed_read_options = self._instance._client.directed_read_options self._proto_descriptors = proto_descriptors + self._channel_id = 0 # It'll be created when _spanner_api is created. if pool is None: pool = BurstyPool(database_role=database_role) @@ -421,6 +428,15 @@ def logger(self): @property def spanner_api(self): + """Helper for session-related API calls.""" + api = self.__generate_spanner_api() + if not api: + return api + + inject_retry_header_control(api) + return api + + def __generate_spanner_api(self): """Helper for session-related API calls.""" if self._spanner_api is None: client_info = self._instance._client._client_info @@ -441,8 +457,26 @@ def spanner_api(self): client_info=client_info, client_options=client_options, ) + + with self.__transport_lock: + transport = self._spanner_api._transport + channel_id = self.__transports_to_channel_id.get(transport, None) + if channel_id is None: + channel_id = len(self.__transports_to_channel_id) + 1 + self.__transports_to_channel_id[transport] = channel_id + self._channel_id = channel_id + return self._spanner_api + def metadata_with_request_id(self, nth_request, nth_attempt, prior_metadata=[]): + return _metadata_with_request_id( + self._nth_client_id, + self._channel_id, + nth_request, + nth_attempt, + prior_metadata, + ) + def __eq__(self, other): if not isinstance(other, self.__class__): return NotImplemented @@ -485,7 +519,10 @@ def create(self): database_dialect=self._database_dialect, proto_descriptors=self._proto_descriptors, ) - future = api.create_database(request=request, metadata=metadata) + future = api.create_database( + request=request, + metadata=self.metadata_with_request_id(self._next_nth_request, 1, metadata), + ) return future def exists(self): @@ -501,7 +538,12 @@ def exists(self): metadata = _metadata_with_prefix(self.name) try: - api.get_database_ddl(database=self.name, metadata=metadata) + 
api.get_database_ddl( + database=self.name, + metadata=self.metadata_with_request_id( + self._next_nth_request, 1, metadata + ), + ) except NotFound: return False return True @@ -518,10 +560,16 @@ def reload(self): """ api = self._instance._client.database_admin_api metadata = _metadata_with_prefix(self.name) - response = api.get_database_ddl(database=self.name, metadata=metadata) + response = api.get_database_ddl( + database=self.name, + metadata=self.metadata_with_request_id(self._next_nth_request, 1, metadata), + ) self._ddl_statements = tuple(response.statements) self._proto_descriptors = response.proto_descriptors - response = api.get_database(name=self.name, metadata=metadata) + response = api.get_database( + name=self.name, + metadata=self.metadata_with_request_id(self._next_nth_request, 1, metadata), + ) self._state = DatabasePB.State(response.state) self._create_time = response.create_time self._restore_info = response.restore_info @@ -566,7 +614,10 @@ def update_ddl(self, ddl_statements, operation_id="", proto_descriptors=None): proto_descriptors=proto_descriptors, ) - future = api.update_database_ddl(request=request, metadata=metadata) + future = api.update_database_ddl( + request=request, + metadata=self.metadata_with_request_id(self._next_nth_request, 1, metadata), + ) return future def update(self, fields): @@ -604,7 +655,9 @@ def update(self, fields): metadata = _metadata_with_prefix(self.name) future = api.update_database( - database=database_pb, update_mask=field_mask, metadata=metadata + database=database_pb, + update_mask=field_mask, + metadata=self.metadata_with_request_id(self._next_nth_request, 1, metadata), ) return future @@ -617,7 +670,10 @@ def drop(self): """ api = self._instance._client.database_admin_api metadata = _metadata_with_prefix(self.name) - api.drop_database(database=self.name, metadata=metadata) + api.drop_database( + database=self.name, + metadata=self.metadata_with_request_id(self._next_nth_request, 1, metadata), + ) def 
execute_partitioned_dml( self, @@ -698,6 +754,12 @@ def execute_partitioned_dml( _metadata_with_leader_aware_routing(self._route_to_leader_enabled) ) + begin_txn_nth_request = self._next_nth_request + begin_txn_attempt = AtomicCounter(0) + partial_nth_request = self._next_nth_request + # partial_attempt will be incremented inside _restart_on_unavailable. + partial_attempt = AtomicCounter(0) + def execute_pdml(): with trace_call( "CloudSpanner.Database.execute_partitioned_pdml", @@ -706,7 +768,13 @@ def execute_pdml(): with SessionCheckout(self._pool) as session: add_span_event(span, "Starting BeginTransaction") txn = api.begin_transaction( - session=session.name, options=txn_options, metadata=metadata + session=session.name, + options=txn_options, + metadata=self.metadata_with_request_id( + begin_txn_nth_request, + begin_txn_attempt.increment(), + metadata, + ), ) txn_selector = TransactionSelector(id=txn.id) @@ -721,7 +789,9 @@ def execute_pdml(): ) method = functools.partial( api.execute_streaming_sql, - metadata=metadata, + metadata=self.metadata_with_request_id( + partial_nth_request, partial_attempt.increment(), metadata + ), ) iterator = _restart_on_unavailable( @@ -739,6 +809,16 @@ def execute_pdml(): return _retry_on_aborted(execute_pdml, DEFAULT_RETRY_BACKOFF)() + @property + def _next_nth_request(self): + if self._instance and self._instance._client: + return self._instance._client._next_nth_request + return 1 + + @property + def _nth_client_id(self): + return self._instance._client._nth_client_id + def session(self, labels=None, database_role=None): """Factory to create a session for this database. 
@@ -950,7 +1030,7 @@ def restore(self, source): ) future = api.restore_database( request=request, - metadata=metadata, + metadata=self.metadata_with_request_id(self._next_nth_request, 1, metadata), ) return future @@ -1019,7 +1099,10 @@ def list_database_roles(self, page_size=None): parent=self.name, page_size=page_size, ) - return api.list_database_roles(request=request, metadata=metadata) + return api.list_database_roles( + request=request, + metadata=self.metadata_with_request_id(self._next_nth_request, 1, metadata), + ) def table(self, table_id): """Factory to create a table object within this database. @@ -1103,7 +1186,10 @@ def get_iam_policy(self, policy_version=None): requested_policy_version=policy_version ), ) - response = api.get_iam_policy(request=request, metadata=metadata) + response = api.get_iam_policy( + request=request, + metadata=self.metadata_with_request_id(self._next_nth_request, 1, metadata), + ) return response def set_iam_policy(self, policy): @@ -1125,7 +1211,10 @@ def set_iam_policy(self, policy): resource=self.name, policy=policy, ) - response = api.set_iam_policy(request=request, metadata=metadata) + response = api.set_iam_policy( + request=request, + metadata=self.metadata_with_request_id(self._next_nth_request, 1, metadata), + ) return response @property diff --git a/google/cloud/spanner_v1/pool.py b/google/cloud/spanner_v1/pool.py index 596f76a1f1..85a2ba4fef 100644 --- a/google/cloud/spanner_v1/pool.py +++ b/google/cloud/spanner_v1/pool.py @@ -253,7 +253,9 @@ def bind(self, database): ) resp = api.batch_create_sessions( request=request, - metadata=metadata, + metadata=database.metadata_with_request_id( + database._next_nth_request, 1, metadata + ), ) add_span_event( @@ -557,9 +559,13 @@ def bind(self, database): while returned_session_count < self.size: resp = api.batch_create_sessions( request=request, - metadata=metadata, + metadata=database.metadata_with_request_id( + database._next_nth_request, 1, metadata + ), ) + 
print("resp.PingingPool", resp) + add_span_event( span, f"Created {len(resp.session)} sessions", diff --git a/google/cloud/spanner_v1/session.py b/google/cloud/spanner_v1/session.py index ccc0c4ebdc..e74d18ea12 100644 --- a/google/cloud/spanner_v1/session.py +++ b/google/cloud/spanner_v1/session.py @@ -168,7 +168,9 @@ def create(self): ): session_pb = api.create_session( request=request, - metadata=metadata, + metadata=self._database.metadata_with_request_id( + self._database._next_nth_request, 1, metadata + ), ) self._session_id = session_pb.name.split("/")[-1] @@ -193,7 +195,8 @@ def exists(self): current_span, "Checking if Session exists", {"session.id": self._session_id} ) - api = self._database.spanner_api + database = self._database + api = database.spanner_api metadata = _metadata_with_prefix(self._database.name) if self._database._route_to_leader_enabled: metadata.append( @@ -207,7 +210,12 @@ def exists(self): "CloudSpanner.GetSession", self, observability_options=observability_options ) as span: try: - api.get_session(name=self.name, metadata=metadata) + api.get_session( + name=self.name, + metadata=database.metadata_with_request_id( + database._next_nth_request, 1, metadata + ), + ) if span: span.set_attribute("session_found", True) except NotFound: @@ -237,8 +245,11 @@ def delete(self): current_span, "Deleting Session", {"session.id": self._session_id} ) - api = self._database.spanner_api - metadata = _metadata_with_prefix(self._database.name) + database = self._database + api = database.spanner_api + metadata = database.metadata_with_request_id( + database._next_nth_request, 1, _metadata_with_prefix(database.name) + ) observability_options = getattr(self._database, "observability_options", None) with trace_call( "CloudSpanner.DeleteSession", @@ -258,10 +269,18 @@ def ping(self): """ if self._session_id is None: raise ValueError("Session ID not set by back-end") - api = self._database.spanner_api - metadata = _metadata_with_prefix(self._database.name) + 
database = self._database + api = database.spanner_api + database = self._database request = ExecuteSqlRequest(session=self.name, sql="SELECT 1") - api.execute_sql(request=request, metadata=metadata) + api.execute_sql( + request=request, + metadata=database.metadata_with_request_id( + database._next_nth_request, + 1, + _metadata_with_prefix(database.name), + ), + ) self._last_use_time = datetime.now() def snapshot(self, **kw): diff --git a/google/cloud/spanner_v1/snapshot.py b/google/cloud/spanner_v1/snapshot.py index f9edbe96fa..78c3e4fe69 100644 --- a/google/cloud/spanner_v1/snapshot.py +++ b/google/cloud/spanner_v1/snapshot.py @@ -38,6 +38,7 @@ _retry, _check_rst_stream_error, _SessionWrapper, + AtomicCounter, ) from google.cloud.spanner_v1._opentelemetry_tracing import trace_call from google.cloud.spanner_v1.streamed import StreamedResultSet @@ -323,10 +324,14 @@ def read( data_boost_enabled=data_boost_enabled, directed_read_options=directed_read_options, ) + + nth_request = getattr(database, "_next_nth_request", 0) + all_metadata = database.metadata_with_request_id(nth_request, 1, metadata) + restart = functools.partial( api.streaming_read, request=request, - metadata=metadata, + metadata=all_metadata, retry=retry, timeout=timeout, ) @@ -539,13 +544,24 @@ def execute_sql( data_boost_enabled=data_boost_enabled, directed_read_options=directed_read_options, ) - restart = functools.partial( - api.execute_streaming_sql, - request=request, - metadata=metadata, - retry=retry, - timeout=timeout, - ) + + nth_request = getattr(database, "_next_nth_request", 0) + if not isinstance(nth_request, int): + raise Exception(f"failed to get an integer back: {nth_request}") + + attempt = AtomicCounter(0) + + def wrapped_restart(*args, **kwargs): + restart = functools.partial( + api.execute_streaming_sql, + request=request, + metadata=database.metadata_with_request_id( + nth_request, attempt.increment(), metadata + ), + retry=retry, + timeout=timeout, + ) + return restart(*args, 
**kwargs) trace_attributes = {"db.statement": sql} observability_options = getattr(database, "observability_options", None) @@ -554,7 +570,7 @@ def execute_sql( # lock is added to handle the inline begin for first rpc with self._lock: return self._get_streamed_result_set( - restart, + wrapped_restart, request, trace_attributes, column_info, @@ -563,7 +579,7 @@ def execute_sql( ) else: return self._get_streamed_result_set( - restart, + wrapped_restart, request, trace_attributes, column_info, @@ -690,13 +706,16 @@ def partition_read( extra_attributes=trace_attributes, observability_options=getattr(database, "observability_options", None), ): + nth_request = getattr(database, "_next_nth_request", 0) + all_metadata = database.metadata_with_request_id(nth_request, 1, metadata) method = functools.partial( api.partition_read, request=request, - metadata=metadata, + metadata=all_metadata, retry=retry, timeout=timeout, ) + response = _retry( method, allowed_exceptions={InternalServerError: _check_rst_stream_error}, @@ -793,13 +812,16 @@ def partition_query( trace_attributes, observability_options=getattr(database, "observability_options", None), ): + nth_request = getattr(database, "_next_nth_request", 0) + all_metadata = database.metadata_with_request_id(nth_request, 1, metadata) method = functools.partial( api.partition_query, request=request, - metadata=metadata, + metadata=all_metadata, retry=retry, timeout=timeout, ) + response = _retry( method, allowed_exceptions={InternalServerError: _check_rst_stream_error}, @@ -939,12 +961,15 @@ def begin(self): self._session, observability_options=getattr(database, "observability_options", None), ): + nth_request = getattr(database, "_next_nth_request", 0) + all_metadata = database.metadata_with_request_id(nth_request, 1, metadata) method = functools.partial( api.begin_transaction, session=self._session.name, options=txn_selector.begin, - metadata=metadata, + metadata=all_metadata, ) + response = _retry( method, 
allowed_exceptions={InternalServerError: _check_rst_stream_error}, diff --git a/google/cloud/spanner_v1/testing/database_test.py b/google/cloud/spanner_v1/testing/database_test.py index 54afda11e0..2c8c651bfc 100644 --- a/google/cloud/spanner_v1/testing/database_test.py +++ b/google/cloud/spanner_v1/testing/database_test.py @@ -25,7 +25,9 @@ from google.cloud.spanner_v1.testing.interceptors import ( MethodCountInterceptor, MethodAbortInterceptor, + XGoogRequestIDHeaderInterceptor, ) +from google.cloud.spanner_v1._helpers import inject_retry_header_control class TestDatabase(Database): @@ -34,6 +36,8 @@ class TestDatabase(Database): currently, and we don't want to make changes in the Database class for testing purpose as this is a hack to use interceptors in tests.""" + _interceptors = [] + def __init__( self, database_id, @@ -67,6 +71,14 @@ def __init__( @property def spanner_api(self): + api = self.__generate_spanner_api() + if not api: + return api + + inject_retry_header_control(api) + return api + + def __generate_spanner_api(self): """Helper for session-related API calls.""" if self._spanner_api is None: client = self._instance._client @@ -74,6 +86,8 @@ def spanner_api(self): client_options = client._client_options if self._instance.emulator_host is not None: channel = grpc.insecure_channel(self._instance.emulator_host) + self._x_goog_request_id_interceptor = XGoogRequestIDHeaderInterceptor() + self._interceptors.append(self._x_goog_request_id_interceptor) channel = grpc.intercept_channel(channel, *self._interceptors) transport = SpannerGrpcTransport(channel=channel) self._spanner_api = SpannerClient( @@ -110,3 +124,7 @@ def _create_spanner_client_for_tests(self, client_options, credentials): client_options=client_options, transport=transport, ) + + def reset(self): + if self._x_goog_request_id_interceptor: + self._x_goog_request_id_interceptor.reset() diff --git a/google/cloud/spanner_v1/testing/interceptors.py 
b/google/cloud/spanner_v1/testing/interceptors.py index a8b015a87d..4fe4ed147d 100644 --- a/google/cloud/spanner_v1/testing/interceptors.py +++ b/google/cloud/spanner_v1/testing/interceptors.py @@ -13,6 +13,8 @@ # limitations under the License. from collections import defaultdict +import threading + from grpc_interceptor import ClientInterceptor from google.api_core.exceptions import Aborted @@ -63,3 +65,67 @@ def reset(self): self._method_to_abort = None self._count = 0 self._connection = None + + +X_GOOG_REQUEST_ID = "x-goog-spanner-request-id" + + +class XGoogRequestIDHeaderInterceptor(ClientInterceptor): + def __init__(self): + self._unary_req_segments = [] + self._stream_req_segments = [] + self.__lock = threading.Lock() + + def intercept(self, method, request_or_iterator, call_details): + metadata = call_details.metadata + x_goog_request_id = None + for key, value in metadata: + if key == X_GOOG_REQUEST_ID: + x_goog_request_id = value + break + + if not x_goog_request_id: + raise Exception( + f"Missing {X_GOOG_REQUEST_ID} header in {call_details.method}" + ) + + response_or_iterator = method(request_or_iterator, call_details) + streaming = getattr(response_or_iterator, "__iter__", None) is not None + with self.__lock: + if streaming: + self._stream_req_segments.append( + (call_details.method, parse_request_id(x_goog_request_id)) + ) + else: + self._unary_req_segments.append( + (call_details.method, parse_request_id(x_goog_request_id)) + ) + + return response_or_iterator + + @property + def unary_request_ids(self): + return self._unary_req_segments + + @property + def stream_request_ids(self): + return self._stream_req_segments + + def reset(self): + self._stream_req_segments.clear() + self._unary_req_segments.clear() + + +def parse_request_id(request_id_str): + splits = request_id_str.split(".") + version, rand_process_id, client_id, channel_id, nth_request, nth_attempt = list( + map(lambda v: int(v), splits) + ) + return ( + version, + rand_process_id, + 
client_id, + channel_id, + nth_request, + nth_attempt, + ) diff --git a/google/cloud/spanner_v1/testing/mock_spanner.py b/google/cloud/spanner_v1/testing/mock_spanner.py index f60dbbe72a..e2ac14e976 100644 --- a/google/cloud/spanner_v1/testing/mock_spanner.py +++ b/google/cloud/spanner_v1/testing/mock_spanner.py @@ -22,8 +22,6 @@ from google.cloud.spanner_v1 import ( TransactionOptions, ResultSetMetadata, - ExecuteSqlRequest, - ExecuteBatchDmlRequest, ) from google.cloud.spanner_v1.testing.mock_database_admin import DatabaseAdminServicer import google.cloud.spanner_v1.testing.spanner_database_admin_pb2_grpc as database_admin_grpc @@ -55,6 +53,7 @@ def pop_error(self, context): name = inspect.currentframe().f_back.f_code.co_name error: _Status | None = self.errors.pop(name, None) if error: + print("context.abort_with_status", error) context.abort_with_status(error) def get_result_as_partial_result_sets( @@ -107,6 +106,7 @@ def CreateSession(self, request, context): def BatchCreateSessions(self, request, context): self._requests.append(request) + self.mock_spanner.pop_error(context) sessions = [] for i in range(request.session_count): sessions.append( @@ -186,9 +186,7 @@ def BeginTransaction(self, request, context): self._requests.append(request) return self.__create_transaction(request.session, request.options) - def __maybe_create_transaction( - self, request: ExecuteSqlRequest | ExecuteBatchDmlRequest - ): + def __maybe_create_transaction(self, request): started_transaction = None if not request.transaction.begin == TransactionOptions(): started_transaction = self.__create_transaction( diff --git a/google/cloud/spanner_v1/transaction.py b/google/cloud/spanner_v1/transaction.py index cc59789248..7f15c1ff00 100644 --- a/google/cloud/spanner_v1/transaction.py +++ b/google/cloud/spanner_v1/transaction.py @@ -30,6 +30,7 @@ from google.cloud.spanner_v1 import ExecuteSqlRequest from google.cloud.spanner_v1 import TransactionSelector from google.cloud.spanner_v1 import 
TransactionOptions +from google.cloud.spanner_v1._helpers import AtomicCounter from google.cloud.spanner_v1.snapshot import _SnapshotBase from google.cloud.spanner_v1.batch import _BatchBase from google.cloud.spanner_v1._opentelemetry_tracing import add_span_event, trace_call @@ -161,12 +162,20 @@ def begin(self): self._session, observability_options=observability_options, ) as span: - method = functools.partial( - api.begin_transaction, - session=self._session.name, - options=txn_options, - metadata=metadata, - ) + attempt = AtomicCounter(0) + nth_request = database._next_nth_request + + def wrapped_method(*args, **kwargs): + attempt.increment() + method = functools.partial( + api.begin_transaction, + session=self._session.name, + options=txn_options, + metadata=database.metadata_with_request_id( + nth_request, attempt.value, metadata + ), + ) + return method(*args, **kwargs) def beforeNextRetry(nthRetry, delayInSeconds): add_span_event( @@ -176,7 +185,7 @@ def beforeNextRetry(nthRetry, delayInSeconds): ) response = _retry( - method, + wrapped_method, allowed_exceptions={InternalServerError: _check_rst_stream_error}, beforeNextRetry=beforeNextRetry, ) @@ -197,22 +206,33 @@ def rollback(self): database._route_to_leader_enabled ) ) + observability_options = getattr(database, "observability_options", None) with trace_call( f"CloudSpanner.{type(self).__name__}.rollback", self._session, observability_options=observability_options, ): - method = functools.partial( - api.rollback, - session=self._session.name, - transaction_id=self._transaction_id, - metadata=metadata, - ) + attempt = AtomicCounter(0) + nth_request = database._next_nth_request + + def wrapped_method(*args, **kwargs): + attempt.increment() + method = functools.partial( + api.rollback, + session=self._session.name, + transaction_id=self._transaction_id, + metadata=database.metadata_with_request_id( + nth_request, attempt.value, metadata + ), + ) + return method(*args, **kwargs) + _retry( - method, + 
wrapped_method, allowed_exceptions={InternalServerError: _check_rst_stream_error}, ) + self.rolled_back = True del self._session._transaction @@ -287,11 +307,19 @@ def commit( add_span_event(span, "Starting Commit") - method = functools.partial( - api.commit, - request=request, - metadata=metadata, - ) + attempt = AtomicCounter(0) + nth_request = database._next_nth_request + + def wrapped_method(*args, **kwargs): + attempt.increment() + method = functools.partial( + api.commit, + request=request, + metadata=database.metadata_with_request_id( + nth_request, attempt.value, metadata + ), + ) + return method(*args, **kwargs) def beforeNextRetry(nthRetry, delayInSeconds): add_span_event( @@ -301,7 +329,7 @@ def beforeNextRetry(nthRetry, delayInSeconds): ) response = _retry( - method, + wrapped_method, allowed_exceptions={InternalServerError: _check_rst_stream_error}, beforeNextRetry=beforeNextRetry, ) @@ -435,19 +463,27 @@ def execute_update( request_options=request_options, ) - method = functools.partial( - api.execute_sql, - request=request, - metadata=metadata, - retry=retry, - timeout=timeout, - ) + nth_request = database._next_nth_request + attempt = AtomicCounter(0) + + def wrapped_method(*args, **kwargs): + attempt.increment() + method = functools.partial( + api.execute_sql, + request=request, + metadata=database.metadata_with_request_id( + nth_request, attempt.value, metadata + ), + retry=retry, + timeout=timeout, + ) + return method(*args, **kwargs) if self._transaction_id is None: # lock is added to handle the inline begin for first rpc with self._lock: response = self._execute_request( - method, + wrapped_method, request, f"CloudSpanner.{type(self).__name__}.execute_update", self._session, @@ -464,7 +500,7 @@ def execute_update( self._transaction_id = response.metadata.transaction.id else: response = self._execute_request( - method, + wrapped_method, request, f"CloudSpanner.{type(self).__name__}.execute_update", self._session, @@ -560,19 +596,27 @@ def 
batch_update( request_options=request_options, ) - method = functools.partial( - api.execute_batch_dml, - request=request, - metadata=metadata, - retry=retry, - timeout=timeout, - ) + nth_request = database._next_nth_request + attempt = AtomicCounter(0) + + def wrapped_method(*args, **kwargs): + attempt.increment() + method = functools.partial( + api.execute_batch_dml, + request=request, + metadata=database.metadata_with_request_id( + nth_request, attempt.value, metadata + ), + retry=retry, + timeout=timeout, + ) + return method(*args, **kwargs) if self._transaction_id is None: # lock is added to handle the inline begin for first rpc with self._lock: response = self._execute_request( - method, + wrapped_method, request, "CloudSpanner.DMLTransaction", self._session, @@ -590,7 +634,7 @@ def batch_update( break else: response = self._execute_request( - method, + wrapped_method, request, "CloudSpanner.DMLTransaction", self._session, diff --git a/tests/mockserver_tests/mock_server_test_base.py b/tests/mockserver_tests/mock_server_test_base.py index b332c88d7c..2f89415b55 100644 --- a/tests/mockserver_tests/mock_server_test_base.py +++ b/tests/mockserver_tests/mock_server_test_base.py @@ -20,6 +20,7 @@ start_mock_server, SpannerServicer, ) +from google.cloud.spanner_v1.client import Client import google.cloud.spanner_v1.types.type as spanner_type import google.cloud.spanner_v1.types.result_set as result_set from google.api_core.client_options import ClientOptions @@ -78,6 +79,27 @@ def unavailable_status() -> _Status: return status +# Creates an UNAVAILABLE status with the smallest possible retry delay. 
+def unavailable_status() -> _Status: + error = status_pb2.Status( + code=code_pb2.UNAVAILABLE, + message="Service unavailable.", + ) + retry_info = RetryInfo(retry_delay=Duration(seconds=0, nanos=1)) + status = _Status( + code=code_to_grpc_status_code(error.code), + details=error.message, + trailing_metadata=( + ("grpc-status-details-bin", error.SerializeToString()), + ( + "google.rpc.retryinfo-bin", + retry_info.SerializeToString(), + ), + ), + ) + return status + + def add_error(method: str, error: status_pb2.Status): MockServerTestBase.spanner_service.mock_spanner.add_error(method, error) @@ -153,6 +175,7 @@ def setup_class(cls): def teardown_class(cls): if MockServerTestBase.server is not None: MockServerTestBase.server.stop(grace=None) + Client.NTH_CLIENT.reset() MockServerTestBase.server = None def setup_method(self, *args, **kwargs): @@ -186,6 +209,8 @@ def instance(self) -> Instance: def database(self) -> Database: if self._database is None: self._database = self.instance.database( - "test-database", pool=FixedSizePool(size=10) + "test-database", + pool=FixedSizePool(size=10), + enable_interceptors_in_tests=True, ) return self._database diff --git a/tests/mockserver_tests/test_request_id_header.py b/tests/mockserver_tests/test_request_id_header.py new file mode 100644 index 0000000000..29d2df29b2 --- /dev/null +++ b/tests/mockserver_tests/test_request_id_header.py @@ -0,0 +1,363 @@ +# Copyright 2024 Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +import random +import threading + +from google.cloud.spanner_v1 import ( + BatchCreateSessionsRequest, + BeginTransactionRequest, + ExecuteSqlRequest, +) +from google.cloud.spanner_v1.request_id_header import REQ_RAND_PROCESS_ID +from google.cloud.spanner_v1.testing.mock_spanner import SpannerServicer +from tests.mockserver_tests.mock_server_test_base import ( + MockServerTestBase, + add_select1_result, + aborted_status, + add_error, + unavailable_status, +) + + +class TestRequestIDHeader(MockServerTestBase): + def tearDown(self): + self.database._x_goog_request_id_interceptor.reset() + + def test_snapshot_execute_sql(self): + add_select1_result() + if not getattr(self.database, "_interceptors", None): + self.database._interceptors = MockServerTestBase._interceptors + with self.database.snapshot() as snapshot: + results = snapshot.execute_sql("select 1") + result_list = [] + for row in results: + result_list.append(row) + self.assertEqual(1, row[0]) + self.assertEqual(1, len(result_list)) + + requests = self.spanner_service.requests + self.assertEqual(2, len(requests), msg=requests) + self.assertTrue(isinstance(requests[0], BatchCreateSessionsRequest)) + self.assertTrue(isinstance(requests[1], ExecuteSqlRequest)) + + NTH_CLIENT = self.database._nth_client_id + CHANNEL_ID = self.database._channel_id + # Now ensure monotonicity of the received request-id segments. 
+ got_stream_segments, got_unary_segments = self.canonicalize_request_id_headers() + want_unary_segments = [ + ( + "/google.spanner.v1.Spanner/BatchCreateSessions", + (1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 1, 1), + ) + ] + want_stream_segments = [ + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 2, 1), + ) + ] + + assert got_unary_segments == want_unary_segments + assert got_stream_segments == want_stream_segments + + def test_snapshot_read_concurrent(self): + db = self.database + # Trigger BatchCreateSessions first. + with db.snapshot() as snapshot: + rows = snapshot.execute_sql("select 1") + for row in rows: + _ = row + + # The other requests can then proceed. + def select1(): + with db.snapshot() as snapshot: + rows = snapshot.execute_sql("select 1") + res_list = [] + for row in rows: + self.assertEqual(1, row[0]) + res_list.append(row) + self.assertEqual(1, len(res_list)) + + n = 10 + threads = [] + for i in range(n): + th = threading.Thread(target=select1, name=f"snapshot-select1-{i}") + th.start() + threads.append(th) + + random.shuffle(threads) + + while True: + n_finished = 0 + for thread in threads: + if thread.is_alive(): + thread.join() + else: + n_finished += 1 + + if n_finished == len(threads): + break + + requests = self.spanner_service.requests + self.assertEqual(2 + n * 2, len(requests), msg=requests) + + client_id = db._nth_client_id + channel_id = db._channel_id + got_stream_segments, got_unary_segments = self.canonicalize_request_id_headers() + + want_unary_segments = [ + ( + "/google.spanner.v1.Spanner/BatchCreateSessions", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 1, 1), + ), + ( + "/google.spanner.v1.Spanner/GetSession", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 3, 1), + ), + ( + "/google.spanner.v1.Spanner/GetSession", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 5, 1), + ), + ( + "/google.spanner.v1.Spanner/GetSession", + (1, REQ_RAND_PROCESS_ID,
client_id, channel_id, 7, 1), + ), + ( + "/google.spanner.v1.Spanner/GetSession", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 9, 1), + ), + ( + "/google.spanner.v1.Spanner/GetSession", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 11, 1), + ), + ( + "/google.spanner.v1.Spanner/GetSession", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 13, 1), + ), + ( + "/google.spanner.v1.Spanner/GetSession", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 15, 1), + ), + ( + "/google.spanner.v1.Spanner/GetSession", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 17, 1), + ), + ( + "/google.spanner.v1.Spanner/GetSession", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 19, 1), + ), + ( + "/google.spanner.v1.Spanner/GetSession", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 21, 1), + ), + ] + assert got_unary_segments == want_unary_segments + + want_stream_segments = [ + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 2, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 4, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 6, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 8, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 10, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 12, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 14, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 16, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 18, 1), + ), + ( + 
"/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 20, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 22, 1), + ), + ] + assert got_stream_segments == want_stream_segments + + def test_database_run_in_transaction_retries_on_abort(self): + counters = dict(aborted=0) + want_failed_attempts = 2 + + def select_in_txn(txn): + results = txn.execute_sql("select 1") + for row in results: + _ = row + + if counters["aborted"] < want_failed_attempts: + counters["aborted"] += 1 + add_error(SpannerServicer.Commit.__name__, aborted_status()) + + add_select1_result() + if not getattr(self.database, "_interceptors", None): + self.database._interceptors = MockServerTestBase._interceptors + + self.database.run_in_transaction(select_in_txn) + + def test_database_execute_partitioned_dml_request_id(self): + add_select1_result() + if not getattr(self.database, "_interceptors", None): + self.database._interceptors = MockServerTestBase._interceptors + _ = self.database.execute_partitioned_dml("select 1") + + requests = self.spanner_service.requests + self.assertEqual(3, len(requests), msg=requests) + self.assertTrue(isinstance(requests[0], BatchCreateSessionsRequest)) + self.assertTrue(isinstance(requests[1], BeginTransactionRequest)) + self.assertTrue(isinstance(requests[2], ExecuteSqlRequest)) + + # Now ensure monotonicity of the received request-id segments. 
+ got_stream_segments, got_unary_segments = self.canonicalize_request_id_headers() + NTH_CLIENT = self.database._nth_client_id + CHANNEL_ID = self.database._channel_id + want_unary_segments = [ + ( + "/google.spanner.v1.Spanner/BatchCreateSessions", + (1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 1, 1), + ), + ( + "/google.spanner.v1.Spanner/BeginTransaction", + (1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 2, 1), + ), + ] + want_stream_segments = [ + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 3, 1), + ) + ] + + assert got_unary_segments == want_unary_segments + assert got_stream_segments == want_stream_segments + + def test_unary_retryable_error(self): + add_select1_result() + add_error(SpannerServicer.BatchCreateSessions.__name__, unavailable_status()) + + if not getattr(self.database, "_interceptors", None): + self.database._interceptors = MockServerTestBase._interceptors + with self.database.snapshot() as snapshot: + results = snapshot.execute_sql("select 1") + result_list = [] + for row in results: + result_list.append(row) + self.assertEqual(1, row[0]) + self.assertEqual(1, len(result_list)) + + requests = self.spanner_service.requests + self.assertEqual(3, len(requests), msg=requests) + self.assertTrue(isinstance(requests[0], BatchCreateSessionsRequest)) + self.assertTrue(isinstance(requests[1], BatchCreateSessionsRequest)) + self.assertTrue(isinstance(requests[2], ExecuteSqlRequest)) + + NTH_CLIENT = self.database._nth_client_id + CHANNEL_ID = self.database._channel_id + # Now ensure monotonicity of the received request-id segments. 
+ got_stream_segments, got_unary_segments = self.canonicalize_request_id_headers() + want_unary_segments = [ + ( + "/google.spanner.v1.Spanner/BatchCreateSessions", + (1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 1, 1), + ), + ( + "/google.spanner.v1.Spanner/BatchCreateSessions", + (1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 1, 2), + ), + ] + want_stream_segments = [ + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 2, 1), + ) + ] + + print("got_unaries", got_unary_segments) + assert got_unary_segments == want_unary_segments + assert got_stream_segments == want_stream_segments + + def test_streaming_retryable_error(self): + add_select1_result() + add_error(SpannerServicer.ExecuteStreamingSql.__name__, unavailable_status()) + add_error(SpannerServicer.ExecuteStreamingSql.__name__, unavailable_status()) + + if not getattr(self.database, "_interceptors", None): + self.database._interceptors = MockServerTestBase._interceptors + with self.database.snapshot() as snapshot: + results = snapshot.execute_sql("select 1") + result_list = [] + for row in results: + result_list.append(row) + self.assertEqual(1, row[0]) + self.assertEqual(1, len(result_list)) + + requests = self.spanner_service.requests + self.assertEqual(3, len(requests), msg=requests) + self.assertTrue(isinstance(requests[0], BatchCreateSessionsRequest)) + self.assertTrue(isinstance(requests[1], ExecuteSqlRequest)) + self.assertTrue(isinstance(requests[2], ExecuteSqlRequest)) + + NTH_CLIENT = self.database._nth_client_id + CHANNEL_ID = self.database._channel_id + # Now ensure monotonicity of the received request-id segments. 
+ got_stream_segments, got_unary_segments = self.canonicalize_request_id_headers() + want_unary_segments = [ + ( + "/google.spanner.v1.Spanner/BatchCreateSessions", + (1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 1, 1), + ), + ] + want_stream_segments = [ + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 2, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 2, 2), + ), + ] + + assert got_unary_segments == want_unary_segments + assert got_stream_segments == want_stream_segments + + def canonicalize_request_id_headers(self): + src = self.database._x_goog_request_id_interceptor + return src._stream_req_segments, src._unary_req_segments diff --git a/tests/unit/test_atomic_counter.py b/tests/unit/test_atomic_counter.py index 92d10cac79..e8d8b6b7ce 100644 --- a/tests/unit/test_atomic_counter.py +++ b/tests/unit/test_atomic_counter.py @@ -15,6 +15,7 @@ import random import threading import unittest + from google.cloud.spanner_v1._helpers import AtomicCounter diff --git a/tests/unit/test_batch.py b/tests/unit/test_batch.py index eb5069b497..26a8210918 100644 --- a/tests/unit/test_batch.py +++ b/tests/unit/test_batch.py @@ -21,6 +21,10 @@ enrich_with_otel_scope, ) from google.cloud.spanner_v1 import RequestOptions +from google.cloud.spanner_v1._helpers import ( + _metadata_with_request_id, +) +from google.cloud.spanner_v1.request_id_header import REQ_RAND_PROCESS_ID TABLE_NAME = "citizens" COLUMNS = ["email", "first_name", "last_name", "age"] @@ -256,6 +260,10 @@ def test_commit_ok(self): [ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1", + ), ], ) self.assertEqual(request_options, RequestOptions()) @@ -355,6 +363,10 @@ def _test_commit_with_options( [ ("google-cloud-resource-prefix", database.name), 
("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), ], ) self.assertEqual(actual_request_options, expected_request_options) @@ -467,6 +479,10 @@ def test_context_mgr_success(self): [ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Database.NTH_CLIENT}.1.1.1", + ), ], ) self.assertEqual(request_options, RequestOptions()) @@ -601,6 +617,10 @@ def _test_batch_write_with_request_options( [ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1", + ), ], ) if request_options is None: @@ -654,6 +674,32 @@ def session_id(self): class _Database(object): name = "testing" _route_to_leader_enabled = True + NTH_CLIENT = 1 + + def __init__(self): + self._nth_request = 0 + + @property + def _next_nth_request(self): + self._nth_request += 1 + return self._nth_request + + @property + def _nth_client_id(self): + return 1 + + def metadata_with_request_id(self, nth_request, nth_attempt, prior_metadata=[]): + return _metadata_with_request_id( + self._nth_client_id, + self._channel_id, + nth_request, + nth_attempt, + prior_metadata, + ) + + @property + def _channel_id(self): + return 1 class _FauxSpannerAPI: diff --git a/tests/unit/test_database.py b/tests/unit/test_database.py index 13a37f66fe..e715decdb8 100644 --- a/tests/unit/test_database.py +++ b/tests/unit/test_database.py @@ -26,6 +26,11 @@ from google.protobuf.field_mask_pb2 import FieldMask from google.cloud.spanner_v1 import RequestOptions, DirectedReadOptions +from google.cloud.spanner_v1._helpers import ( + AtomicCounter, + _metadata_with_request_id, +) +from google.cloud.spanner_v1.request_id_header import REQ_RAND_PROCESS_ID DML_WO_PARAM = """ DELETE FROM citizens @@ 
-111,7 +116,9 @@ def _make_database_admin_api(): def _make_spanner_api(): from google.cloud.spanner_v1 import SpannerClient - return mock.create_autospec(SpannerClient, instance=True) + api = mock.create_autospec(SpannerClient, instance=True) + api._transport = "transport" + return api def test_ctor_defaults(self): from google.cloud.spanner_v1.pool import BurstyPool @@ -545,7 +552,13 @@ def test_create_grpc_error(self): api.create_database.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_create_already_exists(self): @@ -572,7 +585,13 @@ def test_create_already_exists(self): api.create_database.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_create_instance_not_found(self): @@ -598,7 +617,13 @@ def test_create_instance_not_found(self): api.create_database.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_create_success(self): @@ -634,7 +659,13 @@ def test_create_success(self): api.create_database.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + 
f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_create_success_w_encryption_config_dict(self): @@ -671,7 +702,13 @@ def test_create_success_w_encryption_config_dict(self): api.create_database.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_create_success_w_proto_descriptors(self): @@ -706,7 +743,13 @@ def test_create_success_w_proto_descriptors(self): api.create_database.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_exists_grpc_error(self): @@ -724,7 +767,13 @@ def test_exists_grpc_error(self): api.get_database_ddl.assert_called_once_with( database=self.DATABASE_NAME, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_exists_not_found(self): @@ -741,7 +790,13 @@ def test_exists_not_found(self): api.get_database_ddl.assert_called_once_with( database=self.DATABASE_NAME, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_exists_success(self): @@ -760,7 +815,13 @@ def test_exists_success(self): api.get_database_ddl.assert_called_once_with( database=self.DATABASE_NAME, 
- metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_reload_grpc_error(self): @@ -778,7 +839,13 @@ def test_reload_grpc_error(self): api.get_database_ddl.assert_called_once_with( database=self.DATABASE_NAME, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_reload_not_found(self): @@ -796,7 +863,13 @@ def test_reload_not_found(self): api.get_database_ddl.assert_called_once_with( database=self.DATABASE_NAME, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_reload_success(self): @@ -855,11 +928,23 @@ def test_reload_success(self): api.get_database_ddl.assert_called_once_with( database=self.DATABASE_NAME, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) api.get_database.assert_called_once_with( name=self.DATABASE_NAME, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.2.1", + ), + ], ) def test_update_ddl_grpc_error(self): @@ -885,7 +970,13 @@ def test_update_ddl_grpc_error(self): api.update_database_ddl.assert_called_once_with( 
request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_update_ddl_not_found(self): @@ -911,7 +1002,13 @@ def test_update_ddl_not_found(self): api.update_database_ddl.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_update_ddl(self): @@ -938,7 +1035,13 @@ def test_update_ddl(self): api.update_database_ddl.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_update_ddl_w_operation_id(self): @@ -965,7 +1068,13 @@ def test_update_ddl_w_operation_id(self): api.update_database_ddl.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_update_success(self): @@ -991,7 +1100,13 @@ def test_update_success(self): api.update_database.assert_called_once_with( database=expected_database, update_mask=field_mask, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def 
test_update_ddl_w_proto_descriptors(self): @@ -1019,7 +1134,13 @@ def test_update_ddl_w_proto_descriptors(self): api.update_database_ddl.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_drop_grpc_error(self): @@ -1037,7 +1158,13 @@ def test_drop_grpc_error(self): api.drop_database.assert_called_once_with( database=self.DATABASE_NAME, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_drop_not_found(self): @@ -1055,7 +1182,13 @@ def test_drop_not_found(self): api.drop_database.assert_called_once_with( database=self.DATABASE_NAME, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_drop_success(self): @@ -1072,7 +1205,13 @@ def test_drop_success(self): api.drop_database.assert_called_once_with( database=self.DATABASE_NAME, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def _execute_partitioned_dml_helper( @@ -1145,18 +1284,34 @@ def _execute_partitioned_dml_helper( exclude_txn_from_change_streams=exclude_txn_from_change_streams, ) - api.begin_transaction.assert_called_with( - session=session.name, - options=txn_options, - metadata=[ - ("google-cloud-resource-prefix", 
database.name), - ("x-goog-spanner-route-to-leader", "true"), - ], - ) if retried: self.assertEqual(api.begin_transaction.call_count, 2) + api.begin_transaction.assert_called_with( + session=session.name, + options=txn_options, + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.2", + ), + ], + ) else: self.assertEqual(api.begin_transaction.call_count, 1) + api.begin_transaction.assert_called_with( + session=session.name, + options=txn_options, + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], + ) if params: expected_params = Struct( @@ -1192,6 +1347,10 @@ def _execute_partitioned_dml_helper( metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.{database._channel_id}.2.1", + ), ], ) if retried: @@ -1212,6 +1371,10 @@ def _execute_partitioned_dml_helper( metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.2.2", + ), ], ) self.assertEqual(api.execute_streaming_sql.call_count, 2) @@ -1486,7 +1649,13 @@ def test_restore_grpc_error(self): api.restore_database.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_restore_not_found(self): @@ -1512,7 +1681,13 @@ 
def test_restore_not_found(self): api.restore_database.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_restore_success(self): @@ -1549,7 +1724,13 @@ def test_restore_success(self): api.restore_database.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_restore_success_w_encryption_config_dict(self): @@ -1590,7 +1771,13 @@ def test_restore_success_w_encryption_config_dict(self): api.restore_database.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_restore_w_invalid_encryption_config_dict(self): @@ -1737,7 +1924,13 @@ def test_list_database_roles_grpc_error(self): api.list_database_roles.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) def test_list_database_roles_defaults(self): @@ -1758,7 +1951,13 @@ def test_list_database_roles_defaults(self): api.list_database_roles.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", 
database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1", + ), + ], ) self.assertIsNotNone(resp) @@ -1845,6 +2044,10 @@ def test_context_mgr_success(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1", + ), ], ) @@ -1892,6 +2095,10 @@ def test_context_mgr_w_commit_stats_success(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1", + ), ], ) @@ -1939,6 +2146,10 @@ def test_context_mgr_w_aborted_commit_status(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1", + ), ], ) @@ -3013,6 +3224,10 @@ def test_context_mgr_success(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1", + ), ], ) @@ -3111,6 +3326,8 @@ def _make_database_admin_api(): class _Client(object): + NTH_CLIENT = AtomicCounter() + def __init__( self, project=TestDatabase.PROJECT_ID, @@ -3129,6 +3346,13 @@ def __init__( self._query_options = ExecuteSqlRequest.QueryOptions(optimizer_version="1") self.route_to_leader_enabled = route_to_leader_enabled self.directed_read_options = directed_read_options + self._nth_client_id = _Client.NTH_CLIENT.increment() + self._nth_request = AtomicCounter() + self.credentials = {} + + @property + def _next_nth_request(self): + return self._nth_request.increment() class _Instance(object): @@ -3157,6 +3381,31 @@ def __init__(self, name, instance=None): self.logger = mock.create_autospec(Logger, 
instance=True) self._directed_read_options = None + @property + def _next_nth_request(self): + if self._instance and self._instance._client: + return self._instance._client._next_nth_request + return 1 + + @property + def _nth_client_id(self): + if self._instance and self._instance._client: + return self._instance._client._nth_client_id + return 1 + + def metadata_with_request_id(self, nth_request, nth_attempt, prior_metadata=[]): + return _metadata_with_request_id( + self._nth_client_id, + self._channel_id, + nth_request, + nth_attempt, + prior_metadata, + ) + + @property + def _channel_id(self): + return 1 + class _Pool(object): _bound = None diff --git a/tests/unit/test_pool.py b/tests/unit/test_pool.py index 9b5d2c9885..8e2baf976b 100644 --- a/tests/unit/test_pool.py +++ b/tests/unit/test_pool.py @@ -19,6 +19,11 @@ from datetime import datetime, timedelta import mock +from google.cloud.spanner_v1._helpers import ( + _metadata_with_request_id, + AtomicCounter, +) + from google.cloud.spanner_v1._opentelemetry_tracing import trace_call from tests._helpers import ( OpenTelemetryBase, @@ -1183,6 +1188,9 @@ def session_id(self): class _Database(object): + NTH_REQUEST = AtomicCounter() + NTH_CLIENT_ID = AtomicCounter() + def __init__(self, name): self.name = name self._sessions = [] @@ -1237,6 +1245,27 @@ def session(self, **kwargs): def observability_options(self): return dict(db_name=self.name) + @property + def _next_nth_request(self): + return self.NTH_REQUEST.increment() + + @property + def _nth_client_id(self): + return self.NTH_CLIENT_ID.increment() + + def metadata_with_request_id(self, nth_request, nth_attempt, prior_metadata=[]): + return _metadata_with_request_id( + self._nth_client_id, + self._channel_id, + nth_request, + nth_attempt, + prior_metadata, + ) + + @property + def _channel_id(self): + return 1 + class _Queue(object): _size = 1 diff --git a/tests/unit/test_session.py b/tests/unit/test_session.py index 55c91435f8..d6b7b369db 100644 --- 
a/tests/unit/test_session.py +++ b/tests/unit/test_session.py @@ -22,6 +22,11 @@ StatusCode, enrich_with_otel_scope, ) +from google.cloud.spanner_v1.request_id_header import REQ_RAND_PROCESS_ID +from google.cloud.spanner_v1._helpers import ( + _metadata_with_request_id, + AtomicCounter, +) def _make_rpc_error(error_cls, trailing_metadata=None): @@ -66,6 +71,20 @@ def _make_database(name=DATABASE_NAME, database_role=None): database.log_commit_stats = False database.database_role = database_role database._route_to_leader_enabled = True + nth_client_id = AtomicCounter(1) + database.NTH_CLIENT = nth_client_id + next_nth_request = AtomicCounter(0) + + def metadata_with_request_id(nth_request, nth_attempt, prior_metadata=[]): + return _metadata_with_request_id( + nth_client_id.value, + 1, + next_nth_request.increment(), + nth_attempt, + prior_metadata, + ) + + database.metadata_with_request_id = metadata_with_request_id return database @staticmethod @@ -168,6 +187,10 @@ def test_create_w_database_role(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -206,6 +229,10 @@ def test_create_session_span_annotations(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -235,6 +262,10 @@ def test_create_wo_database_role(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -265,6 +296,10 @@ def test_create_ok(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + 
f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -298,6 +333,10 @@ def test_create_w_labels(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -347,6 +386,10 @@ def test_exists_hit(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -375,6 +418,10 @@ def test_exists_hit_wo_span(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -397,6 +444,10 @@ def test_exists_miss(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -426,6 +477,10 @@ def test_exists_miss_wo_span(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -449,6 +504,10 @@ def test_exists_error(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -483,7 +542,13 @@ def test_ping_hit(self): gax_api.execute_sql.assert_called_once_with( request=request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + 
), + ], ) def test_ping_miss(self): @@ -507,7 +572,13 @@ def test_ping_miss(self): gax_api.execute_sql.assert_called_once_with( request=request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), + ], ) def test_ping_error(self): @@ -531,7 +602,13 @@ def test_ping_error(self): gax_api.execute_sql.assert_called_once_with( request=request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), + ], ) def test_delete_wo_session_id(self): @@ -555,7 +632,13 @@ def test_delete_hit(self): gax_api.delete_session.assert_called_once_with( name=self.SESSION_NAME, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), + ], ) attrs = {"session.id": session._session_id, "session.name": session.name} @@ -580,7 +663,13 @@ def test_delete_miss(self): gax_api.delete_session.assert_called_once_with( name=self.SESSION_NAME, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), + ], ) attrs = {"session.id": session._session_id, "session.name": session.name} @@ -607,7 +696,13 @@ def test_delete_error(self): gax_api.delete_session.assert_called_once_with( name=self.SESSION_NAME, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + 
f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), + ], ) attrs = {"session.id": session._session_id, "session.name": session.name} @@ -983,6 +1078,10 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) request = CommitRequest( @@ -996,6 +1095,10 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.2.1", + ), ], ) @@ -1051,6 +1154,10 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -1113,10 +1220,25 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], - ) - ] - * 2, + ), + mock.call( + session=self.SESSION_NAME, + options=expected_options, + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.3.1", + ), + ], + ), + ], ) request = CommitRequest( session=self.SESSION_NAME, @@ -1132,10 +1254,24 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.2.1", + ), + ], + ), + mock.call( + request=request, + metadata=[ + ("google-cloud-resource-prefix", 
database.name), + ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.4.1", + ), ], - ) - ] - * 2, + ), + ], ) def test_run_in_transaction_w_abort_w_retry_metadata(self): @@ -1212,10 +1348,25 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], - ) - ] - * 2, + ), + mock.call( + session=self.SESSION_NAME, + options=expected_options, + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.3.1", + ), + ], + ), + ], ) request = CommitRequest( session=self.SESSION_NAME, @@ -1231,10 +1382,24 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.2.1", + ), + ], + ), + mock.call( + request=request, + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.4.1", + ), ], - ) - ] - * 2, + ), + ], ) def test_run_in_transaction_w_callback_raises_abort_wo_metadata(self): @@ -1310,6 +1475,10 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) request = CommitRequest( @@ -1323,6 +1492,10 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + 
"x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.2.1", + ), ], ) @@ -1400,6 +1573,10 @@ def _time(_results=[1, 1.5]): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) request = CommitRequest( @@ -1413,6 +1590,10 @@ def _time(_results=[1, 1.5]): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.2.1", + ), ], ) @@ -1471,6 +1652,7 @@ def _time(_results=[1, 2, 4, 8]): self.assertEqual(kw, {}) expected_options = TransactionOptions(read_write=TransactionOptions.ReadWrite()) + print("gax_api", gax_api.begin_transaction.call_args_list[2]) self.assertEqual( gax_api.begin_transaction.call_args_list, [ @@ -1480,10 +1662,37 @@ def _time(_results=[1, 2, 4, 8]): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), + ], + ), + mock.call( + session=self.SESSION_NAME, + options=expected_options, + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.3.1", + ), + ], + ), + mock.call( + session=self.SESSION_NAME, + options=expected_options, + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.5.1", + ), ], - ) - ] - * 3, + ), + ], ) request = CommitRequest( session=self.SESSION_NAME, @@ -1499,10 +1708,35 @@ def _time(_results=[1, 2, 4, 8]): metadata=[ ("google-cloud-resource-prefix", 
database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.2.1", + ), ], - ) - ] - * 3, + ), + mock.call( + request=request, + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.4.1", + ), + ], + ), + mock.call( + request=request, + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.6.1", + ), + ], + ), + ], ) def test_run_in_transaction_w_commit_stats_success(self): @@ -1562,6 +1796,10 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) request = CommitRequest( @@ -1576,6 +1814,10 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.2.1", + ), ], ) database.logger.info.assert_called_once_with( @@ -1632,6 +1874,10 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) request = CommitRequest( @@ -1646,6 +1892,10 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.2.1", + ), ], ) database.logger.info.assert_not_called() @@ -1709,6 
+1959,10 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) request = CommitRequest( @@ -1722,6 +1976,10 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.2.1", + ), ], ) @@ -1785,6 +2043,10 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) request = CommitRequest( @@ -1798,6 +2060,10 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.2.1", + ), ], ) @@ -1885,10 +2151,25 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], - ) - ] - * 2, + ), + mock.call( + session=self.SESSION_NAME, + options=expected_options, + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.3.1", + ), + ], + ), + ], ) request = CommitRequest( session=self.SESSION_NAME, @@ -1904,10 +2185,24 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + 
f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.2.1", + ), ], - ) - ] - * 2, + ), + mock.call( + request=request, + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.4.1", + ), + ], + ), + ], ) def test_delay_helper_w_no_delay(self): diff --git a/tests/unit/test_snapshot.py b/tests/unit/test_snapshot.py index 02cc35e017..1c1539ef84 100644 --- a/tests/unit/test_snapshot.py +++ b/tests/unit/test_snapshot.py @@ -25,6 +25,11 @@ ) from google.cloud.spanner_v1.param_types import INT64 from google.api_core.retry import Retry +from google.cloud.spanner_v1._helpers import ( + AtomicCounter, + _metadata_with_request_id, +) +from google.cloud.spanner_v1.request_id_header import REQ_RAND_PROCESS_ID TABLE_NAME = "citizens" COLUMNS = ["email", "first_name", "last_name", "age"] @@ -278,7 +283,7 @@ def test_iteration_w_raw_raising_retryable_internal_error(self): fail_after=True, error=InternalServerError( "Received unexpected EOS on DATA frame from server" - ) + ), ) after = _MockIterator(*LAST) request = mock.Mock(test="test", spec=["test", "resume_token"]) @@ -450,7 +455,7 @@ def test_iteration_w_raw_raising_retryable_internal_error_after_token(self): fail_after=True, error=InternalServerError( "Received unexpected EOS on DATA frame from server" - ) + ), ) after = _MockIterator(*SECOND) request = mock.Mock(test="test", spec=["test", "resume_token"]) @@ -767,7 +772,13 @@ def _read_helper( ) api.streaming_read.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), + ], retry=retry, timeout=timeout, ) @@ -1016,7 +1027,13 @@ def _execute_sql_helper( ) api.execute_streaming_sql.assert_called_once_with( 
request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), + ], timeout=timeout, retry=retry, ) @@ -1189,6 +1206,10 @@ def _partition_read_helper( metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=retry, timeout=timeout, @@ -1368,6 +1389,10 @@ def _partition_query_helper( metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=retry, timeout=timeout, @@ -1764,7 +1789,13 @@ def test_begin_ok_exact_staleness(self): api.begin_transaction.assert_called_once_with( session=session.name, options=expected_txn_options, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), + ], ) self.assertSpanAttributes( @@ -1800,7 +1831,13 @@ def test_begin_ok_exact_strong(self): api.begin_transaction.assert_called_once_with( session=session.name, options=expected_txn_options, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), + ], ) self.assertSpanAttributes( @@ -1811,10 +1848,18 @@ def test_begin_ok_exact_strong(self): class _Client(object): + NTH_CLIENT = AtomicCounter() + def __init__(self): from google.cloud.spanner_v1 import ExecuteSqlRequest self._query_options = ExecuteSqlRequest.QueryOptions(optimizer_version="1") + 
self._nth_client_id = _Client.NTH_CLIENT.increment() + self._nth_request = AtomicCounter() + + @property + def _next_nth_request(self): + return self._nth_request.increment() class _Instance(object): @@ -1833,6 +1878,27 @@ def __init__(self, directed_read_options=None): def observability_options(self): return dict(db_name=self.name) + @property + def _next_nth_request(self): + return self._instance._client._next_nth_request + + @property + def _nth_client_id(self): + return self._instance._client._nth_client_id + + def metadata_with_request_id(self, nth_request, nth_attempt, prior_metadata=[]): + return _metadata_with_request_id( + self._nth_client_id, + self._channel_id, + nth_request, + nth_attempt, + prior_metadata, + ) + + @property + def _channel_id(self): + return 1 + class _Session(object): def __init__(self, database=None, name=TestSnapshot.SESSION_NAME): diff --git a/tests/unit/test_spanner.py b/tests/unit/test_spanner.py index ff34a109af..327b7ef49b 100644 --- a/tests/unit/test_spanner.py +++ b/tests/unit/test_spanner.py @@ -37,9 +37,12 @@ from google.cloud.spanner_v1.keyset import KeySet from google.cloud.spanner_v1._helpers import ( + AtomicCounter, _make_value_pb, _merge_query_options, + _metadata_with_request_id, ) +from google.cloud.spanner_v1.request_id_header import REQ_RAND_PROCESS_ID import mock @@ -517,6 +520,10 @@ def test_transaction_should_include_begin_with_first_update(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -532,6 +539,10 @@ def test_transaction_should_include_begin_with_first_query(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], timeout=TIMEOUT, retry=RETRY, @@ -549,6 +560,10 @@ def 
test_transaction_should_include_begin_with_first_read(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -565,6 +580,10 @@ def test_transaction_should_include_begin_with_first_batch_update(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -590,6 +609,10 @@ def test_transaction_should_include_begin_w_exclude_txn_from_change_streams_with metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -608,6 +631,10 @@ def test_transaction_should_use_transaction_id_if_error_with_first_batch_update( metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -622,6 +649,10 @@ def test_transaction_should_use_transaction_id_if_error_with_first_batch_update( metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.2.1", + ), ], ) @@ -638,6 +669,10 @@ def test_transaction_should_use_transaction_id_returned_by_first_query(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -651,6 +686,10 @@ def 
test_transaction_should_use_transaction_id_returned_by_first_query(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.2.1", + ), ], ) @@ -667,6 +706,10 @@ def test_transaction_should_use_transaction_id_returned_by_first_update(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -680,6 +723,10 @@ def test_transaction_should_use_transaction_id_returned_by_first_update(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.2.1", + ), ], ) @@ -701,6 +748,10 @@ def test_transaction_execute_sql_w_directed_read_options(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT, @@ -724,6 +775,10 @@ def test_transaction_streaming_read_w_directed_read_options(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -740,6 +795,10 @@ def test_transaction_should_use_transaction_id_returned_by_first_read(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -751,6 +810,10 @@ def 
test_transaction_should_use_transaction_id_returned_by_first_read(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.2.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -767,6 +830,10 @@ def test_transaction_should_use_transaction_id_returned_by_first_batch_update(se metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -779,6 +846,10 @@ def test_transaction_should_use_transaction_id_returned_by_first_batch_update(se metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.2.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -819,6 +890,10 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -829,6 +904,10 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.2.1", + ), ], ) @@ -837,6 +916,10 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.3.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -880,6 +963,10 @@ def 
test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.3.1", + ), ], ) @@ -888,6 +975,10 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -898,6 +989,10 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.2.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -946,6 +1041,10 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.3.1", + ), ], ) @@ -954,6 +1053,10 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -964,6 +1067,10 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.2.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -1012,6 +1119,10 @@ 
def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.3.1", + ), ], ) req = self._execute_sql_expected_request(database) @@ -1020,6 +1131,10 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -1030,6 +1145,10 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.2.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -1048,18 +1167,32 @@ def test_transaction_should_execute_sql_with_route_to_leader_disabled(self): api.execute_streaming_sql.assert_called_once_with( request=self._execute_sql_expected_request(database=database), - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), + ], timeout=TIMEOUT, retry=RETRY, ) class _Client(object): + NTH_CLIENT = AtomicCounter() + def __init__(self): from google.cloud.spanner_v1 import ExecuteSqlRequest self._query_options = ExecuteSqlRequest.QueryOptions(optimizer_version="1") self.directed_read_options = None + self._nth_client_id = _Client.NTH_CLIENT.increment() + self._nth_request = AtomicCounter() + + @property + def _next_nth_request(self): + return self._nth_request.increment() class _Instance(object): @@ -1074,6 +1207,27 @@ def __init__(self): 
self._route_to_leader_enabled = True self._directed_read_options = None + @property + def _next_nth_request(self): + return self._instance._client._next_nth_request + + @property + def _nth_client_id(self): + return self._instance._client._nth_client_id + + def metadata_with_request_id(self, nth_request, nth_attempt, prior_metadata=[]): + return _metadata_with_request_id( + self._nth_client_id, + self._channel_id, + nth_request, + nth_attempt, + prior_metadata, + ) + + @property + def _channel_id(self): + return 1 + class _Session(object): _transaction = None diff --git a/tests/unit/test_transaction.py b/tests/unit/test_transaction.py index 9707632421..8169a873f6 100644 --- a/tests/unit/test_transaction.py +++ b/tests/unit/test_transaction.py @@ -20,6 +20,11 @@ from google.cloud.spanner_v1 import TypeCode from google.api_core.retry import Retry from google.api_core import gapic_v1 +from google.cloud.spanner_v1._helpers import ( + AtomicCounter, + _metadata_with_request_id, +) +from google.cloud.spanner_v1.request_id_header import REQ_RAND_PROCESS_ID from tests._helpers import ( HAS_OPENTELEMETRY_INSTALLED, @@ -192,6 +197,10 @@ def test_begin_ok(self): [ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -296,6 +305,10 @@ def test_rollback_ok(self): [ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -481,6 +494,10 @@ def _commit_helper( [ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], ) self.assertEqual(actual_request_options, expected_request_options) @@ -655,6 +672,10 @@ def _execute_update_helper( metadata=[ 
("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -848,6 +869,10 @@ def _batch_update_helper( metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=retry, timeout=timeout, @@ -963,6 +988,10 @@ def test_context_mgr_success(self): [ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.2.1", + ), ], ) @@ -993,11 +1022,19 @@ def test_context_mgr_failure(self): class _Client(object): + NTH_CLIENT = AtomicCounter() + def __init__(self): from google.cloud.spanner_v1 import ExecuteSqlRequest self._query_options = ExecuteSqlRequest.QueryOptions(optimizer_version="1") self.directed_read_options = None + self._nth_client_id = _Client.NTH_CLIENT.increment() + self._nth_request = AtomicCounter() + + @property + def _next_nth_request(self): + return self._nth_request.increment() class _Instance(object): @@ -1012,6 +1049,27 @@ def __init__(self): self._route_to_leader_enabled = True self._directed_read_options = None + @property + def _next_nth_request(self): + return self._instance._client._next_nth_request + + @property + def _nth_client_id(self): + return self._instance._client._nth_client_id + + def metadata_with_request_id(self, nth_request, nth_attempt, prior_metadata=[]): + return _metadata_with_request_id( + self._nth_client_id, + self._channel_id, + nth_request, + nth_attempt, + prior_metadata, + ) + + @property + def _channel_id(self): + return 1 + class _Session(object): _transaction = None