Skip to content

Commit

Permalink
observability: annotate Session+SessionPool events
Browse files Browse the repository at this point in the history
This change adds annotations for session and session pool
events to help customers debug latency issues caused by
session pool misbehavior, and to help maintainers
figure out which session pool type is the most appropriate.

Updates googleapis#1170
  • Loading branch information
odeke-em committed Sep 19, 2024
1 parent cb8a2b7 commit adfad00
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 7 deletions.
4 changes: 2 additions & 2 deletions examples/grpc_instrumentation_enabled.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@

def main():
# Setup common variables that'll be used between Spanner and traces.
project_id = os.environ.get('SPANNER_PROJECT_ID', 'test-project')
project_id = os.environ.get('XSPANNER_PROJECT_ID', 'test-project')

# Setup OpenTelemetry, trace and Cloud Trace exporter.
tracer_provider = TracerProvider(sampler=ALWAYS_ON)
trace_exporter = CloudTraceSpanExporter(project_id=project_id)
trace_exporter = CloudTraceSpanExporter()
tracer_provider.add_span_processor(BatchSpanProcessor(trace_exporter))
trace.set_tracer_provider(tracer_provider)
# Retrieve a tracer from the global tracer provider.
Expand Down
12 changes: 10 additions & 2 deletions google/cloud/spanner_v1/_opentelemetry_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,16 @@ def trace_call(name, session, extra_attributes=None):
try:
yield span
except Exception as error:
span.set_status(Status(StatusCode.ERROR, str(error)))
span.record_exception(error)
set_span_error_and_record_exception(span, error)
raise
else:
span.set_status(Status(StatusCode.OK))


def set_span_error_and_record_exception(span, exc):
    """Mark ``span`` as failed and, when possible, record ``exc`` on it.

    The span status is always set to ``StatusCode.ERROR`` with ``str(exc)``
    as its description. ``exc`` is forwarded to ``span.record_exception``
    only when it is an exception instance.

    :param span: the span to annotate; must provide ``set_status`` and
                 ``record_exception``.

    :param exc: the failure to report: either an exception instance or a
                plain message (for example a ``str``).
    """
    span.set_status(Status(StatusCode.ERROR, str(exc)))

    # Some callers pass a bare message string rather than an exception.
    # ``record_exception`` is documented as taking an exception instance,
    # so only real exceptions are forwarded; a message still ends up in the
    # status description set above.
    if isinstance(exc, BaseException):
        span.record_exception(exc)


def get_current_span(tracer_provider=None):
    """Return the result of ``trace.get_current_span()``.

    :param tracer_provider: accepted for call-site compatibility; this
                            function does not read it.

    :returns: whatever ``trace.get_current_span()`` returns.
    """
    active_span = trace.get_current_span()
    return active_span
90 changes: 89 additions & 1 deletion google/cloud/spanner_v1/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import datetime
import queue
import time

from google.cloud.exceptions import NotFound
from google.cloud.spanner_v1 import BatchCreateSessionsRequest
Expand All @@ -24,8 +25,12 @@
_metadata_with_prefix,
_metadata_with_leader_aware_routing,
)
from google.cloud.spanner_v1._opentelemetry_tracing import (
get_current_span,
)
from warnings import warn


_NOW = datetime.datetime.utcnow # unit tests may replace


Expand Down Expand Up @@ -199,13 +204,32 @@ def bind(self, database):
_metadata_with_leader_aware_routing(database._route_to_leader_enabled)
)
self._database_role = self._database_role or self._database.database_role
requested_session_count = self.size - self._sessions.qsize()
request = BatchCreateSessionsRequest(
database=database.name,
session_count=self.size - self._sessions.qsize(),
session_count=requested_session_count,
session_template=Session(creator_role=self.database_role),
)

current_span = get_current_span()
if requested_session_count > 0:
current_span.add_event(
f"Requesting {requested_session_count} sessions",
{"kind": "fixed_size_pool"},
)

if self._sessions.full():
current_span.add_event(
"Session pool is already full", {"kind": "fixed_size_pool"}
)
return

