diff --git a/google/cloud/spanner_v1/_opentelemetry_tracing.py b/google/cloud/spanner_v1/_opentelemetry_tracing.py index 1caac59ecd..495666de0c 100644 --- a/google/cloud/spanner_v1/_opentelemetry_tracing.py +++ b/google/cloud/spanner_v1/_opentelemetry_tracing.py @@ -56,15 +56,15 @@ def get_tracer(tracer_provider=None): @contextmanager -def trace_call(name, session, extra_attributes=None, observability_options=None): - if session: - session._last_use_time = datetime.now() - - if not HAS_OPENTELEMETRY_INSTALLED or not session: +def trace_call(name, session=None, extra_attributes=None, observability_options=None): + if not HAS_OPENTELEMETRY_INSTALLED: # Empty context manager. Users will have to check if the generated value is None or a span yield None return + if session: + session._last_use_time = datetime.now() + tracer_provider = None # By default enable_extended_tracing=True because in a bid to minimize @@ -72,20 +72,24 @@ def trace_call(name, session, extra_attributes=None, observability_options=None) # on by default. enable_extended_tracing = True + db_name = "" + if session and getattr(session, "_database", None): + db_name = session._database.name + if isinstance(observability_options, dict): # Avoid false positives with mock.Mock tracer_provider = observability_options.get("tracer_provider", None) enable_extended_tracing = observability_options.get( "enable_extended_tracing", enable_extended_tracing ) + db_name = observability_options.get("db_name", db_name) tracer = get_tracer(tracer_provider) # Set base attributes that we know for every trace created - db = session._database attributes = { "db.type": "spanner", "db.url": SpannerClient.DEFAULT_ENDPOINT, - "db.instance": "" if not db else db.name, + "db.instance": db_name, "net.host.name": SpannerClient.DEFAULT_ENDPOINT, OTEL_SCOPE_NAME: TRACER_NAME, OTEL_SCOPE_VERSION: TRACER_VERSION, @@ -99,6 +103,17 @@ def trace_call(name, session, extra_attributes=None, observability_options=None) if not enable_extended_tracing: attributes.pop("db.statement", False) + attributes.pop("sql", False) + else: + # Otherwise there are places where the annotated sql was inserted + # directly from the arguments as "sql", and transform those into "db.statement". + db_statement = attributes.get("db.statement", None) + if not db_statement: + sql = attributes.get("sql", None) + if sql: + attributes = attributes.copy() + attributes.pop("sql", False) + attributes["db.statement"] = sql with tracer.start_as_current_span( name, kind=trace.SpanKind.CLIENT, attributes=attributes @@ -131,3 +146,16 @@ def get_current_span(): def add_span_event(span, event_name, event_attributes=None): if span: span.add_event(event_name, event_attributes) + + +def add_event_on_current_span(event_name, event_attributes=None, span=None): + if not span: + span = get_current_span() + + add_span_event(span, event_name, event_attributes) + + +def record_span_exception_and_status(span, exc): + if span: + span.set_status(Status(StatusCode.ERROR, str(exc))) + span.record_exception(exc) diff --git a/google/cloud/spanner_v1/batch.py b/google/cloud/spanner_v1/batch.py index 948740d7d4..2c32457e3c 100644 --- a/google/cloud/spanner_v1/batch.py +++ b/google/cloud/spanner_v1/batch.py @@ -26,7 +26,10 @@ _metadata_with_prefix, _metadata_with_leader_aware_routing, ) -from google.cloud.spanner_v1._opentelemetry_tracing import trace_call +from google.cloud.spanner_v1._opentelemetry_tracing import ( + add_event_on_current_span, + trace_call, +) from google.cloud.spanner_v1 import RequestOptions from google.cloud.spanner_v1._helpers import _retry from google.cloud.spanner_v1._helpers import _check_rst_stream_error @@ -70,6 +73,10 @@ def insert(self, table, columns, values): :param values: Values to be modified. """ self._mutations.append(Mutation(insert=_make_write_pb(table, columns, values))) + add_event_on_current_span( + "insert mutations added", + dict(table=table, columns=columns), + ) def update(self, table, columns, values): """Update one or more existing table rows. @@ -84,6 +91,10 @@ def update(self, table, columns, values): :param values: Values to be modified. """ self._mutations.append(Mutation(update=_make_write_pb(table, columns, values))) + add_event_on_current_span( + "update mutations added", + dict(table=table, columns=columns), + ) def insert_or_update(self, table, columns, values): """Insert/update one or more table rows. @@ -100,6 +111,10 @@ def insert_or_update(self, table, columns, values): self._mutations.append( Mutation(insert_or_update=_make_write_pb(table, columns, values)) ) + add_event_on_current_span( + "insert_or_update mutations added", + dict(table=table, columns=columns), + ) def replace(self, table, columns, values): """Replace one or more table rows. @@ -114,6 +129,10 @@ def replace(self, table, columns, values): :param values: Values to be modified. """ self._mutations.append(Mutation(replace=_make_write_pb(table, columns, values))) + add_event_on_current_span( + "replace mutations added", + dict(table=table, columns=columns), + ) def delete(self, table, keyset): """Delete one or more table rows. @@ -126,6 +145,10 @@ def delete(self, table, keyset): """ delete = Mutation.Delete(table=table, key_set=keyset._to_pb()) self._mutations.append(Mutation(delete=delete)) + add_event_on_current_span( + "delete mutations added", + dict(table=table), + ) class Batch(_BatchBase): @@ -207,7 +230,7 @@ def commit( ) observability_options = getattr(database, "observability_options", None) with trace_call( - "CloudSpanner.Commit", + f"CloudSpanner.{type(self).__name__}.commit", self._session, trace_attributes, observability_options=observability_options, diff --git a/google/cloud/spanner_v1/database.py b/google/cloud/spanner_v1/database.py index c8230ab503..88d2bb60f7 100644 --- a/google/cloud/spanner_v1/database.py +++ b/google/cloud/spanner_v1/database.py @@ -70,6 +70,7 @@ from google.cloud.spanner_v1._opentelemetry_tracing import ( add_span_event, get_current_span, + trace_call, ) @@ -720,6 +721,7 @@ def execute_pdml(): iterator = _restart_on_unavailable( method=method, + trace_name="CloudSpanner.ExecuteStreamingSql", request=request, transaction_selector=txn_selector, observability_options=self.observability_options, @@ -881,20 +883,25 @@ def run_in_transaction(self, func, *args, **kw): :raises Exception: reraises any non-ABORT exceptions raised by ``func``. """ - # Sanity check: Is there a transaction already running? - # If there is, then raise a red flag. Otherwise, mark that this one - # is running. - if getattr(self._local, "transaction_running", False): - raise RuntimeError("Spanner does not support nested transactions.") - self._local.transaction_running = True - - # Check out a session and run the function in a transaction; once - # done, flip the sanity check bit back. - try: - with SessionCheckout(self._pool) as session: - return session.run_in_transaction(func, *args, **kw) - finally: - self._local.transaction_running = False + observability_options = getattr(self, "observability_options", None) + with trace_call( + "CloudSpanner.Database.run_in_transaction", + observability_options=observability_options, + ): + # Sanity check: Is there a transaction already running? + # If there is, then raise a red flag. Otherwise, mark that this one + # is running. + if getattr(self._local, "transaction_running", False): + raise RuntimeError("Spanner does not support nested transactions.") + self._local.transaction_running = True + + # Check out a session and run the function in a transaction; once + # done, flip the sanity check bit back. + try: + with SessionCheckout(self._pool) as session: + return session.run_in_transaction(func, *args, **kw) + finally: + self._local.transaction_running = False def restore(self, source): """Restore from a backup to this database. @@ -1120,7 +1127,12 @@ def observability_options(self): if not (self._instance and self._instance._client): return None - return getattr(self._instance._client, "observability_options", None) + opts = getattr(self._instance._client, "observability_options", None) + if not opts: + opts = dict() + + opts["db_name"] = self.name + return opts class BatchCheckout(object): diff --git a/google/cloud/spanner_v1/pool.py b/google/cloud/spanner_v1/pool.py index 4f90196b4a..7b73b3da91 100644 --- a/google/cloud/spanner_v1/pool.py +++ b/google/cloud/spanner_v1/pool.py @@ -28,6 +28,7 @@ from google.cloud.spanner_v1._opentelemetry_tracing import ( add_span_event, get_current_span, + trace_call, ) from warnings import warn @@ -237,29 +238,40 @@ def bind(self, database): session_template=Session(creator_role=self.database_role), ) - returned_session_count = 0 - while not self._sessions.full(): - request.session_count = requested_session_count - self._sessions.qsize() + observability_options = getattr(self._database, "observability_options", None) + with trace_call( + "Cloudspanner.FixedPool.BatchCreateSessions", + observability_options=observability_options, + ) as span: + returned_session_count = 0 + while not self._sessions.full(): + request.session_count = requested_session_count - self._sessions.qsize() + add_span_event( + span, + f"Creating {request.session_count} sessions", + span_event_attributes, + ) + resp = api.batch_create_sessions( + request=request, + metadata=metadata, + ) + + add_event_on_current_span( + "Created sessions", + dict(count=len(resp.session)), + ) + + for session_pb in resp.session: + session = self._new_session() + session._session_id = session_pb.name.split("/")[-1] + self._sessions.put(session) + returned_session_count += 1 + add_span_event( span, - f"Creating {request.session_count} sessions", + f"Requested for {requested_session_count} sessions, returned {returned_session_count}", span_event_attributes, ) - resp = api.batch_create_sessions( - request=request, - metadata=metadata, - ) - for session_pb in resp.session: - session = self._new_session() - session._session_id = session_pb.name.split("/")[-1] - self._sessions.put(session) - returned_session_count += 1 - - add_span_event( - span, - f"Requested for {requested_session_count} sessions, returned {returned_session_count}", - span_event_attributes, - ) def get(self, timeout=None): """Check a session out from the pool. @@ -550,25 +562,30 @@ def bind(self, database): span_event_attributes, ) - returned_session_count = 0 - while created_session_count < self.size: - resp = api.batch_create_sessions( - request=request, - metadata=metadata, - ) - for session_pb in resp.session: - session = self._new_session() - session._session_id = session_pb.name.split("/")[-1] - self.put(session) - returned_session_count += 1 + observability_options = getattr(self._database, "observability_options", None) + with trace_call( + "Cloudspanner.PingingPool.BatchCreateSessions", + observability_options=observability_options, + ) as span: + returned_session_count = 0 + while created_session_count < self.size: + resp = api.batch_create_sessions( + request=request, + metadata=metadata, + ) + for session_pb in resp.session: + session = self._new_session() + session._session_id = session_pb.name.split("/")[-1] + self.put(session) + returned_session_count += 1 - created_session_count += len(resp.session) + created_session_count += len(resp.session) - add_span_event( - current_span, - f"Requested for {requested_session_count} sessions, return {returned_session_count}", - span_event_attributes, - ) + add_span_event( + span, + f"Requested for {requested_session_count} sessions, return {returned_session_count}", + span_event_attributes, + ) def get(self, timeout=None): """Check a session out from the pool. diff --git a/google/cloud/spanner_v1/session.py b/google/cloud/spanner_v1/session.py index 166d5488c6..12b6336c35 100644 --- a/google/cloud/spanner_v1/session.py +++ b/google/cloud/spanner_v1/session.py @@ -34,6 +34,7 @@ from google.cloud.spanner_v1._opentelemetry_tracing import ( add_span_event, get_current_span, + record_span_exception_and_status, trace_call, ) from google.cloud.spanner_v1.batch import Batch @@ -243,6 +244,10 @@ def delete(self): with trace_call( "CloudSpanner.DeleteSession", self, + extra_attributes={ + "session.id": self._session_id, + "session.name": self.name, + }, observability_options=observability_options, ): api.delete_session(name=self.name, metadata=metadata) @@ -458,47 +463,100 @@ def run_in_transaction(self, func, *args, **kw): ) attempts = 0 - while True: - if self._transaction is None: - txn = self.transaction() - txn.transaction_tag = transaction_tag - txn.exclude_txn_from_change_streams = exclude_txn_from_change_streams - else: - txn = self._transaction + observability_options = getattr(self._database, "observability_options", None) + with trace_call( + "CloudSpanner.Session.run_in_transaction", + self, + observability_options=observability_options, + ) as span: + while True: + if self._transaction is None: + add_span_event(span, "Creating Transaction") + txn = self.transaction() + txn.transaction_tag = transaction_tag + txn.exclude_txn_from_change_streams = ( + exclude_txn_from_change_streams + ) + else: + txn = self._transaction - try: attempts += 1 - return_value = func(txn, *args, **kw) - except Aborted as exc: - del self._transaction - _delay_until_retry(exc, deadline, attempts) - continue - except GoogleAPICallError: - del self._transaction - raise - except Exception: - txn.rollback() - raise - - try: - txn.commit( - return_commit_stats=self._database.log_commit_stats, - request_options=commit_request_options, - max_commit_delay=max_commit_delay, - ) - except Aborted as exc: - del self._transaction - _delay_until_retry(exc, deadline, attempts) - except GoogleAPICallError: - del self._transaction - raise - else: - if self._database.log_commit_stats and txn.commit_stats: - self._database.logger.info( - "CommitStats: {}".format(txn.commit_stats), - extra={"commit_stats": txn.commit_stats}, + span_attributes = {"attempt": attempts} + txn_id = getattr(txn, "_transaction_id", "") or "" + if txn_id: + span_attributes["transaction.id"] = txn_id + + add_span_event(span, "Using Transaction", span_attributes) + + try: + return_value = func(txn, *args, **kw) + except Aborted as exc: + del self._transaction + if span: + delay_seconds = _get_retry_delay(exc.errors[0], attempts) + attributes = dict(delay_seconds=delay_seconds) + attributes.update(span_attributes) + record_span_exception_and_status(span, exc) + add_span_event( + span, + "Transaction was aborted, retrying afresh", + attributes, + ) + + _delay_until_retry(exc, deadline, attempts) + continue + except GoogleAPICallError: + del self._transaction + add_span_event( + span, + "Transaction.commit failed due to GoogleAPICallError, not retrying", + span_attributes, + ) + raise + except Exception: + add_span_event( + span, + "Invoking Transaction.rollback(), not retrying", + span_attributes, + ) + txn.rollback() + raise + + try: + txn.commit( + return_commit_stats=self._database.log_commit_stats, + request_options=commit_request_options, + max_commit_delay=max_commit_delay, + ) + except Aborted as exc: + del self._transaction + if span: + delay_seconds = _get_retry_delay(exc.errors[0], attempts) + attributes = dict(delay_seconds=delay_seconds) + attributes.update(span_attributes) + record_span_exception_and_status(span, exc) + add_span_event( + span, + "Transaction was aborted, retrying afresh", + attributes, + ) + + _delay_until_retry(exc, deadline, attempts) + except GoogleAPICallError: + del self._transaction + add_span_event( + span, + "Transaction.commit failed due to GoogleAPICallError, not retrying", + span_attributes, ) - return return_value + raise + else: + if self._database.log_commit_stats and txn.commit_stats: + self._database.logger.info( + "CommitStats: {}".format(txn.commit_stats), + extra={"commit_stats": txn.commit_stats}, + ) + return return_value # Rational: this function factors out complex shared deadline / retry diff --git a/google/cloud/spanner_v1/snapshot.py b/google/cloud/spanner_v1/snapshot.py index 89b5094706..8615e76f33 100644 --- a/google/cloud/spanner_v1/snapshot.py +++ b/google/cloud/spanner_v1/snapshot.py @@ -578,7 +578,7 @@ def _get_streamed_result_set( iterator = _restart_on_unavailable( restart, request, - "CloudSpanner.ReadWriteTransaction", + f"CloudSpanner.{type(self).__name__}.execute_streaming_sql", self._session, trace_attributes, transaction=self, diff --git a/google/cloud/spanner_v1/transaction.py b/google/cloud/spanner_v1/transaction.py index fa8e5121ff..123dfb6db3 100644 --- a/google/cloud/spanner_v1/transaction.py +++ b/google/cloud/spanner_v1/transaction.py @@ -278,7 +278,7 @@ def commit( trace_attributes = {"num_mutations": len(self._mutations)} observability_options = getattr(database, "observability_options", None) with trace_call( - "CloudSpanner.Commit", + f"CloudSpanner.{type(self).__name__}.commit", self._session, trace_attributes, observability_options, @@ -447,7 +447,7 @@ def execute_update( response = self._execute_request( method, request, - "CloudSpanner.ReadWriteTransaction", + f"CloudSpanner.{type(self).__name__}.execute_update", self._session, trace_attributes, observability_options=observability_options, @@ -464,7 +464,7 @@ def execute_update( response = self._execute_request( method, request, - "CloudSpanner.ReadWriteTransaction", + f"CloudSpanner.{type(self).__name__}.execute_update", self._session, trace_attributes, observability_options=observability_options, diff --git a/tests/_helpers.py b/tests/_helpers.py index 81787c5a86..dc386971cb 100644 --- a/tests/_helpers.py +++ b/tests/_helpers.py @@ -119,11 +119,13 @@ def assertSpanNames(self, want_span_names): def get_finished_spans(self): if HAS_OPENTELEMETRY_INSTALLED: - return list( + span_list = list( filter( lambda span: span and span.name, self.ot_exporter.get_finished_spans(), ) ) + # Sort the spans by their start time in the hierarchy. + return sorted(span_list, lambda span: span.start_time) else: return [] diff --git a/tests/system/test_session_api.py b/tests/system/test_session_api.py index b7337cb258..e0374bfff7 100644 --- a/tests/system/test_session_api.py +++ b/tests/system/test_session_api.py @@ -699,6 +699,162 @@ def _transaction_read_then_raise(transaction): assert rows == [] +@pytest.mark.skipif( + not _helpers.USE_EMULATOR, + reason="Emulator needed to run this tests", +) +@pytest.mark.skipif( + not ot_helpers.HAS_OPENTELEMETRY_INSTALLED, + reason="Tracing requires OpenTelemetry", +) +def test_transaction_abort_then_retry_spans(sessions_database, ot_exporter): + from google.auth.credentials import AnonymousCredentials + from google.api_core.exceptions import Aborted + from opentelemetry.sdk.trace.export import SimpleSpanProcessor + from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, + ) + from opentelemetry.trace.status import StatusCode + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.sampling import ALWAYS_ON + from opentelemetry import trace + + PROJECT = _helpers.EMULATOR_PROJECT + CONFIGURATION_NAME = "config-name" + INSTANCE_ID = _helpers.INSTANCE_ID + DISPLAY_NAME = "display-name" + DATABASE_ID = _helpers.unique_id("temp_db") + NODE_COUNT = 5 + LABELS = {"test": "true"} + + counters = dict(aborted=0) + already_aborted = False + + def select_in_txn(txn): + from google.rpc import error_details_pb2 + + results = txn.execute_sql("SELECT 1") + for row in results: + _ = row + + if counters["aborted"] == 0: + counters["aborted"] = 1 + raise Aborted( + "Thrown from ClientInterceptor for testing", + errors=[FauxCall(code_pb2.ABORTED)], + ) + + tracer_provider = TracerProvider(sampler=ALWAYS_ON) + trace_exporter = InMemorySpanExporter() + tracer_provider.add_span_processor(SimpleSpanProcessor(trace_exporter)) + observability_options = dict( + tracer_provider=tracer_provider, + enable_extended_tracing=True, + ) + + client = spanner_v1.Client( + project=PROJECT, + observability_options=observability_options, + credentials=AnonymousCredentials(), + ) + + instance = client.instance( + INSTANCE_ID, + CONFIGURATION_NAME, + display_name=DISPLAY_NAME, + node_count=NODE_COUNT, + labels=LABELS, + ) + + try: + instance.create() + except Exception: + pass + + db = instance.database(DATABASE_ID) + try: + db.create() + except Exception: + pass + + db.run_in_transaction(select_in_txn) + + span_list = trace_exporter.get_finished_spans() + got_span_names = [span.name for span in span_list] + want_span_names = [ + "CloudSpanner.CreateSession", + "CloudSpanner.Transaction.execute_streaming_sql", + "CloudSpanner.Transaction.execute_streaming_sql", + "CloudSpanner.Transaction.commit", + "CloudSpanner.Session.run_in_transaction", + "CloudSpanner.Database.run_in_transaction", + ] + + assert got_span_names == want_span_names + + got_events = [] + got_statuses = [] + + # Some event attributes are noisy/highly ephemeral + # and can't be directly compared against. + imprecise_event_attributes = ["exception.stacktrace", "delay_seconds"] + for span in span_list: + got_statuses.append( + (span.name, span.status.status_code, span.status.description) + ) + for event in span.events: + evt_attributes = event.attributes.copy() + for attr_name in imprecise_event_attributes: + if attr_name in evt_attributes: + evt_attributes[attr_name] = "EPHEMERAL" + + got_events.append((event.name, evt_attributes)) + + # Check for the series of events + want_events = [ + ("Starting Commit", {}), + ("Commit Done", {}), + ("Creating Transaction", {}), + ("Using Transaction", {"attempt": 1}), + ( + "exception", + { + "exception.type": "google.api_core.exceptions.Aborted", + "exception.message": "409 Thrown from ClientInterceptor for testing", + "exception.stacktrace": "EPHEMERAL", + "exception.escaped": "False", + }, + ), + ( + "Transaction was aborted, retrying afresh", + {"delay_seconds": "EPHEMERAL", "attempt": 1}, + ), + ("Creating Transaction", {}), + ("Using Transaction", {"attempt": 2}), + ("Acquiring session", {"kind": "BurstyPool"}), + ("Waiting for a session to become available", {"kind": "BurstyPool"}), + ("No sessions available in pool. Creating session", {"kind": "BurstyPool"}), + ("Creating Session", {}), + ] + assert got_events == want_events + + # Check for the statues. + codes = StatusCode + want_statuses = [ + ("CloudSpanner.CreateSession", codes.OK, None), + ("CloudSpanner.Transaction.execute_streaming_sql", codes.OK, None), + ("CloudSpanner.Transaction.execute_streaming_sql", codes.OK, None), + ("CloudSpanner.Transaction.commit", codes.OK, None), + ( + "CloudSpanner.Session.run_in_transaction", + codes.ERROR, + "409 Thrown from ClientInterceptor for testing", + ), + ("CloudSpanner.Database.run_in_transaction", codes.OK, None), + ] + assert got_statuses == want_statuses + + @_helpers.retry_mabye_conflict def test_transaction_read_and_insert_or_update_then_commit( sessions_database, diff --git a/tests/unit/test_batch.py b/tests/unit/test_batch.py index a7f7a6f970..68b450563c 100644 --- a/tests/unit/test_batch.py +++ b/tests/unit/test_batch.py @@ -212,7 +212,7 @@ def test_commit_grpc_error(self): batch.commit() self.assertSpanAttributes( - "CloudSpanner.Commit", + "CloudSpanner.Batch.commit", status=StatusCode.ERROR, attributes=dict(BASE_ATTRIBUTES, num_mutations=1), ) @@ -261,7 +261,7 @@ def test_commit_ok(self): self.assertEqual(max_commit_delay, None) self.assertSpanAttributes( - "CloudSpanner.Commit", attributes=dict(BASE_ATTRIBUTES, num_mutations=1) + "CloudSpanner.Batch.commit", attributes=dict(BASE_ATTRIBUTES, num_mutations=1) ) def _test_commit_with_options( @@ -327,7 +327,7 @@ def _test_commit_with_options( self.assertEqual(actual_request_options, expected_request_options) self.assertSpanAttributes( - "CloudSpanner.Commit", attributes=dict(BASE_ATTRIBUTES, num_mutations=1) + "CloudSpanner.Batch.commit", attributes=dict(BASE_ATTRIBUTES, num_mutations=1) ) self.assertEqual(max_commit_delay_in, max_commit_delay) @@ -438,7 +438,7 @@ def test_context_mgr_success(self): self.assertEqual(request_options, RequestOptions()) self.assertSpanAttributes( - "CloudSpanner.Commit", attributes=dict(BASE_ATTRIBUTES, num_mutations=1) + "CloudSpanner.Batch.commit", attributes=dict(BASE_ATTRIBUTES, num_mutations=1) ) def test_context_mgr_failure(self): diff --git a/tests/unit/test_snapshot.py b/tests/unit/test_snapshot.py index 479a0d62e9..2be689a599 100644 --- a/tests/unit/test_snapshot.py +++ b/tests/unit/test_snapshot.py @@ -868,7 +868,7 @@ def test_execute_sql_other_error(self): self.assertEqual(derived._execute_sql_count, 1) self.assertSpanAttributes( - "CloudSpanner.ReadWriteTransaction", + "CloudSpanner._Derived.execute_streaming_sql", status=StatusCode.ERROR, attributes=dict(BASE_ATTRIBUTES, **{"db.statement": SQL_QUERY}), ) @@ -1024,7 +1024,7 @@ def _execute_sql_helper( self.assertEqual(derived._execute_sql_count, sql_count + 1) self.assertSpanAttributes( - "CloudSpanner.ReadWriteTransaction", + "CloudSpanner._Derived.execute_streaming_sql", status=StatusCode.OK, attributes=dict(BASE_ATTRIBUTES, **{"db.statement": SQL_QUERY_WITH_PARAM}), ) diff --git a/tests/unit/test_transaction.py b/tests/unit/test_transaction.py index e426f912b2..04a9d41df2 100644 --- a/tests/unit/test_transaction.py +++ b/tests/unit/test_transaction.py @@ -345,7 +345,7 @@ def test_commit_w_other_error(self): self.assertIsNone(transaction.committed) self.assertSpanAttributes( - "CloudSpanner.Commit", + "CloudSpanner.Transaction.commit", status=StatusCode.ERROR, attributes=dict(TestTransaction.BASE_ATTRIBUTES, num_mutations=1), ) @@ -427,7 +427,7 @@ def _commit_helper( self.assertEqual(transaction.commit_stats.mutation_count, 4) self.assertSpanAttributes( - "CloudSpanner.Commit", + "CloudSpanner.Transaction.commit", attributes=dict( TestTransaction.BASE_ATTRIBUTES, num_mutations=len(transaction._mutations),