Skip to content

Commit b20eee8

Browse files
committed
observability: PDML + some batch write spans
This change adds spans for Partitioned DML and updates some Batch write spans. Carved out from PR googleapis#1241.
1 parent 6352dd2 commit b20eee8

File tree

13 files changed

+260
-123
lines changed

13 files changed

+260
-123
lines changed

google/cloud/spanner_v1/batch.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,7 @@ def batch_write(self, request_options=None, exclude_txn_from_change_streams=Fals
336336
)
337337
observability_options = getattr(database, "observability_options", None)
338338
with trace_call(
339-
"CloudSpanner.BatchWrite",
339+
"CloudSpanner.batch_write",
340340
self._session,
341341
trace_attributes,
342342
observability_options=observability_options,

google/cloud/spanner_v1/database.py

Lines changed: 106 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -699,7 +699,8 @@ def execute_partitioned_dml(
699699
)
700700

701701
def execute_pdml():
702-
with SessionCheckout(self._pool) as session:
702+
def do_execute_pdml(session, span):
703+
add_span_event(span, "Starting BeginTransaction")
703704
txn = api.begin_transaction(
704705
session=session.name, options=txn_options, metadata=metadata
705706
)
@@ -732,6 +733,13 @@ def execute_pdml():
732733

733734
return result_set.stats.row_count_lower_bound
734735

736+
with trace_call(
737+
"CloudSpanner.Database.execute_partitioned_pdml",
738+
observability_options=self.observability_options,
739+
) as span:
740+
with SessionCheckout(self._pool) as session:
741+
return do_execute_pdml(session, span)
742+
735743
return _retry_on_aborted(execute_pdml, DEFAULT_RETRY_BACKOFF)()
736744

737745
def session(self, labels=None, database_role=None):
@@ -1349,6 +1357,10 @@ def to_dict(self):
13491357
"transaction_id": snapshot._transaction_id,
13501358
}
13511359

1360+
@property
1361+
def observability_options(self):
1362+
return getattr(self._database, "observability_options", {})
1363+
13521364
def _get_session(self):
13531365
"""Create session as needed.
13541366
@@ -1468,27 +1480,32 @@ def generate_read_batches(
14681480
mappings of information used perform actual partitioned reads via
14691481
:meth:`process_read_batch`.
14701482
"""
1471-
partitions = self._get_snapshot().partition_read(
1472-
table=table,
1473-
columns=columns,
1474-
keyset=keyset,
1475-
index=index,
1476-
partition_size_bytes=partition_size_bytes,
1477-
max_partitions=max_partitions,
1478-
retry=retry,
1479-
timeout=timeout,
1480-
)
1483+
with trace_call(
1484+
f"CloudSpanner.{type(self).__name__}.generate_read_batches",
1485+
extra_attributes=dict(table=table, columns=columns),
1486+
observability_options=self.observability_options,
1487+
):
1488+
partitions = self._get_snapshot().partition_read(
1489+
table=table,
1490+
columns=columns,
1491+
keyset=keyset,
1492+
index=index,
1493+
partition_size_bytes=partition_size_bytes,
1494+
max_partitions=max_partitions,
1495+
retry=retry,
1496+
timeout=timeout,
1497+
)
14811498

1482-
read_info = {
1483-
"table": table,
1484-
"columns": columns,
1485-
"keyset": keyset._to_dict(),
1486-
"index": index,
1487-
"data_boost_enabled": data_boost_enabled,
1488-
"directed_read_options": directed_read_options,
1489-
}
1490-
for partition in partitions:
1491-
yield {"partition": partition, "read": read_info.copy()}
1499+
read_info = {
1500+
"table": table,
1501+
"columns": columns,
1502+
"keyset": keyset._to_dict(),
1503+
"index": index,
1504+
"data_boost_enabled": data_boost_enabled,
1505+
"directed_read_options": directed_read_options,
1506+
}
1507+
for partition in partitions:
1508+
yield {"partition": partition, "read": read_info.copy()}
14921509

