From 78073411f025848c6e86ee3d8bdc8a29e0cf151a Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Wed, 18 Sep 2024 17:45:27 -1000 Subject: [PATCH] observability: annotate Session+SessionPool events This change adds annotations for session and session pool events to aid customers in debugging latency issues with session pool malevolence and also for maintainers to figure out which session pool type is the most appropriate. Updates #1170 --- .../spanner_v1/_opentelemetry_tracing.py | 12 ++- google/cloud/spanner_v1/pool.py | 90 ++++++++++++++++++- google/cloud/spanner_v1/session.py | 18 +++- 3 files changed, 115 insertions(+), 5 deletions(-) diff --git a/google/cloud/spanner_v1/_opentelemetry_tracing.py b/google/cloud/spanner_v1/_opentelemetry_tracing.py index 51501a07a3..bdbb7d5d05 100644 --- a/google/cloud/spanner_v1/_opentelemetry_tracing.py +++ b/google/cloud/spanner_v1/_opentelemetry_tracing.py @@ -78,8 +78,16 @@ def trace_call(name, session, extra_attributes=None): try: yield span except Exception as error: - span.set_status(Status(StatusCode.ERROR, str(error))) - span.record_exception(error) + set_span_error_and_record_exception(span, error) raise else: span.set_status(Status(StatusCode.OK)) + + +def set_span_error_and_record_exception(span, exc): + span.set_status(Status(StatusCode.ERROR, str(exc))) + span.record_exception(exc) + + +def get_current_span(tracer_provider=None): + return trace.get_current_span() diff --git a/google/cloud/spanner_v1/pool.py b/google/cloud/spanner_v1/pool.py index 56837bfc0b..3618d1a2dd 100644 --- a/google/cloud/spanner_v1/pool.py +++ b/google/cloud/spanner_v1/pool.py @@ -16,6 +16,7 @@ import datetime import queue +import time from google.cloud.exceptions import NotFound from google.cloud.spanner_v1 import BatchCreateSessionsRequest @@ -24,8 +25,12 @@ _metadata_with_prefix, _metadata_with_leader_aware_routing, ) +from google.cloud.spanner_v1._opentelemetry_tracing import ( + get_current_span, +) from warnings import warn + _NOW = datetime.datetime.utcnow # unit tests may replace @@ -199,13 +204,32 @@ def bind(self, database): _metadata_with_leader_aware_routing(database._route_to_leader_enabled) ) self._database_role = self._database_role or self._database.database_role + requested_session_count = self.size - self._sessions.qsize() request = BatchCreateSessionsRequest( database=database.name, - session_count=self.size - self._sessions.qsize(), + session_count=requested_session_count, session_template=Session(creator_role=self.database_role), ) + current_span = get_current_span() + if requested_session_count > 0: + current_span.add_event( + f"Requesting {requested_session_count} sessions", + {"kind": "fixed_size_pool"}, + ) + + if self._sessions.full(): + current_span.add_event( + "Session pool is already full", {"kind": "fixed_size_pool"} + ) + return + + returned_session_count = 0 while not self._sessions.full(): + current_span.add_event( + f"Creating {request.session_count} sessions", + {"kind": "fixed_size_pool"}, + ) resp = api.batch_create_sessions( request=request, metadata=metadata, @@ -214,6 +238,12 @@ def bind(self, database): session = self._new_session() session._session_id = session_pb.name.split("/")[-1] self._sessions.put(session) + returned_session_count += 1 + + current_span.add_event( + f"Requested for {requested_session_count}, returned {returned_session_count}", + {"kind": "fixed_size_pool"}, + ) def get(self, timeout=None): """Check a session out from the pool. @@ -229,12 +259,23 @@ def get(self, timeout=None): if timeout is None: timeout = self.default_timeout + start_time = time.time() + current_span = get_current_span() + current_span.add_event("Acquiring session", {"kind": type(self).__name__}) session = self._sessions.get(block=True, timeout=timeout) if not session.exists(): session = self._database.session() session.create() + current_span.add_event( + "Acquired session", + { + "time.elapsed": time.time() - start_time, + "session.id": session.session_id, + "kind": type(self).__name__, + }, + ) return session def put(self, session): @@ -307,6 +348,10 @@ def get(self): :returns: an existing session from the pool, or a newly-created session. """ + start_time = time.time() + current_span = get_current_span() + current_span.add_event("Acquiring session", {"kind": type(self).__name__}) + try: session = self._sessions.get_nowait() except queue.Empty: @@ -316,6 +361,15 @@ def get(self): if not session.exists(): session = self._new_session() session.create() + else: + current_span.add_event( + "Cache hit: has usable session", + { + "id": session.session_id, + "kind": type(self).__name__, + }, + ) + return session def put(self, session): @@ -422,6 +476,17 @@ def bind(self, database): session_template=Session(creator_role=self.database_role), ) + requested_session_count = request.session_count + current_span.addEvent(f"Requesting {requested_session_count} sessions") + + if created_session_count >= self.size: + current_span.add_event( + "Created no new sessions as sessionPool is full", + {"kind": type(self).__name__}, + ) + return + + returned_session_count while created_session_count < self.size: resp = api.batch_create_sessions( request=request, @@ -431,8 +496,17 @@ def bind(self, database): session = self._new_session() session._session_id = session_pb.name.split("/")[-1] self.put(session) + returned_session_count += 1 + created_session_count += len(resp.session) + current_span.add_event( + "Requested for {requested_session_count} sessions, return {returned_session_count}", + { + "kind": "pinging_pool", + }, + ) + def get(self, timeout=None): """Check a session out from the pool. @@ -447,6 +521,12 @@ def get(self, timeout=None): if timeout is None: timeout = self.default_timeout + start_time = time.time() + current_span = get_current_span() + current_span.add_event( + "Waiting for a session to become available", {"kind": "pinging_pool"} + ) + ping_after, session = self._sessions.get(block=True, timeout=timeout) if _NOW() > ping_after: @@ -457,6 +537,14 @@ def get(self, timeout=None): session = self._new_session() session.create() + current_span.add_event( + "Acquired session", + { + "time.elapsed": time.time() - start_time, + "session.id": session.session_id, + "kind": "pinging_pool", + }, + ) return session def put(self, session): diff --git a/google/cloud/spanner_v1/session.py b/google/cloud/spanner_v1/session.py index 28280282f4..c1b6ac4a1f 100644 --- a/google/cloud/spanner_v1/session.py +++ b/google/cloud/spanner_v1/session.py @@ -30,7 +30,11 @@ _metadata_with_prefix, _metadata_with_leader_aware_routing, ) -from google.cloud.spanner_v1._opentelemetry_tracing import trace_call +from google.cloud.spanner_v1._opentelemetry_tracing import ( + get_current_span, + set_span_error_and_record_exception, + trace_call, +) from google.cloud.spanner_v1.batch import Batch from google.cloud.spanner_v1.snapshot import Snapshot from google.cloud.spanner_v1.transaction import Transaction @@ -113,6 +117,9 @@ def name(self): :raises ValueError: if session is not yet created """ if self._session_id is None: + err = "No session available" + current_span.add_event(err) + set_span_error_and_record_exception(current_span, err) raise ValueError("No session ID set by back-end") return self._database.name + "/sessions/" + self._session_id @@ -124,8 +131,14 @@ def create(self): :raises ValueError: if :attr:`session_id` is already set. """ + current_span = get_current_span() + current_span.add_event("Creating Session") + if self._session_id is not None: - raise ValueError("Session ID already set by back-end") + err = "Session ID already set by back-end" + current_span.add_event(err) + set_span_error_and_record_exception(current_span, err) + raise ValueError(err) api = self._database.spanner_api metadata = _metadata_with_prefix(self._database.name) if self._database._route_to_leader_enabled: @@ -148,6 +161,7 @@ def create(self): metadata=metadata, ) self._session_id = session_pb.name.split("/")[-1] + current_span.add_event("Using Session", {"id": self._session_id}) def exists(self): """Test for the existence of this session.