Skip to content

Commit

Permalink
Wring up passthrough context manager
Browse files Browse the repository at this point in the history
  • Loading branch information
odeke-em committed Dec 2, 2024
1 parent 3ff4176 commit cd4a8b1
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 44 deletions.
21 changes: 16 additions & 5 deletions google/cloud/spanner_v1/_opentelemetry_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,15 @@ def trace_call_end_lazily(
name, session=None, extra_attributes=None, observability_options=None
):
"""
 trace_call_end_lazily is used in situations where you won't have a context manager
 and need to end a span explicitly when a specific condition happens. If you need a
 context manager, please invoke `trace_call` with which you can invoke
trace_call_end_lazily is used in situations where you don't want a context managed
span in a with statement to end as soon as a block exits. This is useful for example
after a Database.batch or Database.snapshot but without a context manager.
If you need to directly invoke tracing with a context manager, please invoke
`trace_call` with which you can invoke
 `with trace_call(...) as span:`
It is the caller's responsibility to explicitly invoke span.end()
It is the caller's responsibility to explicitly invoke the returned ending function.
"""

if not name:
return None

Expand All @@ -131,9 +134,17 @@ def trace_call_end_lazily(
)
if not tracer:
return None
return tracer.start_span(

span = tracer.start_span(
name, kind=trace.SpanKind.CLIENT, attributes=span_attributes
)
ctx_manager = trace.use_span(span, end_on_exit=True, record_exception=True)
ctx_manager.__enter__()

def discard(exc_type=None, exc_value=None, exc_traceback=None):
ctx_manager.__exit__(exc_type, exc_value, exc_traceback)

return discard


@contextmanager
Expand Down
19 changes: 9 additions & 10 deletions google/cloud/spanner_v1/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class _BatchBase(_SessionWrapper):
def __init__(self, session):
super(_BatchBase, self).__init__(session)
self._mutations = []
self.__span = trace_call_end_lazily(
self.__discard_span = trace_call_end_lazily(
f"CloudSpanner.{type(self).__name__}",
self._session,
None,
Expand Down Expand Up @@ -82,7 +82,6 @@ def insert(self, table, columns, values):
add_event_on_current_span(
"insert mutations added",
dict(table=table, columns=columns),
self.__span,
)
self._mutations.append(Mutation(insert=_make_write_pb(table, columns, values)))

Expand All @@ -102,7 +101,6 @@ def update(self, table, columns, values):
add_event_on_current_span(
"update mutations added",
dict(table=table, columns=columns),
self.__span,
)

def insert_or_update(self, table, columns, values):
Expand All @@ -123,7 +121,6 @@ def insert_or_update(self, table, columns, values):
add_event_on_current_span(
"insert_or_update mutations added",
dict(table=table, columns=columns),
self.__span,
)

def replace(self, table, columns, values):
Expand All @@ -140,7 +137,8 @@ def replace(self, table, columns, values):
"""
self._mutations.append(Mutation(replace=_make_write_pb(table, columns, values)))
add_event_on_current_span(
"replace mutations added", dict(table=table, columns=columns), self.__span
"replace mutations added",
dict(table=table, columns=columns),
)

def delete(self, table, keyset):
Expand All @@ -155,7 +153,8 @@ def delete(self, table, keyset):
delete = Mutation.Delete(table=table, key_set=keyset._to_pb())
self._mutations.append(Mutation(delete=delete))
add_event_on_current_span(
"delete mutations added", dict(table=table), self.__span
"delete mutations added",
dict(table=table),
)


Expand Down Expand Up @@ -262,7 +261,7 @@ def __enter__(self):
observability_options = getattr(
self._session._database, "observability_options", None
)
self.__span = trace_call_end_lazily(
self.__discard_span = trace_call_end_lazily(
"CloudSpanner.Batch",
self._session,
observability_options=observability_options,
Expand All @@ -274,9 +273,9 @@ def __exit__(self, exc_type, exc_val, exc_tb):
"""End ``with`` block."""
if exc_type is None:
self.commit()
if self.__span:
self.__span.end()
self.__span = None
if self.__discard_span:
self.__discard_span(exc_type, exc_val, exc_tb)
self.__discard_span = None


class MutationGroup(_BatchBase):
Expand Down
38 changes: 14 additions & 24 deletions google/cloud/spanner_v1/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -1192,12 +1192,12 @@ def __init__(
self._request_options = request_options
self._max_commit_delay = max_commit_delay
self._exclude_txn_from_change_streams = exclude_txn_from_change_streams
self.__span = None
self.__discard_span = None

def __enter__(self):
"""Begin ``with`` block."""
observability_options = getattr(self._database, "observability_options", None)
self.__span = trace_call_end_lazily(
self.__discard_span = trace_call_end_lazily(
"CloudSpanner.Database.batch",
observability_options=observability_options,
)
Expand All @@ -1224,14 +1224,9 @@ def __exit__(self, exc_type, exc_val, exc_tb):
extra={"commit_stats": self._batch.commit_stats},
)

if self.__span:
if not exc_type:
set_span_status_ok(self.__span)
else:
set_span_status_error(self.__span, exc_val)
self.__span.record_exception(exc_val)
self.__span.end()
self.__span = None
if self.__discard_span:
self.__discard_span(exc_type, exc_val, exc_tb)
self.__discard_span = None

self._database._pool.put(self._session)

Expand Down Expand Up @@ -1291,15 +1286,15 @@ def __init__(self, database, **kw):
self._database = database
self._session = None
self._kw = kw
self.__span = None
self.__discard_span = None

def __enter__(self):
"""Begin ``with`` block."""
observability_options = getattr(self._database, "observability_options", {})
attributes = None
if self._kw:
attributes = dict(multi_use=self._kw.get("multi_use", False))
self.__span = trace_call_end_lazily(
self.__discard_span = trace_call_end_lazily(
"CloudSpanner.Database.snapshot",
extra_attributes=attributes,
observability_options=observability_options,
Expand All @@ -1316,14 +1311,9 @@ def __exit__(self, exc_type, exc_val, exc_tb):
self._session = self._database._pool._new_session()
self._session.create()

if self.__span:
if not exc_type:
set_span_status_ok(self.__span)
else:
set_span_status_error(self.__span, exc_val)
self.__span.record_exception(exc_val)
self.__span.end()
self.__span = None
if self.__discard_span:
self.__discard_span(exc_val)
self.__discard_span = None

self._database._pool.put(self._session)

Expand Down Expand Up @@ -1359,7 +1349,7 @@ def __init__(
self._exact_staleness = exact_staleness
observability_options = getattr(self._database, "observability_options", {})
self.__observability_options = observability_options
self.__span = trace_call_end_lazily(
self.__discard_span = trace_call_end_lazily(
"CloudSpanner.BatchSnapshot",
self._session,
observability_options=observability_options,
Expand Down Expand Up @@ -1829,9 +1819,9 @@ def close(self):
if self._session is not None:
self._session.delete()

if self.__span:
self.__span.end()
self.__span = None
if self.__discard_span:
self.__discard_span()
self.__discard_span = None


def _check_ddl_statements(value):
Expand Down
12 changes: 10 additions & 2 deletions google/cloud/spanner_v1/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,11 @@ def bind(self, database):
session_template=Session(creator_role=self.database_role),
)

while trace_call("Cloudspanner.FixedPool.BatchCreateSessions", self):
observability_options = getattr(self._database, "observability_options", None)
while trace_call(
"Cloudspanner.FixedPool.BatchCreateSessions",
observability_options=observability_options,
):
while not self._sessions.full():
resp = api.batch_create_sessions(
request=request,
Expand Down Expand Up @@ -424,7 +428,11 @@ def bind(self, database):
session_template=Session(creator_role=self.database_role),
)

while trace_call("Cloudspanner.PingingPool.BatchCreateSessions", self):
observability_options = getattr(self._database, "observability_options", None)
while trace_call(
"Cloudspanner.PingingPool.BatchCreateSessions",
observability_options=observability_options,
):
while created_session_count < self.size:
resp = api.batch_create_sessions(
request=request,
Expand Down
1 change: 1 addition & 0 deletions google/cloud/spanner_v1/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ def exists(self):
)

observability_options = getattr(self._database, "observability_options", None)
print(f"obsopts {observability_options}")
with trace_call(
"CloudSpanner.GetSession", self, observability_options=observability_options
) as span:
Expand Down
4 changes: 2 additions & 2 deletions google/cloud/spanner_v1/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ def execute_update(
response = self._execute_request(
method,
request,
"CloudSpanner.execute_update",
f"CloudSpanner.{type(self).__name__}.execute_update",
self._session,
trace_attributes,
observability_options=observability_options,
Expand All @@ -440,7 +440,7 @@ def execute_update(
response = self._execute_request(
method,
request,
"CloudSpanner.execute_update",
f"CloudSpanner.{type(self).__name__}.execute_update",
self._session,
trace_attributes,
observability_options=observability_options,
Expand Down
2 changes: 1 addition & 1 deletion tests/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def assertSpanAttributes(
if HAS_OPENTELEMETRY_INSTALLED:
if not span:
span_list = self.get_finished_spans()
self.assertEqual(len(span_list), 1)
self.assertEqual(len(span_list) > 0, True)
span = span_list[0]

self.assertEqual(span.name, name)
Expand Down
6 changes: 6 additions & 0 deletions tests/unit/test_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,12 +427,18 @@ def _commit_helper(
if return_commit_stats:
self.assertEqual(transaction.commit_stats.mutation_count, 4)

span_list = self.get_finished_spans()
txn_commit_span = span_list[-1]
# got_span_names = [span.name for span in span_list]
# want_span_names = ["CloudSpanner.Transaction.commi"]
# assert got_span_names == want_span_names
self.assertSpanAttributes(
"CloudSpanner.Transaction.commit",
attributes=dict(
TestTransaction.BASE_ATTRIBUTES,
num_mutations=len(transaction._mutations),
),
span=txn_commit_span,
)

def test_commit_no_mutations(self):
Expand Down

0 comments on commit cd4a8b1

Please sign in to comment.