returned_session_count = 0
while not self._sessions.full():
current_span.add_event(
f"Creating {request.session_count} sessions",
{"kind": "fixed_size_pool"},
)
resp = api.batch_create_sessions(
request=request,
metadata=metadata,
Expand All @@ -214,6 +238,12 @@ def bind(self, database):
session = self._new_session()
session._session_id = session_pb.name.split("/")[-1]
self._sessions.put(session)
returned_session_count += 1

current_span.add_event(
f"Requested for {requested_session_count}, returned {returned_session_count}",
{"kind": "fixed_size_pool"},
)

def get(self, timeout=None):
"""Check a session out from the pool.
Expand All @@ -229,12 +259,23 @@ def get(self, timeout=None):
if timeout is None:
timeout = self.default_timeout

start_time = time.time()
current_span = get_current_span()
current_span.add_event("Acquiring session", {"kind": type(self).__name__})
session = self._sessions.get(block=True, timeout=timeout)

if not session.exists():
session = self._database.session()
session.create()

current_span.add_event(
"Acquired session",
{
"time.elapsed": time.time() - start_time,
"session.id": session.session_id,
"kind": type(self).__name__,
},
)
return session

def put(self, session):
Expand Down Expand Up @@ -307,6 +348,10 @@ def get(self):
:returns: an existing session from the pool, or a newly-created
session.
"""
start_time = time.time()
current_span = get_current_span()
current_span.add_event("Acquiring session", {"kind": type(self).__name__})

try:
session = self._sessions.get_nowait()
except queue.Empty:
Expand All @@ -316,6 +361,15 @@ def get(self):
if not session.exists():
session = self._new_session()
session.create()
else:
current_span.add_event(
"Cache hit: has usable session",
{
"id": session.session_id,
"kind": type(self).__name__,
},
)

return session

def put(self, session):
Expand Down Expand Up @@ -422,6 +476,17 @@ def bind(self, database):
session_template=Session(creator_role=self.database_role),
)

requested_session_count = request.session_count
current_span.addEvent(f"Requesting {requested_session_count} sessions")

if created_session_count >= self.size:
current_span.add_event(
"Created no new sessions as sessionPool is full",
{"kind": type(self).__name__},
)
return

returned_session_count
while created_session_count < self.size:
resp = api.batch_create_sessions(
request=request,
Expand All @@ -431,8 +496,17 @@ def bind(self, database):
session = self._new_session()
session._session_id = session_pb.name.split("/")[-1]
self.put(session)
returned_session_count += 1

created_session_count += len(resp.session)

current_span.add_event(
"Requested for {requested_session_count} sessions, return {returned_session_count}",
{
"kind": "pinging_pool",
},
)

def get(self, timeout=None):
"""Check a session out from the pool.
Expand All @@ -447,6 +521,12 @@ def get(self, timeout=None):
if timeout is None:
timeout = self.default_timeout

start_time = time.time()
current_span = get_current_span()
current_span.add_event(
"Waiting for a session to become available", {"kind": "pinging_pool"}
)

ping_after, session = self._sessions.get(block=True, timeout=timeout)

if _NOW() > ping_after:
Expand All @@ -457,6 +537,14 @@ def get(self, timeout=None):
session = self._new_session()
session.create()

current_span.add_event(
"Acquired session",
{
"time.elapsed": time.time() - start_time,
"session.id": session.session_id,
"kind": "pinging_pool",
},
)
return session

def put(self, session):
Expand Down
18 changes: 16 additions & 2 deletions google/cloud/spanner_v1/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@
_metadata_with_prefix,
_metadata_with_leader_aware_routing,
)
from google.cloud.spanner_v1._opentelemetry_tracing import trace_call
from google.cloud.spanner_v1._opentelemetry_tracing import (
get_current_span,
set_span_error_and_record_exception,
trace_call,
)
from google.cloud.spanner_v1.batch import Batch
from google.cloud.spanner_v1.snapshot import Snapshot
from google.cloud.spanner_v1.transaction import Transaction
Expand Down Expand Up @@ -113,6 +117,9 @@ def name(self):
:raises ValueError: if session is not yet created
"""
if self._session_id is None:
err = "No session available"
current_span.add_event(err)
set_span_error_and_record_exception(current_span, err)
raise ValueError("No session ID set by back-end")
return self._database.name + "/sessions/" + self._session_id

Expand All @@ -124,8 +131,14 @@ def create(self):
:raises ValueError: if :attr:`session_id` is already set.
"""
current_span = get_current_span()
current_span.add_event("Creating Session")

if self._session_id is not None:
raise ValueError("Session ID already set by back-end")
err = "Session ID already set by back-end"
current_span.add_event(err)
set_span_error_and_record_exception(current_span, err)
raise ValueError(err)
api = self._database.spanner_api
metadata = _metadata_with_prefix(self._database.name)
if self._database._route_to_leader_enabled:
Expand All @@ -148,6 +161,7 @@ def create(self):
metadata=metadata,
)
self._session_id = session_pb.name.split("/")[-1]
current_span.add_event("Using Session", {"id": self._session_id})

def exists(self):
"""Test for the existence of this session.
Expand Down

0 comments on commit adfad00

Please sign in to comment.