Skip to content

Commit

Permalink
General touch-ups for every better fine-grained spans and events
Browse files Browse the repository at this point in the history
  • Loading branch information
odeke-em committed Dec 2, 2024
1 parent 51d3e91 commit 47ffdfc
Show file tree
Hide file tree
Showing 11 changed files with 269 additions and 139 deletions.
68 changes: 61 additions & 7 deletions google/cloud/spanner_v1/_opentelemetry_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ def get_tracer(tracer_provider=None):
return tracer_provider.get_tracer(TRACER_NAME, TRACER_VERSION)


def trace_end_explicitly(
name, session=None, extra_attributes=None, observability_options=None
def _make_tracer_and_span_attributes(
session=None, extra_attributes=None, observability_options=None
):
if not HAS_OPENTELEMETRY_INSTALLED:
return None
return None, None

tracer_provider = None

Expand All @@ -68,7 +68,7 @@ def trace_end_explicitly(
enable_extended_tracing = True

db_name = ""
if session and session._database:
if session and getattr(session, "_database", None):
db_name = session._database.name

if isinstance(observability_options, dict): # Avoid false positives with mock.Mock
Expand Down Expand Up @@ -99,13 +99,45 @@ def trace_end_explicitly(
if not enable_extended_tracing:
attributes.pop("db.statement", False)

return tracer.start_span(name, kind=trace.SpanKind.CLIENT, attributes=attributes)
return tracer, attributes


def trace_call_end_lazily(
name, session=None, extra_attributes=None, observability_options=None
):
"""
 trace_call_end_lazily is used in situations where you won't have a context manager
 and need to end a span explicitly when a specific condition happens. If you need a
 context manager, please invoke `trace_call` with which you can invoke
 `with trace_call(...) as span:`
 It is the caller's responsibility to explicitly invoke span.end()
"""
tracer, span_attributes = _make_tracer_and_span_attributes(
session, extra_attributes, observability_options
)
if not tracer:
return None
return tracer.start_span(
name, kind=trace.SpanKind.CLIENT, attributes=span_attributes
)


@contextmanager
def trace_call(name, session=None, extra_attributes=None, observability_options=None):
with trace_end_explicitly(
name, session, extra_attributes, observability_options
"""
 trace_call is used in situations where you need to end a span with a context manager
 or after a scope is exited. If you need to keep a span alive and lazily end it, please
 invoke `trace_call_end_lazily`.
"""
tracer, span_attributes = _make_tracer_and_span_attributes(
session, extra_attributes, observability_options
)
if not tracer:
yield None
return

with tracer.start_as_current_span(
name, kind=trace.SpanKind.CLIENT, attributes=span_attributes
) as span:
try:
yield span
Expand All @@ -115,3 +147,25 @@ def trace_call(name, session=None, extra_attributes=None, observability_options=
raise
else:
span.set_status(Status(StatusCode.OK))


def set_span_status_error(span, error):
if span:
span.set_status(Status(StatusCode.ERROR, str(error)))


def set_span_status_ok(span):
if span:
span.set_status(Status(StatusCode.OK))


def get_current_span():
if not HAS_OPENTELEMETRY_INSTALLED:
return None
return trace.get_current_span()


def add_event_on_current_span(self, event_name, attributes=None):
current_span = get_current_span()
if current_span:
current_span.add_event(event_commentary, attributes)
47 changes: 21 additions & 26 deletions google/cloud/spanner_v1/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@
_metadata_with_leader_aware_routing,
)
from google.cloud.spanner_v1._opentelemetry_tracing import (
add_event_on_current_span,
trace_call,
trace_end_explicitly,
trace_call_end_lazily,
)
from google.cloud.spanner_v1 import RequestOptions
from google.cloud.spanner_v1._helpers import _retry
Expand All @@ -50,9 +51,9 @@ def __init__(self, session):
super(_BatchBase, self).__init__(session)
self._mutations = []
observability_options = getattr(
self._session.database, "observability_options", None
self._session._database, "observability_options", None
)
self.__span = trace_end_explicitly(
self.__span = trace_call_end_lazily(
"CloudSpannerX." + type(self).__name__,
self._session,
observability_options=observability_options,
Expand Down Expand Up @@ -80,10 +81,9 @@ def insert(self, table, columns, values):
:type values: list of lists
:param values: Values to be modified.
"""
if self.__span:
self.__span.add_event(
"insert mutations inserted", dict(table=table, columns=columns)
)
add_event_on_span(
self.__span, "insert mutations added", dict(table=table, columns=columns)
)
self._mutations.append(Mutation(insert=_make_write_pb(table, columns, values)))

def update(self, table, columns, values):
Expand All @@ -98,11 +98,10 @@ def update(self, table, columns, values):
:type values: list of lists
:param values: Values to be modified.
"""
if self.__span:
self.__span.add_event(
"update mutations inserted", dict(table=table, columns=columns)
)
self._mutations.append(Mutation(update=_make_write_pb(table, columns, values)))
add_event_on_span(
self.__span, "update mutations added", dict(table=table, columns=columns)
)

def insert_or_update(self, table, columns, values):
"""Insert/update one or more table rows.
Expand All @@ -116,14 +115,14 @@ def insert_or_update(self, table, columns, values):
:type values: list of lists
:param values: Values to be modified.
"""
if self.__span:
self.__span.add_event(
"insert_or_update mutations inserted",
dict(table=table, columns=columns),
)
self._mutations.append(
Mutation(insert_or_update=_make_write_pb(table, columns, values))
)
add_event_on_span(
self.__span,
"insert_or_update mutations added",
dict(table=table, columns=columns),
)

def replace(self, table, columns, values):
"""Replace one or more table rows.
Expand All @@ -137,11 +136,10 @@ def replace(self, table, columns, values):
:type values: list of lists
:param values: Values to be modified.
"""
if self.__span:
self.__span.add_event(
"replace mutations inserted", dict(table=table, columns=columns)
)
self._mutations.append(Mutation(replace=_make_write_pb(table, columns, values)))
add_event_on_span(
self.__span, "replace mutations added", dict(table=table, columns=columns)
)

def delete(self, table, keyset):
"""Delete one or more table rows.
Expand All @@ -154,10 +152,7 @@ def delete(self, table, keyset):
"""
delete = Mutation.Delete(table=table, key_set=keyset._to_pb())
self._mutations.append(Mutation(delete=delete))
if self.__span:
self.__span.add_event(
"delete mutations inserted", dict(table=table, columns=columns)
)
add_event_on_span(self.__span, "delete mutations added", dict(table=table))


class Batch(_BatchBase):
Expand Down Expand Up @@ -261,9 +256,9 @@ def __enter__(self):
"""Begin ``with`` block."""
self._check_state()
observability_options = getattr(
self._session.database, "observability_options", None
self._session._database, "observability_options", None
)
self.__span = trace_end_explicitly(
self.__span = trace_call_end_lazily(
"CloudSpanner.Batch",
self._session,
observability_options=observability_options,
Expand Down
Loading

0 comments on commit 47ffdfc

Please sign in to comment.