From ad69c48f01b09cbc5270b9cefde23715d5ac54b6 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Mon, 16 Dec 2024 21:53:30 -0800 Subject: [PATCH] feat: add updated span events + trace more methods (#1259) * observability: add updated span events + traace more methods This change carves out parts of PR #1241 in smaller pieces to ease with smaller reviews. This change adds more span events, updates important spans to make them more distinct like changing: "CloudSpanner.ReadWriteTransaction" to more direct and more pointed spans like: * CloudSpanner.Transaction.execute_streaming_sql Also added important spans: * CloudSpanner.Database.run_in_transaction * CloudSpanner.Session.run_in_transaction * all: update review comments + show type for BeginTransaction + remove prints * Remove requested span event "Using Transaction" * Move attempts into try block * Transform Session.run_in_transaction retry exceptions into events * More comprehensive test for events and attributes for pool.get * Add test guards against Python3.7 for which OpenTelemetry is unavailable + address test feedback * Remove span event per mutation in favour of future TODO Referencing issue #1269, this update removes adding a span event per mutation, in favour of a future TODO. * Sort system-test.test_transaction_abort_then_retry_spans spans by create time * Delint tests --- .../spanner_v1/_opentelemetry_tracing.py | 12 +- google/cloud/spanner_v1/batch.py | 12 +- google/cloud/spanner_v1/database.py | 42 +++-- google/cloud/spanner_v1/pool.py | 90 ++++++---- google/cloud/spanner_v1/session.py | 135 +++++++++----- google/cloud/spanner_v1/snapshot.py | 10 +- google/cloud/spanner_v1/transaction.py | 10 +- tests/_helpers.py | 9 +- tests/system/_helpers.py | 18 ++ tests/system/test_observability_options.py | 143 ++++++++++++++- tests/system/test_session_api.py | 76 ++++---- tests/unit/test_batch.py | 11 +- tests/unit/test_pool.py | 170 ++++++++++++++++-- tests/unit/test_session.py | 15 +- tests/unit/test_snapshot.py | 18 +- tests/unit/test_transaction.py | 13 +- 16 files changed, 601 insertions(+), 183 deletions(-) diff --git a/google/cloud/spanner_v1/_opentelemetry_tracing.py b/google/cloud/spanner_v1/_opentelemetry_tracing.py index 1caac59ecd..6f3997069e 100644 --- a/google/cloud/spanner_v1/_opentelemetry_tracing.py +++ b/google/cloud/spanner_v1/_opentelemetry_tracing.py @@ -56,11 +56,11 @@ def get_tracer(tracer_provider=None): @contextmanager -def trace_call(name, session, extra_attributes=None, observability_options=None): +def trace_call(name, session=None, extra_attributes=None, observability_options=None): if session: session._last_use_time = datetime.now() - if not HAS_OPENTELEMETRY_INSTALLED or not session: + if not (HAS_OPENTELEMETRY_INSTALLED and name): # Empty context manager. Users will have to check if the generated value is None or a span yield None return @@ -72,20 +72,24 @@ def trace_call(name, session, extra_attributes=None, observability_options=None) # on by default. enable_extended_tracing = True + db_name = "" + if session and getattr(session, "_database", None): + db_name = session._database.name + if isinstance(observability_options, dict): # Avoid false positives with mock.Mock tracer_provider = observability_options.get("tracer_provider", None) enable_extended_tracing = observability_options.get( "enable_extended_tracing", enable_extended_tracing ) + db_name = observability_options.get("db_name", db_name) tracer = get_tracer(tracer_provider) # Set base attributes that we know for every trace created - db = session._database attributes = { "db.type": "spanner", "db.url": SpannerClient.DEFAULT_ENDPOINT, - "db.instance": "" if not db else db.name, + "db.instance": db_name, "net.host.name": SpannerClient.DEFAULT_ENDPOINT, OTEL_SCOPE_NAME: TRACER_NAME, OTEL_SCOPE_VERSION: TRACER_VERSION, diff --git a/google/cloud/spanner_v1/batch.py b/google/cloud/spanner_v1/batch.py index 948740d7d4..8d62ac0883 100644 --- a/google/cloud/spanner_v1/batch.py +++ b/google/cloud/spanner_v1/batch.py @@ -70,6 +70,8 @@ def insert(self, table, columns, values): :param values: Values to be modified. """ self._mutations.append(Mutation(insert=_make_write_pb(table, columns, values))) + # TODO: Decide if we should add a span event per mutation: + # https://github.com/googleapis/python-spanner/issues/1269 def update(self, table, columns, values): """Update one or more existing table rows. @@ -84,6 +86,8 @@ def update(self, table, columns, values): :param values: Values to be modified. """ self._mutations.append(Mutation(update=_make_write_pb(table, columns, values))) + # TODO: Decide if we should add a span event per mutation: + # https://github.com/googleapis/python-spanner/issues/1269 def insert_or_update(self, table, columns, values): """Insert/update one or more table rows. @@ -100,6 +104,8 @@ def insert_or_update(self, table, columns, values): self._mutations.append( Mutation(insert_or_update=_make_write_pb(table, columns, values)) ) + # TODO: Decide if we should add a span event per mutation: + # https://github.com/googleapis/python-spanner/issues/1269 def replace(self, table, columns, values): """Replace one or more table rows. @@ -114,6 +120,8 @@ def replace(self, table, columns, values): :param values: Values to be modified. """ self._mutations.append(Mutation(replace=_make_write_pb(table, columns, values))) + # TODO: Decide if we should add a span event per mutation: + # https://github.com/googleapis/python-spanner/issues/1269 def delete(self, table, keyset): """Delete one or more table rows. @@ -126,6 +134,8 @@ def delete(self, table, keyset): """ delete = Mutation.Delete(table=table, key_set=keyset._to_pb()) self._mutations.append(Mutation(delete=delete)) + # TODO: Decide if we should add a span event per mutation: + # https://github.com/googleapis/python-spanner/issues/1269 class Batch(_BatchBase): @@ -207,7 +217,7 @@ def commit( ) observability_options = getattr(database, "observability_options", None) with trace_call( - "CloudSpanner.Commit", + f"CloudSpanner.{type(self).__name__}.commit", self._session, trace_attributes, observability_options=observability_options, diff --git a/google/cloud/spanner_v1/database.py b/google/cloud/spanner_v1/database.py index c8230ab503..88d2bb60f7 100644 --- a/google/cloud/spanner_v1/database.py +++ b/google/cloud/spanner_v1/database.py @@ -70,6 +70,7 @@ from google.cloud.spanner_v1._opentelemetry_tracing import ( add_span_event, get_current_span, + trace_call, ) @@ -720,6 +721,7 @@ def execute_pdml(): iterator = _restart_on_unavailable( method=method, + trace_name="CloudSpanner.ExecuteStreamingSql", request=request, transaction_selector=txn_selector, observability_options=self.observability_options, @@ -881,20 +883,25 @@ def run_in_transaction(self, func, *args, **kw): :raises Exception: reraises any non-ABORT exceptions raised by ``func``. """ - # Sanity check: Is there a transaction already running? - # If there is, then raise a red flag. Otherwise, mark that this one - # is running. - if getattr(self._local, "transaction_running", False): - raise RuntimeError("Spanner does not support nested transactions.") - self._local.transaction_running = True - - # Check out a session and run the function in a transaction; once - # done, flip the sanity check bit back. - try: - with SessionCheckout(self._pool) as session: - return session.run_in_transaction(func, *args, **kw) - finally: - self._local.transaction_running = False + observability_options = getattr(self, "observability_options", None) + with trace_call( + "CloudSpanner.Database.run_in_transaction", + observability_options=observability_options, + ): + # Sanity check: Is there a transaction already running? + # If there is, then raise a red flag. Otherwise, mark that this one + # is running. + if getattr(self._local, "transaction_running", False): + raise RuntimeError("Spanner does not support nested transactions.") + self._local.transaction_running = True + + # Check out a session and run the function in a transaction; once + # done, flip the sanity check bit back. + try: + with SessionCheckout(self._pool) as session: + return session.run_in_transaction(func, *args, **kw) + finally: + self._local.transaction_running = False def restore(self, source): """Restore from a backup to this database. @@ -1120,7 +1127,12 @@ def observability_options(self): if not (self._instance and self._instance._client): return None - return getattr(self._instance._client, "observability_options", None) + opts = getattr(self._instance._client, "observability_options", None) + if not opts: + opts = dict() + + opts["db_name"] = self.name + return opts class BatchCheckout(object): diff --git a/google/cloud/spanner_v1/pool.py b/google/cloud/spanner_v1/pool.py index 4f90196b4a..03bff81b52 100644 --- a/google/cloud/spanner_v1/pool.py +++ b/google/cloud/spanner_v1/pool.py @@ -28,6 +28,7 @@ from google.cloud.spanner_v1._opentelemetry_tracing import ( add_span_event, get_current_span, + trace_call, ) from warnings import warn @@ -237,29 +238,41 @@ def bind(self, database): session_template=Session(creator_role=self.database_role), ) - returned_session_count = 0 - while not self._sessions.full(): - request.session_count = requested_session_count - self._sessions.qsize() + observability_options = getattr(self._database, "observability_options", None) + with trace_call( + "CloudSpanner.FixedPool.BatchCreateSessions", + observability_options=observability_options, + ) as span: + returned_session_count = 0 + while not self._sessions.full(): + request.session_count = requested_session_count - self._sessions.qsize() + add_span_event( + span, + f"Creating {request.session_count} sessions", + span_event_attributes, + ) + resp = api.batch_create_sessions( + request=request, + metadata=metadata, + ) + + add_span_event( + span, + "Created sessions", + dict(count=len(resp.session)), + ) + + for session_pb in resp.session: + session = self._new_session() + session._session_id = session_pb.name.split("/")[-1] + self._sessions.put(session) + returned_session_count += 1 + add_span_event( span, - f"Creating {request.session_count} sessions", + f"Requested for {requested_session_count} sessions, returned {returned_session_count}", span_event_attributes, ) - resp = api.batch_create_sessions( - request=request, - metadata=metadata, - ) - for session_pb in resp.session: - session = self._new_session() - session._session_id = session_pb.name.split("/")[-1] - self._sessions.put(session) - returned_session_count += 1 - - add_span_event( - span, - f"Requested for {requested_session_count} sessions, returned {returned_session_count}", - span_event_attributes, - ) def get(self, timeout=None): """Check a session out from the pool. @@ -550,25 +563,30 @@ def bind(self, database): span_event_attributes, ) - returned_session_count = 0 - while created_session_count < self.size: - resp = api.batch_create_sessions( - request=request, - metadata=metadata, - ) - for session_pb in resp.session: - session = self._new_session() - session._session_id = session_pb.name.split("/")[-1] - self.put(session) - returned_session_count += 1 + observability_options = getattr(self._database, "observability_options", None) + with trace_call( + "CloudSpanner.PingingPool.BatchCreateSessions", + observability_options=observability_options, + ) as span: + returned_session_count = 0 + while created_session_count < self.size: + resp = api.batch_create_sessions( + request=request, + metadata=metadata, + ) + for session_pb in resp.session: + session = self._new_session() + session._session_id = session_pb.name.split("/")[-1] + self.put(session) + returned_session_count += 1 - created_session_count += len(resp.session) + created_session_count += len(resp.session) - add_span_event( - current_span, - f"Requested for {requested_session_count} sessions, return {returned_session_count}", - span_event_attributes, - ) + add_span_event( + span, + f"Requested for {requested_session_count} sessions, returned {returned_session_count}", + span_event_attributes, + ) def get(self, timeout=None): """Check a session out from the pool. diff --git a/google/cloud/spanner_v1/session.py b/google/cloud/spanner_v1/session.py index 166d5488c6..d73a8cc2b5 100644 --- a/google/cloud/spanner_v1/session.py +++ b/google/cloud/spanner_v1/session.py @@ -243,6 +243,10 @@ def delete(self): with trace_call( "CloudSpanner.DeleteSession", self, + extra_attributes={ + "session.id": self._session_id, + "session.name": self.name, + }, observability_options=observability_options, ): api.delete_session(name=self.name, metadata=metadata) @@ -458,47 +462,98 @@ def run_in_transaction(self, func, *args, **kw): ) attempts = 0 - while True: - if self._transaction is None: - txn = self.transaction() - txn.transaction_tag = transaction_tag - txn.exclude_txn_from_change_streams = exclude_txn_from_change_streams - else: - txn = self._transaction - - try: - attempts += 1 - return_value = func(txn, *args, **kw) - except Aborted as exc: - del self._transaction - _delay_until_retry(exc, deadline, attempts) - continue - except GoogleAPICallError: - del self._transaction - raise - except Exception: - txn.rollback() - raise - - try: - txn.commit( - return_commit_stats=self._database.log_commit_stats, - request_options=commit_request_options, - max_commit_delay=max_commit_delay, - ) - except Aborted as exc: - del self._transaction - _delay_until_retry(exc, deadline, attempts) - except GoogleAPICallError: - del self._transaction - raise - else: - if self._database.log_commit_stats and txn.commit_stats: - self._database.logger.info( - "CommitStats: {}".format(txn.commit_stats), - extra={"commit_stats": txn.commit_stats}, + observability_options = getattr(self._database, "observability_options", None) + with trace_call( + "CloudSpanner.Session.run_in_transaction", + self, + observability_options=observability_options, + ) as span: + while True: + if self._transaction is None: + txn = self.transaction() + txn.transaction_tag = transaction_tag + txn.exclude_txn_from_change_streams = ( + exclude_txn_from_change_streams + ) + else: + txn = self._transaction + + span_attributes = dict() + + try: + attempts += 1 + span_attributes["attempt"] = attempts + txn_id = getattr(txn, "_transaction_id", "") or "" + if txn_id: + span_attributes["transaction.id"] = txn_id + + return_value = func(txn, *args, **kw) + + except Aborted as exc: + del self._transaction + if span: + delay_seconds = _get_retry_delay(exc.errors[0], attempts) + attributes = dict(delay_seconds=delay_seconds, cause=str(exc)) + attributes.update(span_attributes) + add_span_event( + span, + "Transaction was aborted in user operation, retrying", + attributes, + ) + + _delay_until_retry(exc, deadline, attempts) + continue + except GoogleAPICallError: + del self._transaction + add_span_event( + span, + "User operation failed due to GoogleAPICallError, not retrying", + span_attributes, + ) + raise + except Exception: + add_span_event( + span, + "User operation failed. Invoking Transaction.rollback(), not retrying", + span_attributes, + ) + txn.rollback() + raise + + try: + txn.commit( + return_commit_stats=self._database.log_commit_stats, + request_options=commit_request_options, + max_commit_delay=max_commit_delay, + ) + except Aborted as exc: + del self._transaction + if span: + delay_seconds = _get_retry_delay(exc.errors[0], attempts) + attributes = dict(delay_seconds=delay_seconds) + attributes.update(span_attributes) + add_span_event( + span, + "Transaction got aborted during commit, retrying afresh", + attributes, + ) + + _delay_until_retry(exc, deadline, attempts) + except GoogleAPICallError: + del self._transaction + add_span_event( + span, + "Transaction.commit failed due to GoogleAPICallError, not retrying", + span_attributes, ) - return return_value + raise + else: + if self._database.log_commit_stats and txn.commit_stats: + self._database.logger.info( + "CommitStats: {}".format(txn.commit_stats), + extra={"commit_stats": txn.commit_stats}, + ) + return return_value # Rational: this function factors out complex shared deadline / retry diff --git a/google/cloud/spanner_v1/snapshot.py b/google/cloud/spanner_v1/snapshot.py index 89b5094706..6234c96435 100644 --- a/google/cloud/spanner_v1/snapshot.py +++ b/google/cloud/spanner_v1/snapshot.py @@ -335,7 +335,7 @@ def read( iterator = _restart_on_unavailable( restart, request, - "CloudSpanner.ReadOnlyTransaction", + f"CloudSpanner.{type(self).__name__}.read", self._session, trace_attributes, transaction=self, @@ -357,7 +357,7 @@ def read( iterator = _restart_on_unavailable( restart, request, - "CloudSpanner.ReadOnlyTransaction", + f"CloudSpanner.{type(self).__name__}.read", self._session, trace_attributes, transaction=self, @@ -578,7 +578,7 @@ def _get_streamed_result_set( iterator = _restart_on_unavailable( restart, request, - "CloudSpanner.ReadWriteTransaction", + f"CloudSpanner.{type(self).__name__}.execute_streaming_sql", self._session, trace_attributes, transaction=self, @@ -676,7 +676,7 @@ def partition_read( trace_attributes = {"table_id": table, "columns": columns} with trace_call( - "CloudSpanner.PartitionReadOnlyTransaction", + f"CloudSpanner.{type(self).__name__}.partition_read", self._session, trace_attributes, observability_options=getattr(database, "observability_options", None), @@ -926,7 +926,7 @@ def begin(self): ) txn_selector = self._make_txn_selector() with trace_call( - "CloudSpanner.BeginTransaction", + f"CloudSpanner.{type(self).__name__}.begin", self._session, observability_options=getattr(database, "observability_options", None), ): diff --git a/google/cloud/spanner_v1/transaction.py b/google/cloud/spanner_v1/transaction.py index fa8e5121ff..a8aef7f470 100644 --- a/google/cloud/spanner_v1/transaction.py +++ b/google/cloud/spanner_v1/transaction.py @@ -157,7 +157,7 @@ def begin(self): ) observability_options = getattr(database, "observability_options", None) with trace_call( - "CloudSpanner.BeginTransaction", + f"CloudSpanner.{type(self).__name__}.begin", self._session, observability_options=observability_options, ) as span: @@ -199,7 +199,7 @@ def rollback(self): ) observability_options = getattr(database, "observability_options", None) with trace_call( - "CloudSpanner.Rollback", + f"CloudSpanner.{type(self).__name__}.rollback", self._session, observability_options=observability_options, ): @@ -278,7 +278,7 @@ def commit( trace_attributes = {"num_mutations": len(self._mutations)} observability_options = getattr(database, "observability_options", None) with trace_call( - "CloudSpanner.Commit", + f"CloudSpanner.{type(self).__name__}.commit", self._session, trace_attributes, observability_options, @@ -447,7 +447,7 @@ def execute_update( response = self._execute_request( method, request, - "CloudSpanner.ReadWriteTransaction", + f"CloudSpanner.{type(self).__name__}.execute_update", self._session, trace_attributes, observability_options=observability_options, @@ -464,7 +464,7 @@ def execute_update( response = self._execute_request( method, request, - "CloudSpanner.ReadWriteTransaction", + f"CloudSpanner.{type(self).__name__}.execute_update", self._session, trace_attributes, observability_options=observability_options, diff --git a/tests/_helpers.py b/tests/_helpers.py index 81787c5a86..c7b1665e89 100644 --- a/tests/_helpers.py +++ b/tests/_helpers.py @@ -78,7 +78,7 @@ def tearDown(self): def assertNoSpans(self): if HAS_OPENTELEMETRY_INSTALLED: - span_list = self.ot_exporter.get_finished_spans() + span_list = self.get_finished_spans() self.assertEqual(len(span_list), 0) def assertSpanAttributes( @@ -119,11 +119,16 @@ def assertSpanNames(self, want_span_names): def get_finished_spans(self): if HAS_OPENTELEMETRY_INSTALLED: - return list( + span_list = list( filter( lambda span: span and span.name, self.ot_exporter.get_finished_spans(), ) ) + # Sort the spans by their start time in the hierarchy. + return sorted(span_list, key=lambda span: span.start_time) else: return [] + + def reset(self): + self.tearDown() diff --git a/tests/system/_helpers.py b/tests/system/_helpers.py index b62d453512..f157a8ee59 100644 --- a/tests/system/_helpers.py +++ b/tests/system/_helpers.py @@ -137,3 +137,21 @@ def cleanup_old_instances(spanner_client): def unique_id(prefix, separator="-"): return f"{prefix}{system.unique_resource_id(separator)}" + + +class FauxCall: + def __init__(self, code, details="FauxCall"): + self._code = code + self._details = details + + def initial_metadata(self): + return {} + + def trailing_metadata(self): + return {} + + def code(self): + return self._code + + def details(self): + return self._details diff --git a/tests/system/test_observability_options.py b/tests/system/test_observability_options.py index 8382255c15..42ce0de7fe 100644 --- a/tests/system/test_observability_options.py +++ b/tests/system/test_observability_options.py @@ -105,7 +105,10 @@ def test_propagation(enable_extended_tracing): len(from_inject_spans) >= 2 ) # "Expecting at least 2 spans from the injected trace exporter" gotNames = [span.name for span in from_inject_spans] - wantNames = ["CloudSpanner.CreateSession", "CloudSpanner.ReadWriteTransaction"] + wantNames = [ + "CloudSpanner.CreateSession", + "CloudSpanner.Snapshot.execute_streaming_sql", + ] assert gotNames == wantNames # Check for conformance of enable_extended_tracing @@ -128,6 +131,144 @@ def test_propagation(enable_extended_tracing): test_propagation(False) +@pytest.mark.skipif( + not _helpers.USE_EMULATOR, + reason="Emulator needed to run this tests", +) +@pytest.mark.skipif( + not HAS_OTEL_INSTALLED, + reason="Tracing requires OpenTelemetry", +) +def test_transaction_abort_then_retry_spans(): + from google.auth.credentials import AnonymousCredentials + from google.api_core.exceptions import Aborted + from google.rpc import code_pb2 + from opentelemetry.sdk.trace.export import SimpleSpanProcessor + from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, + ) + from opentelemetry.trace.status import StatusCode + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.sampling import ALWAYS_ON + + PROJECT = _helpers.EMULATOR_PROJECT + CONFIGURATION_NAME = "config-name" + INSTANCE_ID = _helpers.INSTANCE_ID + DISPLAY_NAME = "display-name" + DATABASE_ID = _helpers.unique_id("temp_db") + NODE_COUNT = 5 + LABELS = {"test": "true"} + + counters = dict(aborted=0) + + def select_in_txn(txn): + results = txn.execute_sql("SELECT 1") + for row in results: + _ = row + + if counters["aborted"] == 0: + counters["aborted"] = 1 + raise Aborted( + "Thrown from ClientInterceptor for testing", + errors=[_helpers.FauxCall(code_pb2.ABORTED)], + ) + + tracer_provider = TracerProvider(sampler=ALWAYS_ON) + trace_exporter = InMemorySpanExporter() + tracer_provider.add_span_processor(SimpleSpanProcessor(trace_exporter)) + observability_options = dict( + tracer_provider=tracer_provider, + enable_extended_tracing=True, + ) + + client = Client( + project=PROJECT, + observability_options=observability_options, + credentials=AnonymousCredentials(), + ) + + instance = client.instance( + INSTANCE_ID, + CONFIGURATION_NAME, + display_name=DISPLAY_NAME, + node_count=NODE_COUNT, + labels=LABELS, + ) + + try: + instance.create() + except Exception: + pass + + db = instance.database(DATABASE_ID) + try: + db.create() + except Exception: + pass + + db.run_in_transaction(select_in_txn) + + span_list = trace_exporter.get_finished_spans() + # Sort the spans by their start time in the hierarchy. + span_list = sorted(span_list, key=lambda span: span.start_time) + got_span_names = [span.name for span in span_list] + want_span_names = [ + "CloudSpanner.Database.run_in_transaction", + "CloudSpanner.CreateSession", + "CloudSpanner.Session.run_in_transaction", + "CloudSpanner.Transaction.execute_streaming_sql", + "CloudSpanner.Transaction.execute_streaming_sql", + "CloudSpanner.Transaction.commit", + ] + + assert got_span_names == want_span_names + + got_events = [] + got_statuses = [] + + # Some event attributes are noisy/highly ephemeral + # and can't be directly compared against. + imprecise_event_attributes = ["exception.stacktrace", "delay_seconds", "cause"] + for span in span_list: + got_statuses.append( + (span.name, span.status.status_code, span.status.description) + ) + for event in span.events: + evt_attributes = event.attributes.copy() + for attr_name in imprecise_event_attributes: + if attr_name in evt_attributes: + evt_attributes[attr_name] = "EPHEMERAL" + + got_events.append((event.name, evt_attributes)) + + # Check for the series of events + want_events = [ + ("Acquiring session", {"kind": "BurstyPool"}), + ("Waiting for a session to become available", {"kind": "BurstyPool"}), + ("No sessions available in pool. Creating session", {"kind": "BurstyPool"}), + ("Creating Session", {}), + ( + "Transaction was aborted in user operation, retrying", + {"delay_seconds": "EPHEMERAL", "cause": "EPHEMERAL", "attempt": 1}, + ), + ("Starting Commit", {}), + ("Commit Done", {}), + ] + assert got_events == want_events + + # Check for the statues. + codes = StatusCode + want_statuses = [ + ("CloudSpanner.Database.run_in_transaction", codes.OK, None), + ("CloudSpanner.CreateSession", codes.OK, None), + ("CloudSpanner.Session.run_in_transaction", codes.OK, None), + ("CloudSpanner.Transaction.execute_streaming_sql", codes.OK, None), + ("CloudSpanner.Transaction.execute_streaming_sql", codes.OK, None), + ("CloudSpanner.Transaction.commit", codes.OK, None), + ] + assert got_statuses == want_statuses + + def _make_credentials(): from google.auth.credentials import AnonymousCredentials diff --git a/tests/system/test_session_api.py b/tests/system/test_session_api.py index b7337cb258..4e80657584 100644 --- a/tests/system/test_session_api.py +++ b/tests/system/test_session_api.py @@ -447,7 +447,7 @@ def test_batch_insert_then_read(sessions_database, ot_exporter): ) assert_span_attributes( ot_exporter, - "CloudSpanner.Commit", + "CloudSpanner.Batch.commit", attributes=_make_attributes(db_name, num_mutations=2), span=span_list[1], ) @@ -459,7 +459,7 @@ def test_batch_insert_then_read(sessions_database, ot_exporter): ) assert_span_attributes( ot_exporter, - "CloudSpanner.ReadOnlyTransaction", + "CloudSpanner.Snapshot.read", attributes=_make_attributes(db_name, columns=sd.COLUMNS, table_id=sd.TABLE), span=span_list[3], ) @@ -608,7 +608,18 @@ def test_transaction_read_and_insert_then_rollback( if ot_exporter is not None: span_list = ot_exporter.get_finished_spans() - assert len(span_list) == 8 + got_span_names = [span.name for span in span_list] + want_span_names = [ + "CloudSpanner.CreateSession", + "CloudSpanner.GetSession", + "CloudSpanner.Batch.commit", + "CloudSpanner.Transaction.begin", + "CloudSpanner.Transaction.read", + "CloudSpanner.Transaction.read", + "CloudSpanner.Transaction.rollback", + "CloudSpanner.Snapshot.read", + ] + assert got_span_names == want_span_names assert_span_attributes( ot_exporter, @@ -624,19 +635,19 @@ def test_transaction_read_and_insert_then_rollback( ) assert_span_attributes( ot_exporter, - "CloudSpanner.Commit", + "CloudSpanner.Batch.commit", attributes=_make_attributes(db_name, num_mutations=1), span=span_list[2], ) assert_span_attributes( ot_exporter, - "CloudSpanner.BeginTransaction", + "CloudSpanner.Transaction.begin", attributes=_make_attributes(db_name), span=span_list[3], ) assert_span_attributes( ot_exporter, - "CloudSpanner.ReadOnlyTransaction", + "CloudSpanner.Transaction.read", attributes=_make_attributes( db_name, table_id=sd.TABLE, @@ -646,7 +657,7 @@ def test_transaction_read_and_insert_then_rollback( ) assert_span_attributes( ot_exporter, - "CloudSpanner.ReadOnlyTransaction", + "CloudSpanner.Transaction.read", attributes=_make_attributes( db_name, table_id=sd.TABLE, @@ -656,13 +667,13 @@ def test_transaction_read_and_insert_then_rollback( ) assert_span_attributes( ot_exporter, - "CloudSpanner.Rollback", + "CloudSpanner.Transaction.rollback", attributes=_make_attributes(db_name), span=span_list[6], ) assert_span_attributes( ot_exporter, - "CloudSpanner.ReadOnlyTransaction", + "CloudSpanner.Snapshot.read", attributes=_make_attributes( db_name, table_id=sd.TABLE, @@ -1183,18 +1194,29 @@ def unit_of_work(transaction): session.run_in_transaction(unit_of_work) span_list = ot_exporter.get_finished_spans() - assert len(span_list) == 5 - expected_span_names = [ + got_span_names = [span.name for span in span_list] + want_span_names = [ "CloudSpanner.CreateSession", - "CloudSpanner.Commit", + "CloudSpanner.Batch.commit", "CloudSpanner.DMLTransaction", - "CloudSpanner.Commit", + "CloudSpanner.Transaction.commit", + "CloudSpanner.Session.run_in_transaction", "Test Span", ] - assert [span.name for span in span_list] == expected_span_names - for span in span_list[2:-1]: - assert span.context.trace_id == span_list[-1].context.trace_id - assert span.parent.span_id == span_list[-1].context.span_id + assert got_span_names == want_span_names + + def assert_parent_hierarchy(parent, children): + for child in children: + assert child.context.trace_id == parent.context.trace_id + assert child.parent.span_id == parent.context.span_id + + test_span = span_list[-1] + test_span_children = [span_list[-2]] + assert_parent_hierarchy(test_span, test_span_children) + + session_run_in_txn = span_list[-2] + session_run_in_txn_children = span_list[2:-2] + assert_parent_hierarchy(session_run_in_txn, session_run_in_txn_children) def test_execute_partitioned_dml( @@ -2844,31 +2866,13 @@ def test_mutation_groups_insert_or_update_then_query(not_emulator, sessions_data sd._check_rows_data(rows, sd.BATCH_WRITE_ROW_DATA) -class FauxCall: - def __init__(self, code, details="FauxCall"): - self._code = code - self._details = details - - def initial_metadata(self): - return {} - - def trailing_metadata(self): - return {} - - def code(self): - return self._code - - def details(self): - return self._details - - def _check_batch_status(status_code, expected=code_pb2.OK): if status_code != expected: _status_code_to_grpc_status_code = { member.value[0]: member for member in grpc.StatusCode } grpc_status_code = _status_code_to_grpc_status_code[status_code] - call = FauxCall(status_code) + call = _helpers.FauxCall(status_code) raise exceptions.from_grpc_status( grpc_status_code, "batch_update failed", errors=[call] ) diff --git a/tests/unit/test_batch.py b/tests/unit/test_batch.py index a7f7a6f970..a43678f3b9 100644 --- a/tests/unit/test_batch.py +++ b/tests/unit/test_batch.py @@ -212,7 +212,7 @@ def test_commit_grpc_error(self): batch.commit() self.assertSpanAttributes( - "CloudSpanner.Commit", + "CloudSpanner.Batch.commit", status=StatusCode.ERROR, attributes=dict(BASE_ATTRIBUTES, num_mutations=1), ) @@ -261,7 +261,8 @@ def test_commit_ok(self): self.assertEqual(max_commit_delay, None) self.assertSpanAttributes( - "CloudSpanner.Commit", attributes=dict(BASE_ATTRIBUTES, num_mutations=1) + "CloudSpanner.Batch.commit", + attributes=dict(BASE_ATTRIBUTES, num_mutations=1), ) def _test_commit_with_options( @@ -327,7 +328,8 @@ def _test_commit_with_options( self.assertEqual(actual_request_options, expected_request_options) self.assertSpanAttributes( - "CloudSpanner.Commit", attributes=dict(BASE_ATTRIBUTES, num_mutations=1) + "CloudSpanner.Batch.commit", + attributes=dict(BASE_ATTRIBUTES, num_mutations=1), ) self.assertEqual(max_commit_delay_in, max_commit_delay) @@ -438,7 +440,8 @@ def test_context_mgr_success(self): self.assertEqual(request_options, RequestOptions()) self.assertSpanAttributes( - "CloudSpanner.Commit", attributes=dict(BASE_ATTRIBUTES, num_mutations=1) + "CloudSpanner.Batch.commit", + attributes=dict(BASE_ATTRIBUTES, num_mutations=1), ) def test_context_mgr_failure(self): diff --git a/tests/unit/test_pool.py b/tests/unit/test_pool.py index fbb35201eb..89715c741d 100644 --- a/tests/unit/test_pool.py +++ b/tests/unit/test_pool.py @@ -24,6 +24,7 @@ OpenTelemetryBase, StatusCode, enrich_with_otel_scope, + HAS_OPENTELEMETRY_INSTALLED, ) @@ -232,6 +233,9 @@ def test_get_non_expired(self): self.assertFalse(pool._sessions.full()) def test_spans_bind_get(self): + if not HAS_OPENTELEMETRY_INSTALLED: + return + # This tests retrieving 1 out of 4 sessions from the session pool. pool = self._make_one(size=4) database = _Database("name") @@ -239,29 +243,41 @@ def test_spans_bind_get(self): database._sessions.extend(SESSIONS) pool.bind(database) - with trace_call("pool.Get", SESSIONS[0]) as span: + with trace_call("pool.Get", SESSIONS[0]): pool.get() - wantEventNames = [ - "Acquiring session", - "Waiting for a session to become available", - "Acquired session", - ] - self.assertSpanEvents("pool.Get", wantEventNames, span) - # Check for the overall spans too. + span_list = self.get_finished_spans() + got_span_names = [span.name for span in span_list] + want_span_names = ["CloudSpanner.FixedPool.BatchCreateSessions", "pool.Get"] + assert got_span_names == want_span_names + + attrs = TestFixedSizePool.BASE_ATTRIBUTES.copy() + + # Check for the overall spans. + self.assertSpanAttributes( + "CloudSpanner.FixedPool.BatchCreateSessions", + status=StatusCode.OK, + attributes=attrs, + span=span_list[0], + ) + self.assertSpanAttributes( "pool.Get", + status=StatusCode.OK, attributes=TestFixedSizePool.BASE_ATTRIBUTES, + span=span_list[-1], ) - wantEventNames = [ "Acquiring session", "Waiting for a session to become available", "Acquired session", ] - self.assertSpanEvents("pool.Get", wantEventNames) + self.assertSpanEvents("pool.Get", wantEventNames, span_list[-1]) def test_spans_bind_get_empty_pool(self): + if not HAS_OPENTELEMETRY_INSTALLED: + return + # Tests trying to invoke pool.get() from an empty pool. pool = self._make_one(size=0) database = _Database("name") @@ -289,7 +305,23 @@ def test_spans_bind_get_empty_pool(self): attributes=TestFixedSizePool.BASE_ATTRIBUTES, ) + span_list = self.get_finished_spans() + got_all_events = [] + for span in span_list: + for event in span.events: + got_all_events.append((event.name, event.attributes)) + want_all_events = [ + ("Invalid session pool size(0) <= 0", {"kind": "FixedSizePool"}), + ("Acquiring session", {"kind": "FixedSizePool"}), + ("Waiting for a session to become available", {"kind": "FixedSizePool"}), + ("No sessions available in the pool", {"kind": "FixedSizePool"}), + ] + assert got_all_events == want_all_events + def test_spans_pool_bind(self): + if not HAS_OPENTELEMETRY_INSTALLED: + return + # Tests the exception generated from invoking pool.bind when # you have an empty pool. pool = self._make_one(size=1) @@ -304,20 +336,63 @@ def test_spans_pool_bind(self): except Exception: pass + span_list = self.get_finished_spans() + got_span_names = [span.name for span in span_list] + want_span_names = ["testBind", "CloudSpanner.FixedPool.BatchCreateSessions"] + assert got_span_names == want_span_names + wantEventNames = [ "Requesting 1 sessions", - "Creating 1 sessions", "exception", ] - self.assertSpanEvents("testBind", wantEventNames) + self.assertSpanEvents("testBind", wantEventNames, span_list[0]) - # Check for the overall spans. self.assertSpanAttributes( "testBind", status=StatusCode.ERROR, attributes=TestFixedSizePool.BASE_ATTRIBUTES, + span=span_list[0], ) + got_all_events = [] + + # Some event attributes are noisy/highly ephemeral + # and can't be directly compared against. + imprecise_event_attributes = ["exception.stacktrace", "delay_seconds", "cause"] + for span in span_list: + for event in span.events: + evt_attributes = event.attributes.copy() + for attr_name in imprecise_event_attributes: + if attr_name in evt_attributes: + evt_attributes[attr_name] = "EPHEMERAL" + + got_all_events.append((event.name, evt_attributes)) + + want_all_events = [ + ("Requesting 1 sessions", {"kind": "FixedSizePool"}), + ( + "exception", + { + "exception.type": "IndexError", + "exception.message": "pop from empty list", + "exception.stacktrace": "EPHEMERAL", + "exception.escaped": "False", + }, + ), + ("Creating 1 sessions", {"kind": "FixedSizePool"}), + ("Created sessions", {"count": 1}), + ( + "exception", + { + "exception.type": "IndexError", + "exception.message": "pop from empty list", + "exception.stacktrace": "EPHEMERAL", + "exception.escaped": "False", + }, + ), + ] + assert got_all_events == want_all_events + def test_get_expired(self): pool = self._make_one(size=4) database = _Database("name") @@ -364,6 +439,7 @@ def test_put_full(self): SESSIONS = [_Session(database)] * 4 database._sessions.extend(SESSIONS) pool.bind(database) + self.reset() with self.assertRaises(queue.Full): pool.put(_Session(database)) @@ -458,6 +534,9 @@ def test_get_empty(self): self.assertTrue(pool._sessions.empty()) def test_spans_get_empty_pool(self): + if not HAS_OPENTELEMETRY_INSTALLED: + return + # This scenario tests a pool that hasn't been filled up # and pool.get() acquires from a pool, waiting for a session # to become available. @@ -474,16 +553,23 @@ def test_spans_get_empty_pool(self): session.create.assert_called() self.assertTrue(pool._sessions.empty()) + span_list = self.get_finished_spans() + got_span_names = [span.name for span in span_list] + want_span_names = ["pool.Get"] + assert got_span_names == want_span_names + + create_span = span_list[-1] self.assertSpanAttributes( "pool.Get", attributes=TestBurstyPool.BASE_ATTRIBUTES, + span=create_span, ) wantEventNames = [ "Acquiring session", "Waiting for a session to become available", "No sessions available in pool. Creating session", ] - self.assertSpanEvents("pool.Get", wantEventNames) + self.assertSpanEvents("pool.Get", wantEventNames, span=create_span) def test_get_non_empty_session_exists(self): pool = self._make_one() @@ -708,6 +794,7 @@ def test_get_hit_no_ping(self): SESSIONS = [_Session(database)] * 4 database._sessions.extend(SESSIONS) pool.bind(database) + self.reset() session = pool.get() @@ -731,6 +818,8 @@ def test_get_hit_w_ping(self): with _Monkey(MUT, _NOW=lambda: sessions_created): pool.bind(database) + self.reset() + session = pool.get() self.assertIs(session, SESSIONS[0]) @@ -753,6 +842,7 @@ def test_get_hit_w_ping_expired(self): with _Monkey(MUT, _NOW=lambda: sessions_created): pool.bind(database) + self.reset() session = pool.get() @@ -799,7 +889,39 @@ def test_put_full(self): pool.put(_Session(database)) self.assertTrue(pool._sessions.full()) - self.assertNoSpans() + + def test_spans_put_full(self): + if not HAS_OPENTELEMETRY_INSTALLED: + return + + import queue + + pool = self._make_one(size=4) + database = _Database("name") + SESSIONS = [_Session(database)] * 4 + database._sessions.extend(SESSIONS) + pool.bind(database) + + with self.assertRaises(queue.Full): + pool.put(_Session(database)) + + self.assertTrue(pool._sessions.full()) + + span_list = self.get_finished_spans() + got_span_names = [span.name for span in span_list] + want_span_names = ["CloudSpanner.PingingPool.BatchCreateSessions"] + assert got_span_names == want_span_names + + attrs = TestPingingPool.BASE_ATTRIBUTES.copy() + self.assertSpanAttributes( + "CloudSpanner.PingingPool.BatchCreateSessions", + attributes=attrs, + span=span_list[-1], + ) + wantEventNames = ["Requested for 4 sessions, returned 4"] + self.assertSpanEvents( + "CloudSpanner.PingingPool.BatchCreateSessions", wantEventNames + ) def test_put_non_full(self): import datetime @@ -828,6 +950,7 @@ def test_clear(self): SESSIONS = [_Session(database)] * 10 database._sessions.extend(SESSIONS) pool.bind(database) + self.reset() self.assertTrue(pool._sessions.full()) api = database.spanner_api @@ -852,6 +975,7 @@ def test_ping_oldest_fresh(self): SESSIONS = [_Session(database)] * 1 database._sessions.extend(SESSIONS) pool.bind(database) + self.reset() pool.ping() @@ -886,6 +1010,7 @@ def test_ping_oldest_stale_and_not_exists(self): SESSIONS[0]._exists = False database._sessions.extend(SESSIONS) pool.bind(database) + self.reset() later = datetime.datetime.utcnow() + datetime.timedelta(seconds=4000) with _Monkey(MUT, _NOW=lambda: later): @@ -896,6 +1021,9 @@ def test_ping_oldest_stale_and_not_exists(self): self.assertNoSpans() def test_spans_get_and_leave_empty_pool(self): + if not HAS_OPENTELEMETRY_INSTALLED: + return + # This scenario tests the spans generated from pulling a span # out the pool and leaving it empty. pool = self._make_one() @@ -914,15 +1042,21 @@ def test_spans_get_and_leave_empty_pool(self): # session.create.assert_called() self.assertTrue(pool._sessions.empty()) + span_list = self.get_finished_spans() + got_span_names = [span.name for span in span_list] + want_span_names = ["CloudSpanner.PingingPool.BatchCreateSessions", "pool.Get"] + assert got_span_names == want_span_names + self.assertSpanAttributes( "pool.Get", attributes=TestPingingPool.BASE_ATTRIBUTES, + span=span_list[-1], ) wantEventNames = [ "Waiting for a session to become available", "Acquired session", ] - self.assertSpanEvents("pool.Get", wantEventNames) + self.assertSpanEvents("pool.Get", wantEventNames, span_list[-1]) class TestSessionCheckout(unittest.TestCase): @@ -1095,6 +1229,10 @@ def session(self, **kwargs): # sessions into pool (important for order tests) return self._sessions.pop(0) + @property + def observability_options(self): + return dict(db_name=self.name) + class _Queue(object): _size = 1 diff --git a/tests/unit/test_session.py b/tests/unit/test_session.py index 966adadcbd..0d60e98cd0 100644 --- a/tests/unit/test_session.py +++ b/tests/unit/test_session.py @@ -558,8 +558,11 @@ def test_delete_hit(self): metadata=[("google-cloud-resource-prefix", database.name)], ) + attrs = {"session.id": session._session_id, "session.name": session.name} + attrs.update(TestSession.BASE_ATTRIBUTES) self.assertSpanAttributes( - "CloudSpanner.DeleteSession", attributes=TestSession.BASE_ATTRIBUTES + "CloudSpanner.DeleteSession", + attributes=attrs, ) def test_delete_miss(self): @@ -580,10 +583,13 @@ def test_delete_miss(self): metadata=[("google-cloud-resource-prefix", database.name)], ) + attrs = {"session.id": session._session_id, "session.name": session.name} + attrs.update(TestSession.BASE_ATTRIBUTES) + self.assertSpanAttributes( "CloudSpanner.DeleteSession", status=StatusCode.ERROR, - attributes=TestSession.BASE_ATTRIBUTES, + attributes=attrs, ) def test_delete_error(self): @@ -604,10 +610,13 @@ def test_delete_error(self): metadata=[("google-cloud-resource-prefix", database.name)], ) + attrs = {"session.id": session._session_id, "session.name": session.name} + attrs.update(TestSession.BASE_ATTRIBUTES) + self.assertSpanAttributes( "CloudSpanner.DeleteSession", status=StatusCode.ERROR, - attributes=TestSession.BASE_ATTRIBUTES, + attributes=attrs, ) def test_snapshot_not_created(self): diff --git a/tests/unit/test_snapshot.py b/tests/unit/test_snapshot.py index 479a0d62e9..a4446a0d1e 100644 --- a/tests/unit/test_snapshot.py +++ b/tests/unit/test_snapshot.py @@ -616,7 +616,7 @@ def test_read_other_error(self): list(derived.read(TABLE_NAME, COLUMNS, keyset)) self.assertSpanAttributes( - "CloudSpanner.ReadOnlyTransaction", + "CloudSpanner._Derived.read", status=StatusCode.ERROR, attributes=dict( BASE_ATTRIBUTES, table_id=TABLE_NAME, columns=tuple(COLUMNS) @@ -773,7 +773,7 @@ def _read_helper( ) self.assertSpanAttributes( - "CloudSpanner.ReadOnlyTransaction", + "CloudSpanner._Derived.read", attributes=dict( BASE_ATTRIBUTES, table_id=TABLE_NAME, columns=tuple(COLUMNS) ), @@ -868,7 +868,7 @@ def test_execute_sql_other_error(self): self.assertEqual(derived._execute_sql_count, 1) self.assertSpanAttributes( - "CloudSpanner.ReadWriteTransaction", + "CloudSpanner._Derived.execute_streaming_sql", status=StatusCode.ERROR, attributes=dict(BASE_ATTRIBUTES, **{"db.statement": SQL_QUERY}), ) @@ -1024,7 +1024,7 @@ def _execute_sql_helper( self.assertEqual(derived._execute_sql_count, sql_count + 1) self.assertSpanAttributes( - "CloudSpanner.ReadWriteTransaction", + "CloudSpanner._Derived.execute_streaming_sql", status=StatusCode.OK, attributes=dict(BASE_ATTRIBUTES, **{"db.statement": SQL_QUERY_WITH_PARAM}), ) @@ -1195,7 +1195,7 @@ def _partition_read_helper( ) self.assertSpanAttributes( - "CloudSpanner.PartitionReadOnlyTransaction", + "CloudSpanner._Derived.partition_read", status=StatusCode.OK, attributes=dict( BASE_ATTRIBUTES, table_id=TABLE_NAME, columns=tuple(COLUMNS) @@ -1226,7 +1226,7 @@ def test_partition_read_other_error(self): list(derived.partition_read(TABLE_NAME, COLUMNS, keyset)) self.assertSpanAttributes( - "CloudSpanner.PartitionReadOnlyTransaction", + "CloudSpanner._Derived.partition_read", status=StatusCode.ERROR, attributes=dict( BASE_ATTRIBUTES, table_id=TABLE_NAME, columns=tuple(COLUMNS) @@ -1697,7 +1697,7 @@ def test_begin_w_other_error(self): snapshot.begin() self.assertSpanAttributes( - "CloudSpanner.BeginTransaction", + "CloudSpanner.Snapshot.begin", status=StatusCode.ERROR, attributes=BASE_ATTRIBUTES, ) @@ -1755,7 +1755,7 @@ def test_begin_ok_exact_staleness(self): ) self.assertSpanAttributes( - "CloudSpanner.BeginTransaction", + "CloudSpanner.Snapshot.begin", status=StatusCode.OK, attributes=BASE_ATTRIBUTES, ) @@ -1791,7 +1791,7 @@ def test_begin_ok_exact_strong(self): ) self.assertSpanAttributes( - "CloudSpanner.BeginTransaction", + "CloudSpanner.Snapshot.begin", status=StatusCode.OK, attributes=BASE_ATTRIBUTES, ) diff --git a/tests/unit/test_transaction.py b/tests/unit/test_transaction.py index e426f912b2..d3d7035854 100644 --- a/tests/unit/test_transaction.py +++ b/tests/unit/test_transaction.py @@ -162,7 +162,7 @@ def test_begin_w_other_error(self): transaction.begin() self.assertSpanAttributes( - "CloudSpanner.BeginTransaction", + "CloudSpanner.Transaction.begin", status=StatusCode.ERROR, attributes=TestTransaction.BASE_ATTRIBUTES, ) @@ -195,7 +195,7 @@ def test_begin_ok(self): ) self.assertSpanAttributes( - "CloudSpanner.BeginTransaction", attributes=TestTransaction.BASE_ATTRIBUTES + "CloudSpanner.Transaction.begin", attributes=TestTransaction.BASE_ATTRIBUTES ) def test_begin_w_retry(self): @@ -266,7 +266,7 @@ def test_rollback_w_other_error(self): self.assertFalse(transaction.rolled_back) self.assertSpanAttributes( - "CloudSpanner.Rollback", + "CloudSpanner.Transaction.rollback", status=StatusCode.ERROR, attributes=TestTransaction.BASE_ATTRIBUTES, ) @@ -299,7 +299,8 @@ def test_rollback_ok(self): ) self.assertSpanAttributes( - "CloudSpanner.Rollback", attributes=TestTransaction.BASE_ATTRIBUTES + "CloudSpanner.Transaction.rollback", + attributes=TestTransaction.BASE_ATTRIBUTES, ) def test_commit_not_begun(self): @@ -345,7 +346,7 @@ def test_commit_w_other_error(self): self.assertIsNone(transaction.committed) self.assertSpanAttributes( - "CloudSpanner.Commit", + "CloudSpanner.Transaction.commit", status=StatusCode.ERROR, attributes=dict(TestTransaction.BASE_ATTRIBUTES, num_mutations=1), ) @@ -427,7 +428,7 @@ def _commit_helper( self.assertEqual(transaction.commit_stats.mutation_count, 4) self.assertSpanAttributes( - "CloudSpanner.Commit", + "CloudSpanner.Transaction.commit", attributes=dict( TestTransaction.BASE_ATTRIBUTES, num_mutations=len(transaction._mutations),