Skip to content

Commit

Permalink
feat(observability): more descriptive and value adding spans
Browse files Browse the repository at this point in the history
This change adds more descriptive and value adding spans to
replace the generic CloudSpanner.ReadWriteTransaction.
With this change, we add new spans:
* CloudSpanner.Database.run_in_transaction
* CloudSpanner.execute_pdml
* CloudSpanner.execute_sql
* CloudSpanner.execute_update
  • Loading branch information
odeke-em committed Nov 18, 2024
1 parent 054a186 commit a1c45aa
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 26 deletions.
43 changes: 31 additions & 12 deletions google/cloud/spanner_v1/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
_metadata_with_prefix,
_metadata_with_leader_aware_routing,
)
from google.cloud.spanner_v1._opentelemetry_tracing import trace_call
from google.cloud.spanner_v1.batch import Batch
from google.cloud.spanner_v1.batch import MutationGroups
from google.cloud.spanner_v1.keyset import KeySet
Expand Down Expand Up @@ -694,7 +695,7 @@ def execute_partitioned_dml(
)

def execute_pdml():
with SessionCheckout(self._pool) as session:
def do_execute_pdml(session):
txn = api.begin_transaction(
session=session.name, options=txn_options, metadata=metadata
)
Expand Down Expand Up @@ -726,6 +727,15 @@ def execute_pdml():

return result_set.stats.row_count_lower_bound

with SessionCheckout(self._pool) as session:
observability_options = getattr(self, "observability_options", None)
with trace_call(
"CloudSpanner.execute_pdml",
session,
observability_options=observability_options,
):
return do_execute_pdml(session)

return _retry_on_aborted(execute_pdml, DEFAULT_RETRY_BACKOFF)()

def session(self, labels=None, database_role=None):
Expand Down Expand Up @@ -880,17 +890,26 @@ def run_in_transaction(self, func, *args, **kw):
# Sanity check: Is there a transaction already running?
# If there is, then raise a red flag. Otherwise, mark that this one
# is running.
if getattr(self._local, "transaction_running", False):
raise RuntimeError("Spanner does not support nested transactions.")
self._local.transaction_running = True

# Check out a session and run the function in a transaction; once
# done, flip the sanity check bit back.
try:
with SessionCheckout(self._pool) as session:
return session.run_in_transaction(func, *args, **kw)
finally:
self._local.transaction_running = False
with SessionCheckout(self._pool) as session:
observability_options = getattr(self, "observability_options", None)
with trace_call(
"CloudSpanner.Database.run_in_transaction",
session,
observability_options=observability_options,
):
# Sanity check: Is there a transaction already running?
# If there is, then raise a red flag. Otherwise, mark that this one
# is running.
if getattr(self._local, "transaction_running", False):
raise RuntimeError("Spanner does not support nested transactions.")
self._local.transaction_running = True

# Check out a session and run the function in a transaction; once
# done, flip the sanity check bit back.
try:
return session.run_in_transaction(func, *args, **kw)
finally:
self._local.transaction_running = False

def restore(self, source):
"""Restore from a backup to this database.
Expand Down
37 changes: 26 additions & 11 deletions google/cloud/spanner_v1/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,23 +419,17 @@ def run_in_transaction(self, func, *args, **kw):
exclude_txn_from_change_streams = kw.pop(
"exclude_txn_from_change_streams", None
)
attempts = 0

while True:
if self._transaction is None:
txn = self.transaction()
txn.transaction_tag = transaction_tag
txn.exclude_txn_from_change_streams = exclude_txn_from_change_streams
else:
txn = self._transaction
observability_options = getattr(self._database, "observability_options", None)
attempts = 0

def __run_txn(txn, attempts):
try:
attempts += 1
return_value = func(txn, *args, **kw)
except Aborted as exc:
del self._transaction
_delay_until_retry(exc, deadline, attempts)
continue
return None, False
except GoogleAPICallError:
del self._transaction
raise
Expand All @@ -461,7 +455,28 @@ def run_in_transaction(self, func, *args, **kw):
"CommitStats: {}".format(txn.commit_stats),
extra={"commit_stats": txn.commit_stats},
)
return return_value
return return_value, True

while True:
if self._transaction is None:
with trace_call(
"CloudSpanner.ReadWriteTransaction", self, observability_options
):
txn = self.transaction()
txn.transaction_tag = transaction_tag
txn.exclude_txn_from_change_streams = (
exclude_txn_from_change_streams
)
return_value, completed = __run_txn(txn, attempts)
if completed:
return return_value
else:
txn = self._transaction
return_value, completed = __run_txn(txn, attempts)
if completed:
return return_value

attempts += 1


# Rational: this function factors out complex shared deadline / retry
Expand Down
2 changes: 1 addition & 1 deletion google/cloud/spanner_v1/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ def _get_streamed_result_set(
iterator = _restart_on_unavailable(
restart,
request,
"CloudSpanner.ReadWriteTransaction",
"CloudSpanner.execute_sql",
self._session,
trace_attributes,
transaction=self,
Expand Down
4 changes: 2 additions & 2 deletions google/cloud/spanner_v1/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ def execute_update(
response = self._execute_request(
method,
request,
"CloudSpanner.ReadWriteTransaction",
"CloudSpanner.execute_update",
self._session,
trace_attributes,
observability_options=observability_options,
Expand All @@ -440,7 +440,7 @@ def execute_update(
response = self._execute_request(
method,
request,
"CloudSpanner.ReadWriteTransaction",
"CloudSpanner.execute_update",
self._session,
trace_attributes,
observability_options=observability_options,
Expand Down

0 comments on commit a1c45aa

Please sign in to comment.