Skip to content

Commit

Permalink
observability: annotate Session+SessionPool events
Browse files Browse the repository at this point in the history
This change adds annotations for session and session pool
events to aid customers in debugging latency issues with
session pool malevolence and also for maintainers to
figure out which session pool type is the most appropriate.

Updates googleapis#1170
  • Loading branch information
odeke-em committed Sep 20, 2024
1 parent b2e6762 commit f59084c
Show file tree
Hide file tree
Showing 11 changed files with 192 additions and 5 deletions.
13 changes: 11 additions & 2 deletions google/cloud/spanner_v1/_opentelemetry_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,17 @@ def trace_call(name, session, extra_attributes=None):
try:
yield span
except Exception as error:
span.set_status(Status(StatusCode.ERROR, str(error)))
span.record_exception(error)
set_span_error_and_record_exception(span, error)
raise
else:
span.set_status(Status(StatusCode.OK))


def set_span_error_and_record_exception(span, exc):
if exc and span:
span.set_status(Status(StatusCode.ERROR, str(exc)))
span.record_exception(exc)


def get_current_span():
return trace.get_current_span()
90 changes: 89 additions & 1 deletion google/cloud/spanner_v1/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import datetime
import queue
import time

from google.cloud.exceptions import NotFound
from google.cloud.spanner_v1 import BatchCreateSessionsRequest
Expand All @@ -24,6 +25,9 @@
_metadata_with_prefix,
_metadata_with_leader_aware_routing,
)
from google.cloud.spanner_v1._opentelemetry_tracing import (
get_current_span,
)
from warnings import warn

_NOW = datetime.datetime.utcnow # unit tests may replace
Expand Down Expand Up @@ -199,13 +203,32 @@ def bind(self, database):
_metadata_with_leader_aware_routing(database._route_to_leader_enabled)
)
self._database_role = self._database_role or self._database.database_role
requested_session_count = self.size - self._sessions.qsize()
request = BatchCreateSessionsRequest(
database=database.name,
session_count=self.size - self._sessions.qsize(),
session_count=requested_session_count,
session_template=Session(creator_role=self.database_role),
)

current_span = get_current_span()
if requested_session_count > 0:
current_span.add_event(
f"Requesting {requested_session_count} sessions",
{"kind": "fixed_size_pool"},
)

if self._sessions.full():
current_span.add_event(
"Session pool is already full", {"kind": "fixed_size_pool"}
)
return

returned_session_count = 0
while not self._sessions.full():
current_span.add_event(
f"Creating {request.session_count} sessions",
{"kind": "fixed_size_pool"},
)
resp = api.batch_create_sessions(
request=request,
metadata=metadata,
Expand All @@ -214,6 +237,12 @@ def bind(self, database):
session = self._new_session()
session._session_id = session_pb.name.split("/")[-1]
self._sessions.put(session)
returned_session_count += 1

current_span.add_event(
f"Requested for {requested_session_count}, returned {returned_session_count}",
{"kind": "fixed_size_pool"},
)

def get(self, timeout=None):
"""Check a session out from the pool.
Expand All @@ -229,12 +258,23 @@ def get(self, timeout=None):
if timeout is None:
timeout = self.default_timeout

start_time = time.time()
current_span = get_current_span()
current_span.add_event("Acquiring session", {"kind": type(self).__name__})
session = self._sessions.get(block=True, timeout=timeout)

if not session.exists():
session = self._database.session()
session.create()

current_span.add_event(
"Acquired session",
{
"time.elapsed": time.time() - start_time,
"session.id": session.session_id,
"kind": type(self).__name__,
},
)
return session

def put(self, session):
Expand Down Expand Up @@ -307,6 +347,10 @@ def get(self):
:returns: an existing session from the pool, or a newly-created
session.
"""
start_time = time.time()
current_span = get_current_span()
current_span.add_event("Acquiring session", {"kind": type(self).__name__})

try:
session = self._sessions.get_nowait()
except queue.Empty:
Expand All @@ -316,6 +360,15 @@ def get(self):
if not session.exists():
session = self._new_session()
session.create()
else:
current_span.add_event(
"Cache hit: has usable session",
{
"id": session.session_id,
"kind": type(self).__name__,
},
)

return session

def put(self, session):
Expand Down Expand Up @@ -422,6 +475,18 @@ def bind(self, database):
session_template=Session(creator_role=self.database_role),
)

