Skip to content

Commit ea2551a

Browse files
committed
Updates from code review
1 parent 8581115 commit ea2551a

File tree

5 files changed

+39
-48
lines changed

5 files changed

+39
-48
lines changed

google/cloud/spanner_v1/database.py

Lines changed: 33 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -699,46 +699,43 @@ def execute_partitioned_dml(
699699
)
700700

701701
def execute_pdml():
702-
def do_execute_pdml(session, span):
703-
add_span_event(span, "Starting BeginTransaction")
704-
txn = api.begin_transaction(
705-
session=session.name, options=txn_options, metadata=metadata
706-
)
707-
708-
txn_selector = TransactionSelector(id=txn.id)
709-
710-
request = ExecuteSqlRequest(
711-
session=session.name,
712-
sql=dml,
713-
params=params_pb,
714-
param_types=param_types,
715-
query_options=query_options,
716-
request_options=request_options,
717-
)
718-
method = functools.partial(
719-
api.execute_streaming_sql,
720-
metadata=metadata,
721-
)
722-
723-
iterator = _restart_on_unavailable(
724-
method=method,
725-
trace_name="CloudSpanner.ExecuteStreamingSql",
726-
request=request,
727-
transaction_selector=txn_selector,
728-
observability_options=self.observability_options,
729-
)
730-
731-
result_set = StreamedResultSet(iterator)
732-
list(result_set) # consume all partials
733-
734-
return result_set.stats.row_count_lower_bound
735-
736702
with trace_call(
737703
"CloudSpanner.Database.execute_partitioned_pdml",
738704
observability_options=self.observability_options,
739705
) as span:
740706
with SessionCheckout(self._pool) as session:
741-
return do_execute_pdml(session, span)
707+
add_span_event(span, "Starting BeginTransaction")
708+
txn = api.begin_transaction(
709+
session=session.name, options=txn_options, metadata=metadata
710+
)
711+
712+
txn_selector = TransactionSelector(id=txn.id)
713+
714+
request = ExecuteSqlRequest(
715+
session=session.name,
716+
sql=dml,
717+
params=params_pb,
718+
param_types=param_types,
719+
query_options=query_options,
720+
request_options=request_options,
721+
)
722+
method = functools.partial(
723+
api.execute_streaming_sql,
724+
metadata=metadata,
725+
)
726+
727+
iterator = _restart_on_unavailable(
728+
method=method,
729+
trace_name="CloudSpanner.ExecuteStreamingSql",
730+
request=request,
731+
transaction_selector=txn_selector,
732+
observability_options=self.observability_options,
733+
)
734+
735+
result_set = StreamedResultSet(iterator)
736+
list(result_set) # consume all partials
737+
738+
return result_set.stats.row_count_lower_bound
742739

743740
return _retry_on_aborted(execute_pdml, DEFAULT_RETRY_BACKOFF)()
744741

@@ -1531,7 +1528,7 @@ def process_read_batch(
15311528
:rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet`
15321529
:returns: a result set instance which can be used to consume rows.
15331530
"""
1534-
observability_options = self.observability_options or {}
1531+
observability_options = self.observability_options
15351532
with trace_call(
15361533
f"CloudSpanner.{type(self).__name__}.process_read_batch",
15371534
observability_options=observability_options,

google/cloud/spanner_v1/session.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -470,7 +470,6 @@ def run_in_transaction(self, func, *args, **kw):
470470
) as span:
471471
while True:
472472
if self._transaction is None:
473-
add_span_event(span, "Creating Transaction")
474473
txn = self.transaction()
475474
txn.transaction_tag = transaction_tag
476475
txn.exclude_txn_from_change_streams = (

google/cloud/spanner_v1/snapshot.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -686,6 +686,7 @@ def partition_read(
686686

687687
with trace_call(
688688
f"CloudSpanner.{type(self).__name__}.partition_read",
689+
self._session,
689690
extra_attributes=trace_attributes,
690691
observability_options=getattr(database, "observability_options", None),
691692
):

tests/system/test_observability_options.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -224,12 +224,10 @@ def select_in_txn(txn):
224224
("Waiting for a session to become available", {"kind": "BurstyPool"}),
225225
("No sessions available in pool. Creating session", {"kind": "BurstyPool"}),
226226
("Creating Session", {}),
227-
("Creating Transaction", {}),
228227
(
229228
"Transaction was aborted in user operation, retrying",
230229
{"delay_seconds": "EPHEMERAL", "cause": "EPHEMERAL", "attempt": 1},
231230
),
232-
("Creating Transaction", {}),
233231
("Starting Commit", {}),
234232
("Commit Done", {}),
235233
]
@@ -283,7 +281,7 @@ def finished_spans_statuses(trace_exporter):
283281
not HAS_OTEL_INSTALLED,
284282
reason="Tracing requires OpenTelemetry",
285283
)
286-
def test_database_partitioned():
284+
def test_database_partitioned_error():
287285
from opentelemetry.trace.status import StatusCode
288286

289287
db, trace_exporter = create_db_trace_exporter()

tests/unit/test_snapshot.py

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1233,15 +1233,12 @@ def test_partition_read_other_error(self):
12331233
if not HAS_OPENTELEMETRY_INSTALLED:
12341234
return
12351235

1236-
want_span_attributes = dict(
1237-
BASE_ATTRIBUTES,
1238-
table_id=TABLE_NAME,
1239-
columns=tuple(COLUMNS),
1240-
)
12411236
self.assertSpanAttributes(
12421237
"CloudSpanner._Derived.partition_read",
12431238
status=StatusCode.ERROR,
1244-
attributes=want_span_attributes,
1239+
attributes=dict(
1240+
BASE_ATTRIBUTES, table_id=TABLE_NAME, columns=tuple(COLUMNS)
1241+
),
12451242
)
12461243

12471244
def test_partition_read_w_retry(self):
@@ -1379,11 +1376,10 @@ def _partition_query_helper(
13791376
timeout=timeout,
13801377
)
13811378

1382-
attributes = dict(BASE_ATTRIBUTES, **{"db.statement": SQL_QUERY_WITH_PARAM})
13831379
self.assertSpanAttributes(
13841380
"CloudSpanner._Derived.partition_query",
13851381
status=StatusCode.OK,
1386-
attributes=attributes,
1382+
attributes=dict(BASE_ATTRIBUTES, **{"db.statement": SQL_QUERY_WITH_PARAM}),
13871383
)
13881384

13891385
def test_partition_query_other_error(self):

0 commit comments

Comments
 (0)