Skip to content

Commit

Permalink
all: update review comments + show type for BeginTransaction
Browse files Browse the repository at this point in the history
  • Loading branch information
odeke-em committed Dec 10, 2024
1 parent f437955 commit f140c41
Show file tree
Hide file tree
Showing 10 changed files with 189 additions and 195 deletions.
7 changes: 3 additions & 4 deletions google/cloud/spanner_v1/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,6 @@ def run_in_transaction(self, func, *args, **kw):
) as span:
while True:
if self._transaction is None:
add_span_event(span, "Creating Transaction")
txn = self.transaction()
txn.transaction_tag = transaction_tag
txn.exclude_txn_from_change_streams = (
Expand Down Expand Up @@ -499,7 +498,7 @@ def run_in_transaction(self, func, *args, **kw):
record_span_exception_and_status(span, exc)
add_span_event(
span,
"Transaction was aborted, retrying afresh",
"Transaction was aborted in user operation, retrying",
attributes,
)

Expand All @@ -516,7 +515,7 @@ def run_in_transaction(self, func, *args, **kw):
except Exception:
add_span_event(
span,
"Invoking Transaction.rollback(), not retrying",
"User operation failed. Invoking Transaction.rollback(), not retrying",
span_attributes,
)
txn.rollback()
Expand All @@ -537,7 +536,7 @@ def run_in_transaction(self, func, *args, **kw):
record_span_exception_and_status(span, exc)
add_span_event(
span,
"Transaction was aborted, retrying afresh",
"Transaction got aborted during commit, retrying afresh",
attributes,
)

Expand Down
2 changes: 1 addition & 1 deletion google/cloud/spanner_v1/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,7 @@ def begin(self):
)
txn_selector = self._make_txn_selector()
with trace_call(
"CloudSpanner.BeginTransaction",
f"CloudSpanner.{type(self).__name__}.begin",
self._session,
observability_options=getattr(database, "observability_options", None),
):
Expand Down
2 changes: 1 addition & 1 deletion google/cloud/spanner_v1/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ def begin(self):
)
observability_options = getattr(database, "observability_options", None)
with trace_call(
"CloudSpanner.BeginTransaction",
f"CloudSpanner.{type(self).__name__}.begin",
self._session,
observability_options=observability_options,
) as span:
Expand Down
1 change: 0 additions & 1 deletion tests/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ def tearDown(self):
def assertNoSpans(self):
if HAS_OPENTELEMETRY_INSTALLED:
span_list = self.get_finished_spans()
print("got_spans", [span.name for span in span_list])
self.assertEqual(len(span_list), 0)

def assertSpanAttributes(
Expand Down
18 changes: 18 additions & 0 deletions tests/system/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,21 @@ def cleanup_old_instances(spanner_client):

def unique_id(prefix, separator="-"):
return f"{prefix}{system.unique_resource_id(separator)}"


class FauxCall:
def __init__(self, code, details="FauxCall"):
self._code = code
self._details = details

def initial_metadata(self):
return {}

def trailing_metadata(self):
return {}

def code(self):
return self._code

def details(self):
return self._details
155 changes: 155 additions & 0 deletions tests/system/test_observability_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,161 @@ def test_propagation(enable_extended_tracing):
test_propagation(False)


@pytest.mark.skipif(
not _helpers.USE_EMULATOR,
reason="Emulator needed to run this tests",
)
@pytest.mark.skipif(
not HAS_OTEL_INSTALLED,
reason="Tracing requires OpenTelemetry",
)
def test_transaction_abort_then_retry_spans():
from google.auth.credentials import AnonymousCredentials
from google.api_core.exceptions import Aborted
from google.rpc import code_pb2
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
InMemorySpanExporter,
)
from opentelemetry.trace.status import StatusCode
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.sampling import ALWAYS_ON
from opentelemetry import trace