requested_session_count = request.session_count
current_span = get_current_span()
current_span.add_event(f"Requesting {requested_session_count} sessions")

if created_session_count >= self.size:
current_span.add_event(
"Created no new sessions as sessionPool is full",
{"kind": type(self).__name__},
)
return

returned_session_count = 0
while created_session_count < self.size:
resp = api.batch_create_sessions(
request=request,
Expand All @@ -431,8 +496,17 @@ def bind(self, database):
session = self._new_session()
session._session_id = session_pb.name.split("/")[-1]
self.put(session)
returned_session_count += 1

created_session_count += len(resp.session)

current_span.add_event(
"Requested for {requested_session_count} sessions, return {returned_session_count}",
{
"kind": "pinging_pool",
},
)

def get(self, timeout=None):
"""Check a session out from the pool.
Expand All @@ -447,6 +521,12 @@ def get(self, timeout=None):
if timeout is None:
timeout = self.default_timeout

start_time = time.time()
current_span = get_current_span()
current_span.add_event(
"Waiting for a session to become available", {"kind": "pinging_pool"}
)

ping_after, session = self._sessions.get(block=True, timeout=timeout)

if _NOW() > ping_after:
Expand All @@ -457,6 +537,14 @@ def get(self, timeout=None):
session = self._new_session()
session.create()

current_span.add_event(
"Acquired session",
{
"time.elapsed": time.time() - start_time,
"session.id": session.session_id,
"kind": "pinging_pool",
},
)
return session

def put(self, session):
Expand Down
19 changes: 17 additions & 2 deletions google/cloud/spanner_v1/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@
_metadata_with_prefix,
_metadata_with_leader_aware_routing,
)
from google.cloud.spanner_v1._opentelemetry_tracing import trace_call
from google.cloud.spanner_v1._opentelemetry_tracing import (
get_current_span,
set_span_error_and_record_exception,
trace_call,
)
from google.cloud.spanner_v1.batch import Batch
from google.cloud.spanner_v1.snapshot import Snapshot
from google.cloud.spanner_v1.transaction import Transaction
Expand Down Expand Up @@ -113,6 +117,10 @@ def name(self):
:raises ValueError: if session is not yet created
"""
if self._session_id is None:
err = "No session available"
current_span = get_current_span()
current_span.add_event(err)
set_span_error_and_record_exception(current_span, err)
raise ValueError("No session ID set by back-end")
return self._database.name + "/sessions/" + self._session_id

Expand All @@ -124,8 +132,14 @@ def create(self):
:raises ValueError: if :attr:`session_id` is already set.
"""
current_span = get_current_span()
current_span.add_event("Creating Session")

if self._session_id is not None:
raise ValueError("Session ID already set by back-end")
err = "Session ID already set by back-end"
current_span.add_event(err)
set_span_error_and_record_exception(current_span, err)
raise ValueError(err)
api = self._database.spanner_api
metadata = _metadata_with_prefix(self._database.name)
if self._database._route_to_leader_enabled:
Expand All @@ -148,6 +162,7 @@ def create(self):
metadata=metadata,
)
self._session_id = session_pb.name.split("/")[-1]
current_span.add_event("Using Session", {"id": self._session_id})

def exists(self):
"""Test for the existence of this session.
Expand Down
10 changes: 10 additions & 0 deletions tests/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,13 @@ def assertSpanAttributes(
self.assertEqual(span.name, name)
self.assertEqual(span.status.status_code, status)
self.assertEqual(dict(span.attributes), attributes)

def assertSpanEvents(self, name, wantEventNames=[], span=None):
if HAS_OPENTELEMETRY_INSTALLED:
if not span:
span_list = self.ot_exporter.get_finished_spans()
self.assertEqual(len(span_list) > 0, True)
span = span_list[0]

self.assertEqual(span.name, name)
self.assertEqual(len(span.events), len(wantEventNames))
4 changes: 4 additions & 0 deletions tests/unit/test_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,10 @@ def __init__(self, database=None, name=TestBatch.SESSION_NAME):
self._database = database
self.name = name

@property
def session_id(self):
return self.name


class _Database(object):
name = "testing"
Expand Down
4 changes: 4 additions & 0 deletions tests/unit/test_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -3188,6 +3188,10 @@ def run_in_transaction(self, func, *args, **kw):
self._retried = (func, args, kw)
return self._committed

@property
def session_id(self):
return self.name


class _MockIterator(object):
def __init__(self, *values, **kw):
Expand Down
7 changes: 7 additions & 0 deletions tests/unit/test_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@


from functools import total_ordering
import time
import unittest

import mock
Expand Down Expand Up @@ -923,6 +924,8 @@ def __init__(self, database, exists=True, transaction=None):
self.create = mock.Mock()
self._deleted = False
self._transaction = transaction
# Generate a faux id.
self._session_id = f"time.time()"

def __lt__(self, other):
return id(self) < id(other)
Expand All @@ -949,6 +952,10 @@ def transaction(self):
txn = self._transaction = _make_transaction(self)
return txn

@property
def session_id(self):
return self._session_id


class _Database(object):
def __init__(self, name):
Expand Down
38 changes: 38 additions & 0 deletions tests/unit/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import google.api_core.gapic_v1.method
from google.cloud.spanner_v1 import RequestOptions
from google.cloud.spanner_v1._opentelemetry_tracing import trace_call
import mock
from tests._helpers import (
OpenTelemetryBase,
Expand Down Expand Up @@ -174,6 +175,43 @@ def test_create_w_database_role(self):
"CloudSpanner.CreateSession", attributes=TestSession.BASE_ATTRIBUTES
)

def test_create_session_span_annotations(self):
from google.cloud.spanner_v1 import CreateSessionRequest
from google.cloud.spanner_v1 import Session as SessionRequestProto

session_pb = self._make_session_pb(
self.SESSION_NAME, database_role=self.DATABASE_ROLE
)

gax_api = self._make_spanner_api()
gax_api.create_session.return_value = session_pb
database = self._make_database(database_role=self.DATABASE_ROLE)
database.spanner_api = gax_api
session = self._make_one(database, database_role=self.DATABASE_ROLE)

with trace_call("TestSessionSpan", session):
session.create()

self.assertEqual(session.session_id, self.SESSION_ID)
self.assertEqual(session.database_role, self.DATABASE_ROLE)
session_template = SessionRequestProto(creator_role=self.DATABASE_ROLE)

request = CreateSessionRequest(
database=database.name,
session=session_template,
)

gax_api.create_session.assert_called_once_with(
request=request,
metadata=[
("google-cloud-resource-prefix", database.name),
("x-goog-spanner-route-to-leader", "true"),
],
)

wantEventNames = ["Acquiring session", "Creating Session", "Using Session"]
self.assertSpanEvents("CloudSpanner.CreateSession", wantEventNames)

def test_create_wo_database_role(self):
from google.cloud.spanner_v1 import CreateSessionRequest

Expand Down
4 changes: 4 additions & 0 deletions tests/unit/test_snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -1822,6 +1822,10 @@ def __init__(self, database=None, name=TestSnapshot.SESSION_NAME):
self._database = database
self.name = name

@property
def session_id(self):
return self.name


class _MockIterator(object):
def __init__(self, *values, **kw):
Expand Down
4 changes: 4 additions & 0 deletions tests/unit/test_spanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1082,6 +1082,10 @@ def __init__(self, database=None, name=TestTransaction.SESSION_NAME):
self._database = database
self.name = name

@property
def session_id(self):
return self.name


class _MockIterator(object):
def __init__(self, *values, **kw):
Expand Down
4 changes: 4 additions & 0 deletions tests/unit/test_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -939,6 +939,10 @@ def __init__(self, database=None, name=TestTransaction.SESSION_NAME):
self._database = database
self.name = name

@property
def session_id(self):
return self.name


class _FauxSpannerAPI(object):
_committed = None
Expand Down

0 comments on commit f59084c

Please sign in to comment.