14931510
def process_read_batch(
14941511
self,
@@ -1514,12 +1531,17 @@ def process_read_batch(
15141531
:rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet`
15151532
:returns: a result set instance which can be used to consume rows.
15161533
"""
1517-
kwargs = copy.deepcopy(batch["read"])
1518-
keyset_dict = kwargs.pop("keyset")
1519-
kwargs["keyset"] = KeySet._from_dict(keyset_dict)
1520-
return self._get_snapshot().read(
1521-
partition=batch["partition"], **kwargs, retry=retry, timeout=timeout
1522-
)
1534+
observability_options = self.observability_options or {}
1535+
with trace_call(
1536+
f"CloudSpanner.{type(self).__name__}.process_read_batch",
1537+
observability_options=observability_options,
1538+
):
1539+
kwargs = copy.deepcopy(batch["read"])
1540+
keyset_dict = kwargs.pop("keyset")
1541+
kwargs["keyset"] = KeySet._from_dict(keyset_dict)
1542+
return self._get_snapshot().read(
1543+
partition=batch["partition"], **kwargs, retry=retry, timeout=timeout
1544+
)
15231545

15241546
def generate_query_batches(
15251547
self,
@@ -1594,34 +1616,39 @@ def generate_query_batches(
15941616
mappings of information used perform actual partitioned reads via
15951617
:meth:`process_read_batch`.
15961618
"""
1597-
partitions = self._get_snapshot().partition_query(
1598-
sql=sql,
1599-
params=params,
1600-
param_types=param_types,
1601-
partition_size_bytes=partition_size_bytes,
1602-
max_partitions=max_partitions,
1603-
retry=retry,
1604-
timeout=timeout,
1605-
)
1619+
with trace_call(
1620+
f"CloudSpanner.{type(self).__name__}.generate_query_batches",
1621+
extra_attributes=dict(sql=sql),
1622+
observability_options=self.observability_options,
1623+
):
1624+
partitions = self._get_snapshot().partition_query(
1625+
sql=sql,
1626+
params=params,
1627+
param_types=param_types,
1628+
partition_size_bytes=partition_size_bytes,
1629+
max_partitions=max_partitions,
1630+
retry=retry,
1631+
timeout=timeout,
1632+
)
16061633

1607-
query_info = {
1608-
"sql": sql,
1609-
"data_boost_enabled": data_boost_enabled,
1610-
"directed_read_options": directed_read_options,
1611-
}
1612-
if params:
1613-
query_info["params"] = params
1614-
query_info["param_types"] = param_types
1615-
1616-
# Query-level options have higher precedence than client-level and
1617-
# environment-level options
1618-
default_query_options = self._database._instance._client._query_options
1619-
query_info["query_options"] = _merge_query_options(
1620-
default_query_options, query_options
1621-
)
1634+
query_info = {
1635+
"sql": sql,
1636+
"data_boost_enabled": data_boost_enabled,
1637+
"directed_read_options": directed_read_options,
1638+
}
1639+
if params:
1640+
query_info["params"] = params
1641+
query_info["param_types"] = param_types
1642+
1643+
# Query-level options have higher precedence than client-level and
1644+
# environment-level options
1645+
default_query_options = self._database._instance._client._query_options
1646+
query_info["query_options"] = _merge_query_options(
1647+
default_query_options, query_options
1648+
)
16221649

1623-
for partition in partitions:
1624-
yield {"partition": partition, "query": query_info}
1650+
for partition in partitions:
1651+
yield {"partition": partition, "query": query_info}
16251652

16261653
def process_query_batch(
16271654
self,
@@ -1646,9 +1673,16 @@ def process_query_batch(
16461673
:rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet`
16471674
:returns: a result set instance which can be used to consume rows.
16481675
"""
1649-
return self._get_snapshot().execute_sql(
1650-
partition=batch["partition"], **batch["query"], retry=retry, timeout=timeout
1651-
)
1676+
with trace_call(
1677+
f"CloudSpanner.{type(self).__name__}.process_query_batch",
1678+
observability_options=self.observability_options,
1679+
):
1680+
return self._get_snapshot().execute_sql(
1681+
partition=batch["partition"],
1682+
**batch["query"],
1683+
retry=retry,
1684+
timeout=timeout,
1685+
)
16521686

16531687
def run_partitioned_query(
16541688
self,
@@ -1703,18 +1737,23 @@ def run_partitioned_query(
17031737
:rtype: :class:`~google.cloud.spanner_v1.merged_result_set.MergedResultSet`
17041738
:returns: a result set instance which can be used to consume rows.
17051739
"""
1706-
partitions = list(
1707-
self.generate_query_batches(
1708-
sql,
1709-
params,
1710-
param_types,
1711-
partition_size_bytes,
1712-
max_partitions,
1713-
query_options,
1714-
data_boost_enabled,
1740+
with trace_call(
1741+
f"CloudSpanner.{type(self).__name__}.run_partitioned_query",
1742+
extra_attributes=dict(sql=sql),
1743+
observability_options=self.observability_options,
1744+
):
1745+
partitions = list(
1746+
self.generate_query_batches(
1747+
sql,
1748+
params,
1749+
param_types,
1750+
partition_size_bytes,
1751+
max_partitions,
1752+
query_options,
1753+
data_boost_enabled,
1754+
)
17151755
)
1716-
)
1717-
return MergedResultSet(self, partitions, 0)
1756+
return MergedResultSet(self, partitions, 0)
17181757

17191758
def process(self, batch):
17201759
"""Process a single, partitioned query or read.

google/cloud/spanner_v1/merged_result_set.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,16 @@ def __init__(self, batch_snapshot, partition_id, merged_result_set):
3737
self._queue: Queue[PartitionExecutorResult] = merged_result_set._queue
3838

3939
def run(self):
40+
observability_options = getattr(
41+
self._batch_snapshot, "observability_options", {}
42+
)
43+
with trace_call(
44+
"CloudSpanner.PartitionExecutor.run",
45+
observability_options=observability_options,
46+
):
47+
return self.__run()
48+
49+
def __run(self):
4050
results = None
4151
try:
4252
results = self._batch_snapshot.process_query_batch(self._partition_id)

google/cloud/spanner_v1/pool.py

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -523,12 +523,11 @@ def bind(self, database):
523523
metadata.append(
524524
_metadata_with_leader_aware_routing(database._route_to_leader_enabled)
525525
)
526-
created_session_count = 0
527526
self._database_role = self._database_role or self._database.database_role
528527

529528
request = BatchCreateSessionsRequest(
530529
database=database.name,
531-
session_count=self.size - created_session_count,
530+
session_count=self.size,
532531
session_template=Session(creator_role=self.database_role),
533532
)
534533

@@ -549,38 +548,28 @@ def bind(self, database):
549548
span_event_attributes,
550549
)
551550

552-
if created_session_count >= self.size:
553-
add_span_event(
554-
current_span,
555-
"Created no new sessions as sessionPool is full",
556-
span_event_attributes,
557-
)
558-
return
559-
560-
add_span_event(
561-
current_span,
562-
f"Creating {request.session_count} sessions",
563-
span_event_attributes,
564-
)
565-
566551
observability_options = getattr(self._database, "observability_options", None)
567552
with trace_call(
568553
"CloudSpanner.PingingPool.BatchCreateSessions",
569554
observability_options=observability_options,
570555
) as span:
571556
returned_session_count = 0
572-
while created_session_count < self.size:
557+
while returned_session_count < self.size:
573558
resp = api.batch_create_sessions(
574559
request=request,
575560
metadata=metadata,
576561
)
562+
563+
add_span_event(
564+
span,
565+
f"Created {len(resp.session)} sessions",
566+
)
567+
577568
for session_pb in resp.session:
578569
session = self._new_session()
570+
returned_session_count += 1
579571
session._session_id = session_pb.name.split("/")[-1]
580572
self.put(session)
581-
returned_session_count += 1
582-
583-
created_session_count += len(resp.session)
584573

585574
add_span_event(
586575
span,

google/cloud/spanner_v1/session.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,7 @@ def run_in_transaction(self, func, *args, **kw):
470470
) as span:
471471
while True:
472472
if self._transaction is None:
473+
add_span_event(span, "Creating Transaction")
473474
txn = self.transaction()
474475
txn.transaction_tag = transaction_tag
475476
txn.exclude_txn_from_change_streams = (

google/cloud/spanner_v1/snapshot.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -675,10 +675,13 @@ def partition_read(
675675
)
676676

677677
trace_attributes = {"table_id": table, "columns": columns}
678+
can_include_index = (index != "") and (index is not None)
679+
if can_include_index:
680+
trace_attributes["index"] = index
681+
678682
with trace_call(
679683
f"CloudSpanner.{type(self).__name__}.partition_read",
680-
self._session,
681-
trace_attributes,
684+
extra_attributes=trace_attributes,
682685
observability_options=getattr(database, "observability_options", None),
683686
):
684687
method = functools.partial(
@@ -779,7 +782,7 @@ def partition_query(
779782

780783
trace_attributes = {"db.statement": sql}
781784
with trace_call(
782-
"CloudSpanner.PartitionReadWriteTransaction",
785+
f"CloudSpanner.{type(self).__name__}.partition_query",
783786
self._session,
784787
trace_attributes,
785788
observability_options=getattr(database, "observability_options", None),

tests/_helpers.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ def assertSpanAttributes(
8686
):
8787
if HAS_OPENTELEMETRY_INSTALLED:
8888
if not span:
89-
span_list = self.ot_exporter.get_finished_spans()
89+
span_list = self.get_finished_spans()
9090
self.assertEqual(len(span_list) > 0, True)
9191
span = span_list[0]
9292

tests/system/test_observability_options.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
not HAS_OTEL_INSTALLED, reason="OpenTelemetry is necessary to test traces."
3838
)
3939
@pytest.mark.skipif(
40-
not _helpers.USE_EMULATOR, reason="mulator is necessary to test traces."
40+
not _helpers.USE_EMULATOR, reason="Emulator is necessary to test traces."
4141
)
4242
def test_observability_options_propagation():
4343
PROJECT = _helpers.EMULATOR_PROJECT
@@ -108,6 +108,7 @@ def test_propagation(enable_extended_tracing):
108108
wantNames = [
109109
"CloudSpanner.CreateSession",
110110
"CloudSpanner.Snapshot.execute_streaming_sql",
111+
"CloudSpanner.Database.snapshot",
111112
]
112113
assert gotNames == wantNames
113114

0 commit comments

Comments
 (0)