Skip to content

Commit

Permalink
feat(observability): more descriptive and value adding spans
Browse files Browse the repository at this point in the history
This change adds more descriptive and value adding spans to
replace the generic CloudSpanner.ReadWriteTransaction.
With this change, we add new spans:
* CloudSpanner.Database.run_in_transaction
* CloudSpanner.execute_pdml
* CloudSpanner.execute_sql
* CloudSpanner.execute_update
  • Loading branch information
odeke-em committed Dec 6, 2024
1 parent a6811af commit 847d89b
Show file tree
Hide file tree
Showing 14 changed files with 977 additions and 294 deletions.
105 changes: 93 additions & 12 deletions google/cloud/spanner_v1/_opentelemetry_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,11 @@ def get_tracer(tracer_provider=None):
return tracer_provider.get_tracer(TRACER_NAME, TRACER_VERSION)


@contextmanager
def trace_call(name, session, extra_attributes=None, observability_options=None):
if session:
session._last_use_time = datetime.now()

if not HAS_OPENTELEMETRY_INSTALLED or not session:
# Empty context manager. Users will have to check if the generated value is None or a span
yield None
return
def _make_tracer_and_span_attributes(
session=None, extra_attributes=None, observability_options=None
):
if not HAS_OPENTELEMETRY_INSTALLED:
return None, None

tracer_provider = None

Expand All @@ -72,20 +68,24 @@ def trace_call(name, session, extra_attributes=None, observability_options=None)
# on by default.
enable_extended_tracing = True

db_name = ""
if session and getattr(session, "_database", None):
db_name = session._database.name

if isinstance(observability_options, dict): # Avoid false positives with mock.Mock
tracer_provider = observability_options.get("tracer_provider", None)
enable_extended_tracing = observability_options.get(
"enable_extended_tracing", enable_extended_tracing
)
db_name = observability_options.get("db_name", db_name)

tracer = get_tracer(tracer_provider)

# Set base attributes that we know for every trace created
db = session._database
attributes = {
"db.type": "spanner",
"db.url": SpannerClient.DEFAULT_ENDPOINT,
"db.instance": "" if not db else db.name,
"db.instance": db_name,
"net.host.name": SpannerClient.DEFAULT_ENDPOINT,
OTEL_SCOPE_NAME: TRACER_NAME,
OTEL_SCOPE_VERSION: TRACER_VERSION,
Expand All @@ -99,9 +99,77 @@ def trace_call(name, session, extra_attributes=None, observability_options=None)

if not enable_extended_tracing:
attributes.pop("db.statement", False)
attributes.pop("sql", False)
else:
# Otherwise there are places where the annotated sql was inserted
# directly from the arguments as "sql", and transform those into "db.statement".
db_statement = attributes.get("db.statement", None)
if not db_statement:
sql = attributes.get("sql", None)
if sql:
attributes = attributes.copy()
attributes.pop("sql", False)
attributes["db.statement"] = sql

return tracer, attributes


def trace_call_end_lazily(
name, session=None, extra_attributes=None, observability_options=None
):
"""
trace_call_end_lazily is used in situations where you don't want a context managed
span in a with statement to end as soon as a block exits. This is useful for example
after a Database.batch or Database.snapshot but without a context manager.
If you need to directly invoke tracing with a context manager, please invoke
`trace_call` with which you can invoke
 `with trace_call(...) as span:`
It is the caller's responsibility to explicitly invoke the returned ending function.
"""
if not name:
return None

tracer, span_attributes = _make_tracer_and_span_attributes(
session, extra_attributes, observability_options
)
if not tracer:
return None

span = tracer.start_span(
name, kind=trace.SpanKind.CLIENT, attributes=span_attributes
)
ctx_manager = trace.use_span(span, end_on_exit=True, record_exception=True)
ctx_manager.__enter__()

def discard(exc_type=None, exc_value=None, exc_traceback=None):
if not exc_type:
span.set_status(Status(StatusCode.OK))

ctx_manager.__exit__(exc_type, exc_value, exc_traceback)

return discard


@contextmanager
def trace_call(name, session=None, extra_attributes=None, observability_options=None):
"""
 trace_call is used in situations where you need to end a span with a context manager
 or after a scope is exited. If you need to keep a span alive and lazily end it, please
 invoke `trace_call_end_lazily`.
"""
if not name:
yield None
return

tracer, span_attributes = _make_tracer_and_span_attributes(
session, extra_attributes, observability_options
)
if not tracer:
yield None
return

with tracer.start_as_current_span(
name, kind=trace.SpanKind.CLIENT, attributes=attributes
name, kind=trace.SpanKind.CLIENT, attributes=span_attributes
) as span:
try:
yield span
Expand Down Expand Up @@ -131,3 +199,16 @@ def get_current_span():
def add_span_event(span, event_name, event_attributes=None):
if span:
span.add_event(event_name, event_attributes)


def add_event_on_current_span(event_name, event_attributes=None, span=None):
if not span:
span = get_current_span()

add_span_event(span, event_name, event_attributes)


def record_span_exception_and_status(span, exc):
if span:
span.set_status(Status(StatusCode.ERROR, str(exc)))
span.record_exception(exc)
60 changes: 57 additions & 3 deletions google/cloud/spanner_v1/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@
_metadata_with_prefix,
_metadata_with_leader_aware_routing,
)
from google.cloud.spanner_v1._opentelemetry_tracing import trace_call
from google.cloud.spanner_v1._opentelemetry_tracing import (
add_event_on_current_span,
trace_call,
trace_call_end_lazily,
)
from google.cloud.spanner_v1 import RequestOptions
from google.cloud.spanner_v1._helpers import _retry
from google.cloud.spanner_v1._helpers import _check_rst_stream_error
Expand All @@ -46,6 +50,12 @@ class _BatchBase(_SessionWrapper):
def __init__(self, session):
super(_BatchBase, self).__init__(session)
self._mutations = []
self.__base_discard_span = trace_call_end_lazily(
f"CloudSpanner.{type(self).__name__}",
self._session,
None,
getattr(self._session._database, "observability_options", None),
)

