From f43795535d42091eaccab00cc3e4971f140c6ed5 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Fri, 6 Dec 2024 04:48:58 -0800 Subject: [PATCH] observability: add updated span events + traace more methods This change carves out parts of PR #1241 in smaller pieces to ease with smaller reviews. This change adds more span events, updates important spans to make them more distinct like changing: "CloudSpanner.ReadWriteTransaction" to more direct and more pointed spans like: * CloudSpanner.Transaction.execute_streaming_sql Also added important spans: * CloudSpanner.Database.run_in_transaction * CloudSpanner.Session.run_in_transaction --- .../spanner_v1/_opentelemetry_tracing.py | 42 +++- google/cloud/spanner_v1/batch.py | 27 ++- google/cloud/spanner_v1/database.py | 42 ++-- google/cloud/spanner_v1/pool.py | 90 +++++--- google/cloud/spanner_v1/session.py | 134 +++++++---- google/cloud/spanner_v1/snapshot.py | 8 +- google/cloud/spanner_v1/transaction.py | 8 +- tests/_helpers.py | 10 +- tests/system/test_observability_options.py | 5 +- tests/system/test_session_api.py | 209 ++++++++++++++++-- tests/unit/test_batch.py | 11 +- tests/unit/test_pool.py | 87 ++++++-- tests/unit/test_session.py | 15 +- tests/unit/test_snapshot.py | 12 +- tests/unit/test_transaction.py | 9 +- 15 files changed, 553 insertions(+), 156 deletions(-) diff --git a/google/cloud/spanner_v1/_opentelemetry_tracing.py b/google/cloud/spanner_v1/_opentelemetry_tracing.py index 1caac59ecd..6421f2c978 100644 --- a/google/cloud/spanner_v1/_opentelemetry_tracing.py +++ b/google/cloud/spanner_v1/_opentelemetry_tracing.py @@ -56,15 +56,15 @@ def get_tracer(tracer_provider=None): @contextmanager -def trace_call(name, session, extra_attributes=None, observability_options=None): - if session: - session._last_use_time = datetime.now() - - if not HAS_OPENTELEMETRY_INSTALLED or not session: +def trace_call(name, session=None, extra_attributes=None, observability_options=None): + if not (HAS_OPENTELEMETRY_INSTALLED and name): # Empty context manager. Users will have to check if the generated value is None or a span yield None return + if session: + session._last_use_time = datetime.now() + tracer_provider = None # By default enable_extended_tracing=True because in a bid to minimize @@ -72,20 +72,24 @@ def trace_call(name, session, extra_attributes=None, observability_options=None) # on by default. enable_extended_tracing = True + db_name = "" + if session and getattr(session, "_database", None): + db_name = session._database.name + if isinstance(observability_options, dict): # Avoid false positives with mock.Mock tracer_provider = observability_options.get("tracer_provider", None) enable_extended_tracing = observability_options.get( "enable_extended_tracing", enable_extended_tracing ) + db_name = observability_options.get("db_name", db_name) tracer = get_tracer(tracer_provider) # Set base attributes that we know for every trace created - db = session._database attributes = { "db.type": "spanner", "db.url": SpannerClient.DEFAULT_ENDPOINT, - "db.instance": "" if not db else db.name, + "db.instance": db_name, "net.host.name": SpannerClient.DEFAULT_ENDPOINT, OTEL_SCOPE_NAME: TRACER_NAME, OTEL_SCOPE_VERSION: TRACER_VERSION, @@ -99,6 +103,17 @@ def trace_call(name, session, extra_attributes=None, observability_options=None) if not enable_extended_tracing: attributes.pop("db.statement", False) + attributes.pop("sql", False) + else: + # Otherwise there are places where the annotated sql was inserted + # directly from the arguments as "sql", and transform those into "db.statement". + db_statement = attributes.get("db.statement", None) + if not db_statement: + sql = attributes.get("sql", None) + if sql: + attributes = attributes.copy() + attributes.pop("sql", False) + attributes["db.statement"] = sql with tracer.start_as_current_span( name, kind=trace.SpanKind.CLIENT, attributes=attributes @@ -131,3 +146,16 @@ def get_current_span(): def add_span_event(span, event_name, event_attributes=None): if span: span.add_event(event_name, event_attributes) + + +def add_event_on_current_span(event_name, event_attributes=None, span=None): + if not span: + span = get_current_span() + + add_span_event(span, event_name, event_attributes) + + +def record_span_exception_and_status(span, exc): + if span: + span.set_status(Status(StatusCode.ERROR, str(exc))) + span.record_exception(exc) diff --git a/google/cloud/spanner_v1/batch.py b/google/cloud/spanner_v1/batch.py index 948740d7d4..2c32457e3c 100644 --- a/google/cloud/spanner_v1/batch.py +++ b/google/cloud/spanner_v1/batch.py @@ -26,7 +26,10 @@ _metadata_with_prefix, _metadata_with_leader_aware_routing, ) -from google.cloud.spanner_v1._opentelemetry_tracing import trace_call +from google.cloud.spanner_v1._opentelemetry_tracing import ( + add_event_on_current_span, + trace_call, +) from google.cloud.spanner_v1 import RequestOptions from google.cloud.spanner_v1._helpers import _retry from google.cloud.spanner_v1._helpers import _check_rst_stream_error @@ -70,6 +73,10 @@ def insert(self, table, columns, values): :param values: Values to be modified. """ self._mutations.append(Mutation(insert=_make_write_pb(table, columns, values))) + add_event_on_current_span( + "insert mutations added", + dict(table=table, columns=columns), + ) def update(self, table, columns, values): """Update one or more existing table rows. @@ -84,6 +91,10 @@ def update(self, table, columns, values): :param values: Values to be modified. """ self._mutations.append(Mutation(update=_make_write_pb(table, columns, values))) + add_event_on_current_span( + "update mutations added", + dict(table=table, columns=columns), + ) def insert_or_update(self, table, columns, values): """Insert/update one or more table rows. @@ -100,6 +111,10 @@ def insert_or_update(self, table, columns, values): self._mutations.append( Mutation(insert_or_update=_make_write_pb(table, columns, values)) ) + add_event_on_current_span( + "insert_or_update mutations added", + dict(table=table, columns=columns), + ) def replace(self, table, columns, values): """Replace one or more table rows. @@ -114,6 +129,10 @@ def replace(self, table, columns, values): :param values: Values to be modified. """ self._mutations.append(Mutation(replace=_make_write_pb(table, columns, values))) + add_event_on_current_span( + "replace mutations added", + dict(table=table, columns=columns), + ) def delete(self, table, keyset): """Delete one or more table rows. @@ -126,6 +145,10 @@ def delete(self, table, keyset): """ delete = Mutation.Delete(table=table, key_set=keyset._to_pb()) self._mutations.append(Mutation(delete=delete)) + add_event_on_current_span( + "delete mutations added", + dict(table=table), + ) class Batch(_BatchBase): @@ -207,7 +230,7 @@ def commit( ) observability_options = getattr(database, "observability_options", None) with trace_call( - "CloudSpanner.Commit", + f"CloudSpanner.{type(self).__name__}.commit", self._session, trace_attributes, observability_options=observability_options, diff --git a/google/cloud/spanner_v1/database.py b/google/cloud/spanner_v1/database.py index c8230ab503..88d2bb60f7 100644 --- a/google/cloud/spanner_v1/database.py +++ b/google/cloud/spanner_v1/database.py @@ -70,6 +70,7 @@ from google.cloud.spanner_v1._opentelemetry_tracing import ( add_span_event, get_current_span, + trace_call, ) @@ -720,6 +721,7 @@ def execute_pdml(): iterator = _restart_on_unavailable( method=method, + trace_name="CloudSpanner.ExecuteStreamingSql", request=request, transaction_selector=txn_selector, observability_options=self.observability_options, @@ -881,20 +883,25 @@ def run_in_transaction(self, func, *args, **kw): :raises Exception: reraises any non-ABORT exceptions raised by ``func``. """ - # Sanity check: Is there a transaction already running? - # If there is, then raise a red flag. Otherwise, mark that this one - # is running. - if getattr(self._local, "transaction_running", False): - raise RuntimeError("Spanner does not support nested transactions.") - self._local.transaction_running = True - - # Check out a session and run the function in a transaction; once - # done, flip the sanity check bit back. - try: - with SessionCheckout(self._pool) as session: - return session.run_in_transaction(func, *args, **kw) - finally: - self._local.transaction_running = False + observability_options = getattr(self, "observability_options", None) + with trace_call( + "CloudSpanner.Database.run_in_transaction", + observability_options=observability_options, + ): + # Sanity check: Is there a transaction already running? + # If there is, then raise a red flag. Otherwise, mark that this one + # is running. + if getattr(self._local, "transaction_running", False): + raise RuntimeError("Spanner does not support nested transactions.") + self._local.transaction_running = True + + # Check out a session and run the function in a transaction; once + # done, flip the sanity check bit back. + try: + with SessionCheckout(self._pool) as session: + return session.run_in_transaction(func, *args, **kw) + finally: + self._local.transaction_running = False def restore(self, source): """Restore from a backup to this database. @@ -1120,7 +1127,12 @@ def observability_options(self): if not (self._instance and self._instance._client): return None - return getattr(self._instance._client, "observability_options", None) + opts = getattr(self._instance._client, "observability_options", None) + if not opts: + opts = dict() + + opts["db_name"] = self.name + return opts class BatchCheckout(object): diff --git a/google/cloud/spanner_v1/pool.py b/google/cloud/spanner_v1/pool.py index 4f90196b4a..03bff81b52 100644 --- a/google/cloud/spanner_v1/pool.py +++ b/google/cloud/spanner_v1/pool.py @@ -28,6 +28,7 @@ from google.cloud.spanner_v1._opentelemetry_tracing import ( add_span_event, get_current_span, + trace_call, ) from warnings import warn @@ -237,29 +238,41 @@ def bind(self, database): session_template=Session(creator_role=self.database_role), ) - returned_session_count = 0 - while not self._sessions.full(): - request.session_count = requested_session_count - self._sessions.qsize() + observability_options = getattr(self._database, "observability_options", None) + with trace_call( + "CloudSpanner.FixedPool.BatchCreateSessions", + observability_options=observability_options, + ) as span: + returned_session_count = 0 + while not self._sessions.full(): + request.session_count = requested_session_count - self._sessions.qsize() + add_span_event( + span, + f"Creating {request.session_count} sessions", + span_event_attributes, + ) + resp = api.batch_create_sessions( + request=request, + metadata=metadata, + ) + + add_span_event( + span, + "Created sessions", + dict(count=len(resp.session)), + ) + + for session_pb in resp.session: + session = self._new_session() + session._session_id = session_pb.name.split("/")[-1] + self._sessions.put(session) + returned_session_count += 1 + add_span_event( span, - f"Creating {request.session_count} sessions", + f"Requested for {requested_session_count} sessions, returned {returned_session_count}", span_event_attributes, ) - resp = api.batch_create_sessions( - request=request, - metadata=metadata, - ) - for session_pb in resp.session: - session = self._new_session() - session._session_id = session_pb.name.split("/")[-1] - self._sessions.put(session) - returned_session_count += 1 - - add_span_event( - span, - f"Requested for {requested_session_count} sessions, returned {returned_session_count}", - span_event_attributes, - ) def get(self, timeout=None): """Check a session out from the pool. @@ -550,25 +563,30 @@ def bind(self, database): span_event_attributes, ) - returned_session_count = 0 - while created_session_count < self.size: - resp = api.batch_create_sessions( - request=request, - metadata=metadata, - ) - for session_pb in resp.session: - session = self._new_session() - session._session_id = session_pb.name.split("/")[-1] - self.put(session) - returned_session_count += 1 + observability_options = getattr(self._database, "observability_options", None) + with trace_call( + "CloudSpanner.PingingPool.BatchCreateSessions", + observability_options=observability_options, + ) as span: + returned_session_count = 0 + while created_session_count < self.size: + resp = api.batch_create_sessions( + request=request, + metadata=metadata, + ) + for session_pb in resp.session: + session = self._new_session() + session._session_id = session_pb.name.split("/")[-1] + self.put(session) + returned_session_count += 1 - created_session_count += len(resp.session) + created_session_count += len(resp.session) - add_span_event( - current_span, - f"Requested for {requested_session_count} sessions, return {returned_session_count}", - span_event_attributes, - ) + add_span_event( + span, + f"Requested for {requested_session_count} sessions, returned {returned_session_count}", + span_event_attributes, + ) def get(self, timeout=None): """Check a session out from the pool. diff --git a/google/cloud/spanner_v1/session.py b/google/cloud/spanner_v1/session.py index 166d5488c6..12b6336c35 100644 --- a/google/cloud/spanner_v1/session.py +++ b/google/cloud/spanner_v1/session.py @@ -34,6 +34,7 @@ from google.cloud.spanner_v1._opentelemetry_tracing import ( add_span_event, get_current_span, + record_span_exception_and_status, trace_call, ) from google.cloud.spanner_v1.batch import Batch @@ -243,6 +244,10 @@ def delete(self): with trace_call( "CloudSpanner.DeleteSession", self, + extra_attributes={ + "session.id": self._session_id, + "session.name": self.name, + }, observability_options=observability_options, ): api.delete_session(name=self.name, metadata=metadata) @@ -458,47 +463,100 @@ def run_in_transaction(self, func, *args, **kw): ) attempts = 0 - while True: - if self._transaction is None: - txn = self.transaction() - txn.transaction_tag = transaction_tag - txn.exclude_txn_from_change_streams = exclude_txn_from_change_streams - else: - txn = self._transaction + observability_options = getattr(self._database, "observability_options", None) + with trace_call( + "CloudSpanner.Session.run_in_transaction", + self, + observability_options=observability_options, + ) as span: + while True: + if self._transaction is None: + add_span_event(span, "Creating Transaction") + txn = self.transaction() + txn.transaction_tag = transaction_tag + txn.exclude_txn_from_change_streams = ( + exclude_txn_from_change_streams + ) + else: + txn = self._transaction - try: attempts += 1 - return_value = func(txn, *args, **kw) - except Aborted as exc: - del self._transaction - _delay_until_retry(exc, deadline, attempts) - continue - except GoogleAPICallError: - del self._transaction - raise - except Exception: - txn.rollback() - raise - - try: - txn.commit( - return_commit_stats=self._database.log_commit_stats, - request_options=commit_request_options, - max_commit_delay=max_commit_delay, - ) - except Aborted as exc: - del self._transaction - _delay_until_retry(exc, deadline, attempts) - except GoogleAPICallError: - del self._transaction - raise - else: - if self._database.log_commit_stats and txn.commit_stats: - self._database.logger.info( - "CommitStats: {}".format(txn.commit_stats), - extra={"commit_stats": txn.commit_stats}, + span_attributes = {"attempt": attempts} + txn_id = getattr(txn, "_transaction_id", "") or "" + if txn_id: + span_attributes["transaction.id"] = txn_id + + add_span_event(span, "Using Transaction", span_attributes) + + try: + return_value = func(txn, *args, **kw) + except Aborted as exc: + del self._transaction + if span: + delay_seconds = _get_retry_delay(exc.errors[0], attempts) + attributes = dict(delay_seconds=delay_seconds) + attributes.update(span_attributes) + record_span_exception_and_status(span, exc) + add_span_event( + span, + "Transaction was aborted, retrying afresh", + attributes, + ) + + _delay_until_retry(exc, deadline, attempts) + continue + except GoogleAPICallError: + del self._transaction + add_span_event( + span, + "Transaction.commit failed due to GoogleAPICallError, not retrying", + span_attributes, + ) + raise + except Exception: + add_span_event( + span, + "Invoking Transaction.rollback(), not retrying", + span_attributes, + ) + txn.rollback() + raise + + try: + txn.commit( + return_commit_stats=self._database.log_commit_stats, + request_options=commit_request_options, + max_commit_delay=max_commit_delay, + ) + except Aborted as exc: + del self._transaction + if span: + delay_seconds = _get_retry_delay(exc.errors[0], attempts) + attributes = dict(delay_seconds=delay_seconds) + attributes.update(span_attributes) + record_span_exception_and_status(span, exc) + add_span_event( + span, + "Transaction was aborted, retrying afresh", + attributes, + ) + + _delay_until_retry(exc, deadline, attempts) + except GoogleAPICallError: + del self._transaction + add_span_event( + span, + "Transaction.commit failed due to GoogleAPICallError, not retrying", + span_attributes, ) - return return_value + raise + else: + if self._database.log_commit_stats and txn.commit_stats: + self._database.logger.info( + "CommitStats: {}".format(txn.commit_stats), + extra={"commit_stats": txn.commit_stats}, + ) + return return_value # Rational: this function factors out complex shared deadline / retry diff --git a/google/cloud/spanner_v1/snapshot.py b/google/cloud/spanner_v1/snapshot.py index 89b5094706..97a81fab47 100644 --- a/google/cloud/spanner_v1/snapshot.py +++ b/google/cloud/spanner_v1/snapshot.py @@ -335,7 +335,7 @@ def read( iterator = _restart_on_unavailable( restart, request, - "CloudSpanner.ReadOnlyTransaction", + f"CloudSpanner.{type(self).__name__}.read", self._session, trace_attributes, transaction=self, @@ -357,7 +357,7 @@ def read( iterator = _restart_on_unavailable( restart, request, - "CloudSpanner.ReadOnlyTransaction", + f"CloudSpanner.{type(self).__name__}.read", self._session, trace_attributes, transaction=self, @@ -578,7 +578,7 @@ def _get_streamed_result_set( iterator = _restart_on_unavailable( restart, request, - "CloudSpanner.ReadWriteTransaction", + f"CloudSpanner.{type(self).__name__}.execute_streaming_sql", self._session, trace_attributes, transaction=self, @@ -676,7 +676,7 @@ def partition_read( trace_attributes = {"table_id": table, "columns": columns} with trace_call( - "CloudSpanner.PartitionReadOnlyTransaction", + f"CloudSpanner.{type(self).__name__}.partition_read", self._session, trace_attributes, observability_options=getattr(database, "observability_options", None), diff --git a/google/cloud/spanner_v1/transaction.py b/google/cloud/spanner_v1/transaction.py index fa8e5121ff..2a2477f4b3 100644 --- a/google/cloud/spanner_v1/transaction.py +++ b/google/cloud/spanner_v1/transaction.py @@ -199,7 +199,7 @@ def rollback(self): ) observability_options = getattr(database, "observability_options", None) with trace_call( - "CloudSpanner.Rollback", + f"CloudSpanner.{type(self).__name__}.rollback", self._session, observability_options=observability_options, ): @@ -278,7 +278,7 @@ def commit( trace_attributes = {"num_mutations": len(self._mutations)} observability_options = getattr(database, "observability_options", None) with trace_call( - "CloudSpanner.Commit", + f"CloudSpanner.{type(self).__name__}.commit", self._session, trace_attributes, observability_options, @@ -447,7 +447,7 @@ def execute_update( response = self._execute_request( method, request, - "CloudSpanner.ReadWriteTransaction", + f"CloudSpanner.{type(self).__name__}.execute_update", self._session, trace_attributes, observability_options=observability_options, @@ -464,7 +464,7 @@ def execute_update( response = self._execute_request( method, request, - "CloudSpanner.ReadWriteTransaction", + f"CloudSpanner.{type(self).__name__}.execute_update", self._session, trace_attributes, observability_options=observability_options, diff --git a/tests/_helpers.py b/tests/_helpers.py index 81787c5a86..bfe72d46f7 100644 --- a/tests/_helpers.py +++ b/tests/_helpers.py @@ -78,7 +78,8 @@ def tearDown(self): def assertNoSpans(self): if HAS_OPENTELEMETRY_INSTALLED: - span_list = self.ot_exporter.get_finished_spans() + span_list = self.get_finished_spans() + print("got_spans", [span.name for span in span_list]) self.assertEqual(len(span_list), 0) def assertSpanAttributes( @@ -119,11 +120,16 @@ def assertSpanNames(self, want_span_names): def get_finished_spans(self): if HAS_OPENTELEMETRY_INSTALLED: - return list( + span_list = list( filter( lambda span: span and span.name, self.ot_exporter.get_finished_spans(), ) ) + # Sort the spans by their start time in the hierarchy. + return sorted(span_list, key=lambda span: span.start_time) else: return [] + + def reset(self): + self.tearDown() diff --git a/tests/system/test_observability_options.py b/tests/system/test_observability_options.py index 8382255c15..10b9cd8315 100644 --- a/tests/system/test_observability_options.py +++ b/tests/system/test_observability_options.py @@ -105,7 +105,10 @@ def test_propagation(enable_extended_tracing): len(from_inject_spans) >= 2 ) # "Expecting at least 2 spans from the injected trace exporter" gotNames = [span.name for span in from_inject_spans] - wantNames = ["CloudSpanner.CreateSession", "CloudSpanner.ReadWriteTransaction"] + wantNames = [ + "CloudSpanner.CreateSession", + "CloudSpanner.Snapshot.execute_streaming_sql", + ] assert gotNames == wantNames # Check for conformance of enable_extended_tracing diff --git a/tests/system/test_session_api.py b/tests/system/test_session_api.py index b7337cb258..67d7afa164 100644 --- a/tests/system/test_session_api.py +++ b/tests/system/test_session_api.py @@ -447,7 +447,7 @@ def test_batch_insert_then_read(sessions_database, ot_exporter): ) assert_span_attributes( ot_exporter, - "CloudSpanner.Commit", + "CloudSpanner.Batch.commit", attributes=_make_attributes(db_name, num_mutations=2), span=span_list[1], ) @@ -459,7 +459,7 @@ def test_batch_insert_then_read(sessions_database, ot_exporter): ) assert_span_attributes( ot_exporter, - "CloudSpanner.ReadOnlyTransaction", + "CloudSpanner.Snapshot.read", attributes=_make_attributes(db_name, columns=sd.COLUMNS, table_id=sd.TABLE), span=span_list[3], ) @@ -608,6 +608,18 @@ def test_transaction_read_and_insert_then_rollback( if ot_exporter is not None: span_list = ot_exporter.get_finished_spans() + got_span_names = [span.name for span in span_list] + want_span_names = [ + "CloudSpanner.CreateSession", + "CloudSpanner.GetSession", + "CloudSpanner.Batch.commit", + "CloudSpanner.BeginTransaction", + "CloudSpanner.Transaction.read", + "CloudSpanner.Transaction.read", + "CloudSpanner.Rollback", + "CloudSpanner.Snapshot.read", + ] + print("got_spans", got_span_names) assert len(span_list) == 8 assert_span_attributes( @@ -624,7 +636,7 @@ def test_transaction_read_and_insert_then_rollback( ) assert_span_attributes( ot_exporter, - "CloudSpanner.Commit", + "CloudSpanner.Batch.commit", attributes=_make_attributes(db_name, num_mutations=1), span=span_list[2], ) @@ -636,7 +648,7 @@ def test_transaction_read_and_insert_then_rollback( ) assert_span_attributes( ot_exporter, - "CloudSpanner.ReadOnlyTransaction", + "CloudSpanner.Transaction.read", attributes=_make_attributes( db_name, table_id=sd.TABLE, @@ -646,7 +658,7 @@ def test_transaction_read_and_insert_then_rollback( ) assert_span_attributes( ot_exporter, - "CloudSpanner.ReadOnlyTransaction", + "CloudSpanner.Transaction.read", attributes=_make_attributes( db_name, table_id=sd.TABLE, @@ -656,13 +668,13 @@ def test_transaction_read_and_insert_then_rollback( ) assert_span_attributes( ot_exporter, - "CloudSpanner.Rollback", + "CloudSpanner.Transaction.rollback", attributes=_make_attributes(db_name), span=span_list[6], ) assert_span_attributes( ot_exporter, - "CloudSpanner.ReadOnlyTransaction", + "CloudSpanner.Snapshot.read", attributes=_make_attributes( db_name, table_id=sd.TABLE, @@ -699,6 +711,162 @@ def _transaction_read_then_raise(transaction): assert rows == [] +@pytest.mark.skipif( + not _helpers.USE_EMULATOR, + reason="Emulator needed to run this tests", +) +@pytest.mark.skipif( + not ot_helpers.HAS_OPENTELEMETRY_INSTALLED, + reason="Tracing requires OpenTelemetry", +) +def test_transaction_abort_then_retry_spans(sessions_database, ot_exporter): + from google.auth.credentials import AnonymousCredentials + from google.api_core.exceptions import Aborted + from opentelemetry.sdk.trace.export import SimpleSpanProcessor + from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, + ) + from opentelemetry.trace.status import StatusCode + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.sampling import ALWAYS_ON + from opentelemetry import trace + + PROJECT = _helpers.EMULATOR_PROJECT + CONFIGURATION_NAME = "config-name" + INSTANCE_ID = _helpers.INSTANCE_ID + DISPLAY_NAME = "display-name" + DATABASE_ID = _helpers.unique_id("temp_db") + NODE_COUNT = 5 + LABELS = {"test": "true"} + + counters = dict(aborted=0) + already_aborted = False + + def select_in_txn(txn): + from google.rpc import error_details_pb2 + + results = txn.execute_sql("SELECT 1") + for row in results: + _ = row + + if counters["aborted"] == 0: + counters["aborted"] = 1 + raise Aborted( + "Thrown from ClientInterceptor for testing", + errors=[FauxCall(code_pb2.ABORTED)], + ) + + tracer_provider = TracerProvider(sampler=ALWAYS_ON) + trace_exporter = InMemorySpanExporter() + tracer_provider.add_span_processor(SimpleSpanProcessor(trace_exporter)) + observability_options = dict( + tracer_provider=tracer_provider, + enable_extended_tracing=True, + ) + + client = spanner_v1.Client( + project=PROJECT, + observability_options=observability_options, + credentials=AnonymousCredentials(), + ) + + instance = client.instance( + INSTANCE_ID, + CONFIGURATION_NAME, + display_name=DISPLAY_NAME, + node_count=NODE_COUNT, + labels=LABELS, + ) + + try: + instance.create() + except Exception: + pass + + db = instance.database(DATABASE_ID) + try: + db.create() + except Exception: + pass + + db.run_in_transaction(select_in_txn) + + span_list = trace_exporter.get_finished_spans() + got_span_names = [span.name for span in span_list] + want_span_names = [ + "CloudSpanner.CreateSession", + "CloudSpanner.Transaction.execute_streaming_sql", + "CloudSpanner.Transaction.execute_streaming_sql", + "CloudSpanner.Transaction.commit", + "CloudSpanner.Session.run_in_transaction", + "CloudSpanner.Database.run_in_transaction", + ] + + assert got_span_names == want_span_names + + got_events = [] + got_statuses = [] + + # Some event attributes are noisy/highly ephemeral + # and can't be directly compared against. + imprecise_event_attributes = ["exception.stacktrace", "delay_seconds"] + for span in span_list: + got_statuses.append( + (span.name, span.status.status_code, span.status.description) + ) + for event in span.events: + evt_attributes = event.attributes.copy() + for attr_name in imprecise_event_attributes: + if attr_name in evt_attributes: + evt_attributes[attr_name] = "EPHEMERAL" + + got_events.append((event.name, evt_attributes)) + + # Check for the series of events + want_events = [ + ("Starting Commit", {}), + ("Commit Done", {}), + ("Creating Transaction", {}), + ("Using Transaction", {"attempt": 1}), + ( + "exception", + { + "exception.type": "google.api_core.exceptions.Aborted", + "exception.message": "409 Thrown from ClientInterceptor for testing", + "exception.stacktrace": "EPHEMERAL", + "exception.escaped": "False", + }, + ), + ( + "Transaction was aborted, retrying afresh", + {"delay_seconds": "EPHEMERAL", "attempt": 1}, + ), + ("Creating Transaction", {}), + ("Using Transaction", {"attempt": 2}), + ("Acquiring session", {"kind": "BurstyPool"}), + ("Waiting for a session to become available", {"kind": "BurstyPool"}), + ("No sessions available in pool. Creating session", {"kind": "BurstyPool"}), + ("Creating Session", {}), + ] + assert got_events == want_events + + # Check for the statues. + codes = StatusCode + want_statuses = [ + ("CloudSpanner.CreateSession", codes.OK, None), + ("CloudSpanner.Transaction.execute_streaming_sql", codes.OK, None), + ("CloudSpanner.Transaction.execute_streaming_sql", codes.OK, None), + ("CloudSpanner.Transaction.commit", codes.OK, None), + ( + "CloudSpanner.Session.run_in_transaction", + codes.ERROR, + "409 Thrown from ClientInterceptor for testing", + ), + ("CloudSpanner.Database.run_in_transaction", codes.OK, None), + ] + assert got_statuses == want_statuses + + @_helpers.retry_mabye_conflict def test_transaction_read_and_insert_or_update_then_commit( sessions_database, @@ -1183,18 +1351,29 @@ def unit_of_work(transaction): session.run_in_transaction(unit_of_work) span_list = ot_exporter.get_finished_spans() - assert len(span_list) == 5 - expected_span_names = [ + got_span_names = [span.name for span in span_list] + want_span_names = [ "CloudSpanner.CreateSession", - "CloudSpanner.Commit", + "CloudSpanner.Batch.commit", "CloudSpanner.DMLTransaction", - "CloudSpanner.Commit", + "CloudSpanner.Transaction.commit", + "CloudSpanner.Session.run_in_transaction", "Test Span", ] - assert [span.name for span in span_list] == expected_span_names - for span in span_list[2:-1]: - assert span.context.trace_id == span_list[-1].context.trace_id - assert span.parent.span_id == span_list[-1].context.span_id + assert got_span_names == want_span_names + + def assert_parent_hierarchy(parent, children): + for child in children: + assert child.context.trace_id == parent.context.trace_id + assert child.parent.span_id == parent.context.span_id + + test_span = span_list[-1] + test_span_children = [span_list[-2]] + assert_parent_hierarchy(test_span, test_span_children) + + session_run_in_txn = span_list[-2] + session_run_in_txn_children = span_list[2:-2] + assert_parent_hierarchy(session_run_in_txn, session_run_in_txn_children) def test_execute_partitioned_dml( diff --git a/tests/unit/test_batch.py b/tests/unit/test_batch.py index a7f7a6f970..a43678f3b9 100644 --- a/tests/unit/test_batch.py +++ b/tests/unit/test_batch.py @@ -212,7 +212,7 @@ def test_commit_grpc_error(self): batch.commit() self.assertSpanAttributes( - "CloudSpanner.Commit", + "CloudSpanner.Batch.commit", status=StatusCode.ERROR, attributes=dict(BASE_ATTRIBUTES, num_mutations=1), ) @@ -261,7 +261,8 @@ def test_commit_ok(self): self.assertEqual(max_commit_delay, None) self.assertSpanAttributes( - "CloudSpanner.Commit", attributes=dict(BASE_ATTRIBUTES, num_mutations=1) + "CloudSpanner.Batch.commit", + attributes=dict(BASE_ATTRIBUTES, num_mutations=1), ) def _test_commit_with_options( @@ -327,7 +328,8 @@ def _test_commit_with_options( self.assertEqual(actual_request_options, expected_request_options) self.assertSpanAttributes( - "CloudSpanner.Commit", attributes=dict(BASE_ATTRIBUTES, num_mutations=1) + "CloudSpanner.Batch.commit", + attributes=dict(BASE_ATTRIBUTES, num_mutations=1), ) self.assertEqual(max_commit_delay_in, max_commit_delay) @@ -438,7 +440,8 @@ def test_context_mgr_success(self): self.assertEqual(request_options, RequestOptions()) self.assertSpanAttributes( - "CloudSpanner.Commit", attributes=dict(BASE_ATTRIBUTES, num_mutations=1) + "CloudSpanner.Batch.commit", + attributes=dict(BASE_ATTRIBUTES, num_mutations=1), ) def test_context_mgr_failure(self): diff --git a/tests/unit/test_pool.py b/tests/unit/test_pool.py index fbb35201eb..486314a1a9 100644 --- a/tests/unit/test_pool.py +++ b/tests/unit/test_pool.py @@ -241,25 +241,36 @@ def test_spans_bind_get(self): with trace_call("pool.Get", SESSIONS[0]) as span: pool.get() - wantEventNames = [ - "Acquiring session", - "Waiting for a session to become available", - "Acquired session", - ] - self.assertSpanEvents("pool.Get", wantEventNames, span) - # Check for the overall spans too. + span_list = self.get_finished_spans() + got_span_names = [span.name for span in span_list] + want_span_names = ["CloudSpanner.FixedPool.BatchCreateSessions", "pool.Get"] + print("got", got_span_names) + assert got_span_names == want_span_names + + attrs = TestFixedSizePool.BASE_ATTRIBUTES.copy() + attrs["db.instance"] = "" + + # Check for the overall spans. + self.assertSpanAttributes( + "CloudSpanner.FixedPool.BatchCreateSessions", + status=StatusCode.OK, + attributes=attrs, + span=span_list[0], + ) + self.assertSpanAttributes( "pool.Get", + status=StatusCode.OK, attributes=TestFixedSizePool.BASE_ATTRIBUTES, + span=span_list[-1], ) - wantEventNames = [ "Acquiring session", "Waiting for a session to become available", "Acquired session", ] - self.assertSpanEvents("pool.Get", wantEventNames) + self.assertSpanEvents("pool.Get", wantEventNames, span_list[-1]) def test_spans_bind_get_empty_pool(self): # Tests trying to invoke pool.get() from an empty pool. @@ -304,18 +315,22 @@ def test_spans_pool_bind(self): except Exception: pass + span_list = self.get_finished_spans() + got_span_names = [span.name for span in span_list] + want_span_names = ["testBind", "CloudSpanner.FixedPool.BatchCreateSessions"] + assert got_span_names == want_span_names + wantEventNames = [ "Requesting 1 sessions", - "Creating 1 sessions", "exception", ] - self.assertSpanEvents("testBind", wantEventNames) + self.assertSpanEvents("testBind", wantEventNames, span_list[0]) - # Check for the overall spans. self.assertSpanAttributes( "testBind", status=StatusCode.ERROR, attributes=TestFixedSizePool.BASE_ATTRIBUTES, + span=span_list[0], ) def test_get_expired(self): @@ -364,6 +379,7 @@ def test_put_full(self): SESSIONS = [_Session(database)] * 4 database._sessions.extend(SESSIONS) pool.bind(database) + self.reset() with self.assertRaises(queue.Full): pool.put(_Session(database)) @@ -474,16 +490,26 @@ def test_spans_get_empty_pool(self): session.create.assert_called() self.assertTrue(pool._sessions.empty()) + span_list = self.get_finished_spans() + got_span_names = [span.name for span in span_list] + want_span_names = [ + "pool.Get", + ] + print("got_spans", got_span_names) + assert got_span_names == want_span_names + + create_span = span_list[-1] self.assertSpanAttributes( "pool.Get", attributes=TestBurstyPool.BASE_ATTRIBUTES, + span=create_span, ) wantEventNames = [ "Acquiring session", "Waiting for a session to become available", "No sessions available in pool. Creating session", ] - self.assertSpanEvents("pool.Get", wantEventNames) + self.assertSpanEvents("pool.Get", wantEventNames, span=create_span) def test_get_non_empty_session_exists(self): pool = self._make_one() @@ -708,6 +734,7 @@ def test_get_hit_no_ping(self): SESSIONS = [_Session(database)] * 4 database._sessions.extend(SESSIONS) pool.bind(database) + self.reset() session = pool.get() @@ -731,6 +758,8 @@ def test_get_hit_w_ping(self): with _Monkey(MUT, _NOW=lambda: sessions_created): pool.bind(database) + self.reset() + session = pool.get() self.assertIs(session, SESSIONS[0]) @@ -753,6 +782,7 @@ def test_get_hit_w_ping_expired(self): with _Monkey(MUT, _NOW=lambda: sessions_created): pool.bind(database) + self.reset() session = pool.get() @@ -799,7 +829,25 @@ def test_put_full(self): pool.put(_Session(database)) self.assertTrue(pool._sessions.full()) - self.assertNoSpans() + + span_list = self.get_finished_spans() + got_span_names = [span.name for span in span_list] + want_span_names = ["CloudSpanner.PingingPool.BatchCreateSessions"] + assert got_span_names == want_span_names + + attrs = TestPingingPool.BASE_ATTRIBUTES.copy() + attrs["db.instance"] = "" + self.assertSpanAttributes( + "CloudSpanner.PingingPool.BatchCreateSessions", + attributes=attrs, + span=span_list[-1], + ) + wantEventNames = [ + 'Requested for 4 sessions, returned 4' + ] + self.assertSpanEvents( + "CloudSpanner.PingingPool.BatchCreateSessions", wantEventNames + ) def test_put_non_full(self): import datetime @@ -828,6 +876,7 @@ def test_clear(self): SESSIONS = [_Session(database)] * 10 database._sessions.extend(SESSIONS) pool.bind(database) + self.reset() self.assertTrue(pool._sessions.full()) api = database.spanner_api @@ -852,6 +901,7 @@ def test_ping_oldest_fresh(self): SESSIONS = [_Session(database)] * 1 database._sessions.extend(SESSIONS) pool.bind(database) + self.reset() pool.ping() @@ -886,6 +936,7 @@ def test_ping_oldest_stale_and_not_exists(self): SESSIONS[0]._exists = False database._sessions.extend(SESSIONS) pool.bind(database) + self.reset() later = datetime.datetime.utcnow() + datetime.timedelta(seconds=4000) with _Monkey(MUT, _NOW=lambda: later): @@ -914,15 +965,21 @@ def test_spans_get_and_leave_empty_pool(self): # session.create.assert_called() self.assertTrue(pool._sessions.empty()) + span_list = self.get_finished_spans() + got_span_names = [span.name for span in span_list] + want_span_names = ["CloudSpanner.PingingPool.BatchCreateSessions", "pool.Get"] + assert got_span_names == want_span_names + self.assertSpanAttributes( "pool.Get", attributes=TestPingingPool.BASE_ATTRIBUTES, + span=span_list[-1], ) wantEventNames = [ "Waiting for a session to become available", "Acquired session", ] - self.assertSpanEvents("pool.Get", wantEventNames) + self.assertSpanEvents("pool.Get", wantEventNames, span_list[-1]) class TestSessionCheckout(unittest.TestCase): diff --git a/tests/unit/test_session.py b/tests/unit/test_session.py index 966adadcbd..0d60e98cd0 100644 --- a/tests/unit/test_session.py +++ b/tests/unit/test_session.py @@ -558,8 +558,11 @@ def test_delete_hit(self): metadata=[("google-cloud-resource-prefix", database.name)], ) + attrs = {"session.id": session._session_id, "session.name": session.name} + attrs.update(TestSession.BASE_ATTRIBUTES) self.assertSpanAttributes( - "CloudSpanner.DeleteSession", attributes=TestSession.BASE_ATTRIBUTES + "CloudSpanner.DeleteSession", + attributes=attrs, ) def test_delete_miss(self): @@ -580,10 +583,13 @@ def test_delete_miss(self): metadata=[("google-cloud-resource-prefix", database.name)], ) + attrs = {"session.id": session._session_id, "session.name": session.name} + attrs.update(TestSession.BASE_ATTRIBUTES) + self.assertSpanAttributes( "CloudSpanner.DeleteSession", status=StatusCode.ERROR, - attributes=TestSession.BASE_ATTRIBUTES, + attributes=attrs, ) def test_delete_error(self): @@ -604,10 +610,13 @@ def test_delete_error(self): metadata=[("google-cloud-resource-prefix", database.name)], ) + attrs = {"session.id": session._session_id, "session.name": session.name} + attrs.update(TestSession.BASE_ATTRIBUTES) + self.assertSpanAttributes( "CloudSpanner.DeleteSession", status=StatusCode.ERROR, - attributes=TestSession.BASE_ATTRIBUTES, + attributes=attrs, ) def test_snapshot_not_created(self): diff --git a/tests/unit/test_snapshot.py b/tests/unit/test_snapshot.py index 479a0d62e9..5a21b2861a 100644 --- a/tests/unit/test_snapshot.py +++ b/tests/unit/test_snapshot.py @@ -616,7 +616,7 @@ def test_read_other_error(self): list(derived.read(TABLE_NAME, COLUMNS, keyset)) self.assertSpanAttributes( - "CloudSpanner.ReadOnlyTransaction", + "CloudSpanner._Derived.read", status=StatusCode.ERROR, attributes=dict( BASE_ATTRIBUTES, table_id=TABLE_NAME, columns=tuple(COLUMNS) @@ -773,7 +773,7 @@ def _read_helper( ) self.assertSpanAttributes( - "CloudSpanner.ReadOnlyTransaction", + "CloudSpanner._Derived.read", attributes=dict( BASE_ATTRIBUTES, table_id=TABLE_NAME, columns=tuple(COLUMNS) ), @@ -868,7 +868,7 @@ def test_execute_sql_other_error(self): self.assertEqual(derived._execute_sql_count, 1) self.assertSpanAttributes( - "CloudSpanner.ReadWriteTransaction", + "CloudSpanner._Derived.execute_streaming_sql", status=StatusCode.ERROR, attributes=dict(BASE_ATTRIBUTES, **{"db.statement": SQL_QUERY}), ) @@ -1024,7 +1024,7 @@ def _execute_sql_helper( self.assertEqual(derived._execute_sql_count, sql_count + 1) self.assertSpanAttributes( - "CloudSpanner.ReadWriteTransaction", + "CloudSpanner._Derived.execute_streaming_sql", status=StatusCode.OK, attributes=dict(BASE_ATTRIBUTES, **{"db.statement": SQL_QUERY_WITH_PARAM}), ) @@ -1195,7 +1195,7 @@ def _partition_read_helper( ) self.assertSpanAttributes( - "CloudSpanner.PartitionReadOnlyTransaction", + "CloudSpanner._Derived.partition_read", status=StatusCode.OK, attributes=dict( BASE_ATTRIBUTES, table_id=TABLE_NAME, columns=tuple(COLUMNS) @@ -1226,7 +1226,7 @@ def test_partition_read_other_error(self): list(derived.partition_read(TABLE_NAME, COLUMNS, keyset)) self.assertSpanAttributes( - "CloudSpanner.PartitionReadOnlyTransaction", + "CloudSpanner._Derived.partition_read", status=StatusCode.ERROR, attributes=dict( BASE_ATTRIBUTES, table_id=TABLE_NAME, columns=tuple(COLUMNS) diff --git a/tests/unit/test_transaction.py b/tests/unit/test_transaction.py index e426f912b2..6794a067b2 100644 --- a/tests/unit/test_transaction.py +++ b/tests/unit/test_transaction.py @@ -266,7 +266,7 @@ def test_rollback_w_other_error(self): self.assertFalse(transaction.rolled_back) self.assertSpanAttributes( - "CloudSpanner.Rollback", + "CloudSpanner.Transaction.rollback", status=StatusCode.ERROR, attributes=TestTransaction.BASE_ATTRIBUTES, ) @@ -299,7 +299,8 @@ def test_rollback_ok(self): ) self.assertSpanAttributes( - "CloudSpanner.Rollback", attributes=TestTransaction.BASE_ATTRIBUTES + "CloudSpanner.Transaction.rollback", + attributes=TestTransaction.BASE_ATTRIBUTES, ) def test_commit_not_begun(self): @@ -345,7 +346,7 @@ def test_commit_w_other_error(self): self.assertIsNone(transaction.committed) self.assertSpanAttributes( - "CloudSpanner.Commit", + "CloudSpanner.Transaction.commit", status=StatusCode.ERROR, attributes=dict(TestTransaction.BASE_ATTRIBUTES, num_mutations=1), ) @@ -427,7 +428,7 @@ def _commit_helper( self.assertEqual(transaction.commit_stats.mutation_count, 4) self.assertSpanAttributes( - "CloudSpanner.Commit", + "CloudSpanner.Transaction.commit", attributes=dict( TestTransaction.BASE_ATTRIBUTES, num_mutations=len(transaction._mutations),