PROJECT = _helpers.EMULATOR_PROJECT
CONFIGURATION_NAME = "config-name"
INSTANCE_ID = _helpers.INSTANCE_ID
DISPLAY_NAME = "display-name"
DATABASE_ID = _helpers.unique_id("temp_db")
NODE_COUNT = 5
LABELS = {"test": "true"}

counters = dict(aborted=0)
already_aborted = False

def select_in_txn(txn):
from google.rpc import error_details_pb2

results = txn.execute_sql("SELECT 1")
for row in results:
_ = row

if counters["aborted"] == 0:
counters["aborted"] = 1
raise Aborted(
"Thrown from ClientInterceptor for testing",
errors=[_helpers.FauxCall(code_pb2.ABORTED)],
)

tracer_provider = TracerProvider(sampler=ALWAYS_ON)
trace_exporter = InMemorySpanExporter()
tracer_provider.add_span_processor(SimpleSpanProcessor(trace_exporter))
observability_options = dict(
tracer_provider=tracer_provider,
enable_extended_tracing=True,
)

client = Client(
project=PROJECT,
observability_options=observability_options,
credentials=AnonymousCredentials(),
)

instance = client.instance(
INSTANCE_ID,
CONFIGURATION_NAME,
display_name=DISPLAY_NAME,
node_count=NODE_COUNT,
labels=LABELS,
)

try:
instance.create()
except Exception:
pass

db = instance.database(DATABASE_ID)
try:
db.create()
except Exception:
pass

db.run_in_transaction(select_in_txn)

span_list = trace_exporter.get_finished_spans()
got_span_names = [span.name for span in span_list]
want_span_names = [
"CloudSpanner.CreateSession",
"CloudSpanner.Transaction.execute_streaming_sql",
"CloudSpanner.Transaction.execute_streaming_sql",
"CloudSpanner.Transaction.commit",
"CloudSpanner.Session.run_in_transaction",
"CloudSpanner.Database.run_in_transaction",
]

assert got_span_names == want_span_names

got_events = []
got_statuses = []

# Some event attributes are noisy/highly ephemeral
# and can't be directly compared against.
imprecise_event_attributes = ["exception.stacktrace", "delay_seconds"]
for span in span_list:
got_statuses.append(
(span.name, span.status.status_code, span.status.description)
)
for event in span.events:
evt_attributes = event.attributes.copy()
for attr_name in imprecise_event_attributes:
if attr_name in evt_attributes:
evt_attributes[attr_name] = "EPHEMERAL"

got_events.append((event.name, evt_attributes))

# Check for the series of events
want_events = [
("Starting Commit", {}),
("Commit Done", {}),
("Using Transaction", {"attempt": 1}),
(
"exception",
{
"exception.type": "google.api_core.exceptions.Aborted",
"exception.message": "409 Thrown from ClientInterceptor for testing",
"exception.stacktrace": "EPHEMERAL",
"exception.escaped": "False",
},
),
(
"Transaction was aborted in user operation, retrying",
{"delay_seconds": "EPHEMERAL", "attempt": 1},
),
("Using Transaction", {"attempt": 2}),
("Acquiring session", {"kind": "BurstyPool"}),
("Waiting for a session to become available", {"kind": "BurstyPool"}),
("No sessions available in pool. Creating session", {"kind": "BurstyPool"}),
("Creating Session", {}),
]
assert got_events == want_events

# Check for the statues.
codes = StatusCode
want_statuses = [
("CloudSpanner.CreateSession", codes.OK, None),
("CloudSpanner.Transaction.execute_streaming_sql", codes.OK, None),
("CloudSpanner.Transaction.execute_streaming_sql", codes.OK, None),
("CloudSpanner.Transaction.commit", codes.OK, None),
(
"CloudSpanner.Session.run_in_transaction",
codes.ERROR,
"409 Thrown from ClientInterceptor for testing",
),
("CloudSpanner.Database.run_in_transaction", codes.OK, None),
]
assert got_statuses == want_statuses


def _make_credentials():
from google.auth.credentials import AnonymousCredentials

Expand Down
Loading

0 comments on commit f140c41

Please sign in to comment.