def _check_state(self):
"""Helper for :meth:`commit` et al.
Expand All @@ -69,6 +79,10 @@ def insert(self, table, columns, values):
:type values: list of lists
:param values: Values to be modified.
"""
add_event_on_current_span(
"insert mutations added",
dict(table=table, columns=columns),
)
self._mutations.append(Mutation(insert=_make_write_pb(table, columns, values)))

def update(self, table, columns, values):
Expand All @@ -84,6 +98,10 @@ def update(self, table, columns, values):
:param values: Values to be modified.
"""
self._mutations.append(Mutation(update=_make_write_pb(table, columns, values)))
add_event_on_current_span(
"update mutations added",
dict(table=table, columns=columns),
)

def insert_or_update(self, table, columns, values):
"""Insert/update one or more table rows.
Expand All @@ -100,6 +118,10 @@ def insert_or_update(self, table, columns, values):
self._mutations.append(
Mutation(insert_or_update=_make_write_pb(table, columns, values))
)
add_event_on_current_span(
"insert_or_update mutations added",
dict(table=table, columns=columns),
)

def replace(self, table, columns, values):
"""Replace one or more table rows.
Expand All @@ -114,6 +136,10 @@ def replace(self, table, columns, values):
:param values: Values to be modified.
"""
self._mutations.append(Mutation(replace=_make_write_pb(table, columns, values)))
add_event_on_current_span(
"replace mutations added",
dict(table=table, columns=columns),
)

def delete(self, table, keyset):
"""Delete one or more table rows.
Expand All @@ -126,6 +152,21 @@ def delete(self, table, keyset):
"""
delete = Mutation.Delete(table=table, key_set=keyset._to_pb())
self._mutations.append(Mutation(delete=delete))
add_event_on_current_span(
"delete mutations added",
dict(table=table),
)

def _discard_on_end(self, exc_type=None, exc_val=None, exc_traceback=None):
if self.__base_discard_span:
self.__base_discard_span(exc_type, exc_val, exc_traceback)
self.__base_discard_span = None

def __exit__(self, exc_type=None, exc_value=None, exc_traceback=None):
self._discard_on_end(exc_type, exc_val, exc_traceback)

def __enter__(self):
return self


class Batch(_BatchBase):
Expand Down Expand Up @@ -207,7 +248,7 @@ def commit(
)
observability_options = getattr(database, "observability_options", None)
with trace_call(
"CloudSpanner.Commit",
"CloudSpanner.Batch.commit",
self._session,
trace_attributes,
observability_options=observability_options,
Expand All @@ -223,18 +264,31 @@ def commit(
)
self.committed = response.commit_timestamp
self.commit_stats = response.commit_stats
self._discard_on_end()
return self.committed

def __enter__(self):
"""Begin ``with`` block."""
self._check_state()
observability_options = getattr(
self._session._database, "observability_options", None
)
self.__discard_span = trace_call_end_lazily(
"CloudSpanner.Batch",
self._session,
observability_options=observability_options,
)

return self

def __exit__(self, exc_type, exc_val, exc_tb):
"""End ``with`` block."""
if exc_type is None:
self.commit()
if self.__discard_span:
self.__discard_span(exc_type, exc_val, exc_tb)
self.__discard_span = None
self._discard_on_end()


class MutationGroup(_BatchBase):
Expand Down Expand Up @@ -326,7 +380,7 @@ def batch_write(self, request_options=None, exclude_txn_from_change_streams=Fals
)
observability_options = getattr(database, "observability_options", None)
with trace_call(
"CloudSpanner.BatchWrite",
"CloudSpanner.batch_write",
self._session,
trace_attributes,
observability_options=observability_options,
Expand Down
Loading

0 comments on commit 847d89b

Please sign in to comment.