Skip to content

Commit 048e4cc

Browse files
committed
observability: PDML + some batch write spans
This change adds spans for Partitioned DML and for Batch update operations. Carved out from PR googleapis#1241.
1 parent 0887eb4 commit 048e4cc

File tree

13 files changed

+260
-123
lines changed

13 files changed

+260
-123
lines changed

google/cloud/spanner_v1/batch.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,7 @@ def batch_write(self, request_options=None, exclude_txn_from_change_streams=Fals
344344
)
345345
observability_options = getattr(database, "observability_options", None)
346346
with trace_call(
347-
"CloudSpanner.BatchWrite",
347+
"CloudSpanner.batch_write",
348348
self._session,
349349
trace_attributes,
350350
observability_options=observability_options,

google/cloud/spanner_v1/database.py

Lines changed: 106 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -699,7 +699,8 @@ def execute_partitioned_dml(
699699
)
700700

701701
def execute_pdml():
702-
with SessionCheckout(self._pool) as session:
702+
def do_execute_pdml(session, span):
703+
add_span_event(span, "Starting BeginTransaction")
703704
txn = api.begin_transaction(
704705
session=session.name, options=txn_options, metadata=metadata
705706
)
@@ -732,6 +733,13 @@ def execute_pdml():
732733

733734
return result_set.stats.row_count_lower_bound
734735

736+
with trace_call(
737+
"CloudSpanner.Database.execute_partitioned_pdml",
738+
observability_options=self.observability_options,
739+
) as span:
740+
with SessionCheckout(self._pool) as session:
741+
return do_execute_pdml(session, span)
742+
735743
return _retry_on_aborted(execute_pdml, DEFAULT_RETRY_BACKOFF)()
736744

737745
def session(self, labels=None, database_role=None):
@@ -1357,6 +1365,10 @@ def to_dict(self):
13571365
"transaction_id": snapshot._transaction_id,
13581366
}
13591367

1368+
@property
1369+
def observability_options(self):
1370+
return getattr(self._database, "observability_options", {})
1371+
13601372
def _get_session(self):
13611373
"""Create session as needed.
13621374
@@ -1476,27 +1488,32 @@ def generate_read_batches(
14761488
mappings of information used perform actual partitioned reads via
14771489
:meth:`process_read_batch`.
14781490
"""
1479-
partitions = self._get_snapshot().partition_read(
1480-
table=table,
1481-
columns=columns,
1482-
keyset=keyset,
1483-
index=index,
1484-
partition_size_bytes=partition_size_bytes,
1485-
max_partitions=max_partitions,
1486-
retry=retry,
1487-
timeout=timeout,
1488-
)
1491+
with trace_call(
1492+
f"CloudSpanner.{type(self).__name__}.generate_read_batches",
1493+
extra_attributes=dict(table=table, columns=columns),
1494+
observability_options=self.observability_options,
1495+
):
1496+
partitions = self._get_snapshot().partition_read(
1497+
table=table,
1498+
columns=columns,
1499+
keyset=keyset,
1500+
index=index,
1501+
partition_size_bytes=partition_size_bytes,
1502+
max_partitions=max_partitions,
1503+
retry=retry,
1504+
timeout=timeout,
1505+
)
14891506

1490-
read_info = {
1491-
"table": table,
1492-
"columns": columns,
1493-
"keyset": keyset._to_dict(),
1494-
"index": index,
1495-
"data_boost_enabled": data_boost_enabled,
1496-
"directed_read_options": directed_read_options,
1497-
}
1498-
for partition in partitions:
1499-
yield {"partition": partition, "read": read_info.copy()}
1507+
read_info = {
1508+
"table": table,
1509+
"columns": columns,
1510+
"keyset": keyset._to_dict(),
1511+
"index": index,
1512+
"data_boost_enabled": data_boost_enabled,
1513+
"directed_read_options": directed_read_options,
1514+
}
1515+
for partition in partitions:
1516+
yield {"partition": partition, "read": read_info.copy()}
15001517

15011518
def process_read_batch(
15021519
self,
@@ -1522,12 +1539,17 @@ def process_read_batch(
15221539
:rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet`
15231540
:returns: a result set instance which can be used to consume rows.
15241541
"""
1525-
kwargs = copy.deepcopy(batch["read"])
1526-
keyset_dict = kwargs.pop("keyset")
1527-
kwargs["keyset"] = KeySet._from_dict(keyset_dict)
1528-
return self._get_snapshot().read(
1529-
partition=batch["partition"], **kwargs, retry=retry, timeout=timeout
1530-
)
1542+
observability_options = self.observability_options or {}
1543+
with trace_call(
1544+
f"CloudSpanner.{type(self).__name__}.process_read_batch",
1545+
observability_options=observability_options,
1546+
):
1547+
kwargs = copy.deepcopy(batch["read"])
1548+
keyset_dict = kwargs.pop("keyset")
1549+
kwargs["keyset"] = KeySet._from_dict(keyset_dict)
1550+
return self._get_snapshot().read(
1551+
partition=batch["partition"], **kwargs, retry=retry, timeout=timeout
1552+
)
15311553

15321554
def generate_query_batches(
15331555
self,
@@ -1602,34 +1624,39 @@ def generate_query_batches(
16021624
mappings of information used perform actual partitioned reads via
16031625
:meth:`process_read_batch`.
16041626
"""
1605-
partitions = self._get_snapshot().partition_query(
1606-
sql=sql,
1607-
params=params,
1608-
param_types=param_types,
1609-
partition_size_bytes=partition_size_bytes,
1610-
max_partitions=max_partitions,
1611-
retry=retry,
1612-
timeout=timeout,
1613-
)
1627+
with trace_call(
1628+
f"CloudSpanner.{type(self).__name__}.generate_query_batches",
1629+
extra_attributes=dict(sql=sql),
1630+
observability_options=self.observability_options,
1631+
):
1632+
partitions = self._get_snapshot().partition_query(
1633+
sql=sql,
1634+
params=params,
1635+
param_types=param_types,
1636+
partition_size_bytes=partition_size_bytes,
1637+
max_partitions=max_partitions,
1638+
retry=retry,
1639+
timeout=timeout,
1640+
)
16141641

1615-
query_info = {
1616-
"sql": sql,
1617-
"data_boost_enabled": data_boost_enabled,
1618-
"directed_read_options": directed_read_options,
1619-
}
1620-
if params:
1621-
query_info["params"] = params
1622-
query_info["param_types"] = param_types
1623-
1624-
# Query-level options have higher precedence than client-level and
1625-
# environment-level options
1626-
default_query_options = self._database._instance._client._query_options
1627-
query_info["query_options"] = _merge_query_options(
1628-
default_query_options, query_options
1629-
)
1642+
query_info = {
1643+
"sql": sql,
1644+
"data_boost_enabled": data_boost_enabled,
1645+
"directed_read_options": directed_read_options,
1646+
}
1647+
if params:
1648+
query_info["params"] = params
1649+
query_info["param_types"] = param_types
1650+
1651+
# Query-level options have higher precedence than client-level and
1652+
# environment-level options
1653+
default_query_options = self._database._instance._client._query_options
1654+
query_info["query_options"] = _merge_query_options(
1655+
default_query_options, query_options
1656+
)
16301657

1631-
for partition in partitions:
1632-
yield {"partition": partition, "query": query_info}
1658+
for partition in partitions:
1659+
yield {"partition": partition, "query": query_info}
16331660

16341661
def process_query_batch(
16351662
self,
@@ -1654,9 +1681,16 @@ def process_query_batch(
16541681
:rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet`
16551682
:returns: a result set instance which can be used to consume rows.
16561683
"""
1657-
return self._get_snapshot().execute_sql(
1658-
partition=batch["partition"], **batch["query"], retry=retry, timeout=timeout
1659-
)
1684+
with trace_call(
1685+
f"CloudSpanner.{type(self).__name__}.process_query_batch",
1686+
observability_options=self.observability_options,
1687+
):
1688+
return self._get_snapshot().execute_sql(
1689+
partition=batch["partition"],
1690+
**batch["query"],
1691+
retry=retry,
1692+
timeout=timeout,
1693+
)
16601694

16611695
def run_partitioned_query(
16621696
self,
@@ -1711,18 +1745,23 @@ def run_partitioned_query(
17111745
:rtype: :class:`~google.cloud.spanner_v1.merged_result_set.MergedResultSet`
17121746
:returns: a result set instance which can be used to consume rows.
17131747
"""
1714-
partitions = list(
1715-
self.generate_query_batches(
1716-
sql,
1717-
params,
1718-
param_types,
1719-
partition_size_bytes,
1720-
max_partitions,
1721-
query_options,
1722-
data_boost_enabled,
1748+
with trace_call(
1749+
f"CloudSpanner.{type(self).__name__}.run_partitioned_query",
1750+
extra_attributes=dict(sql=sql),
1751+
observability_options=self.observability_options,
1752+
):
1753+
partitions = list(
1754+
self.generate_query_batches(
1755+
sql,
1756+
params,
1757+
param_types,
1758+
partition_size_bytes,
1759+
max_partitions,
1760+
query_options,
1761+
data_boost_enabled,
1762+
)
17231763
)
1724-
)
1725-
return MergedResultSet(self, partitions, 0)
1764+
return MergedResultSet(self, partitions, 0)
17261765

17271766
def process(self, batch):
17281767
"""Process a single, partitioned query or read.

google/cloud/spanner_v1/merged_result_set.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,16 @@ def __init__(self, batch_snapshot, partition_id, merged_result_set):
3737
self._queue: Queue[PartitionExecutorResult] = merged_result_set._queue
3838

3939
def run(self):
40+
observability_options = getattr(
41+
self._batch_snapshot, "observability_options", {}
42+
)
43+
with trace_call(
44+
"CloudSpanner.PartitionExecutor.run",
45+
observability_options=observability_options,
46+
):
47+
return self.__run()
48+
49+
def __run(self):
4050
results = None
4151
try:
4252
results = self._batch_snapshot.process_query_batch(self._partition_id)

google/cloud/spanner_v1/pool.py

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -523,12 +523,11 @@ def bind(self, database):
523523
metadata.append(
524524
_metadata_with_leader_aware_routing(database._route_to_leader_enabled)
525525
)
526-
created_session_count = 0
527526
self._database_role = self._database_role or self._database.database_role
528527

529528
request = BatchCreateSessionsRequest(
530529
database=database.name,
531-
session_count=self.size - created_session_count,
530+
session_count=self.size,
532531
session_template=Session(creator_role=self.database_role),
533532
)
534533

@@ -549,38 +548,28 @@ def bind(self, database):
549548
span_event_attributes,
550549
)
551550

552-
if created_session_count >= self.size:
553-
add_span_event(
554-
current_span,
555-
"Created no new sessions as sessionPool is full",
556-
span_event_attributes,
557-
)
558-
return
559-
560-
add_span_event(
561-
current_span,
562-
f"Creating {request.session_count} sessions",
563-
span_event_attributes,
564-
)
565-
566551
observability_options = getattr(self._database, "observability_options", None)
567552
with trace_call(
568553
"CloudSpanner.PingingPool.BatchCreateSessions",
569554
observability_options=observability_options,
570555
) as span:
571556
returned_session_count = 0
572-
while created_session_count < self.size:
557+
while returned_session_count < self.size:
573558
resp = api.batch_create_sessions(
574559
request=request,
575560
metadata=metadata,
576561
)
562+
563+
add_span_event(
564+
span,
565+
f"Created {len(resp.session)} sessions",
566+
)
567+
577568
for session_pb in resp.session:
578569
session = self._new_session()
570+
returned_session_count += 1
579571
session._session_id = session_pb.name.split("/")[-1]
580572
self.put(session)
581-
returned_session_count += 1
582-
583-
created_session_count += len(resp.session)
584573

585574
add_span_event(
586575
span,

google/cloud/spanner_v1/session.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,7 @@ def run_in_transaction(self, func, *args, **kw):
470470
) as span:
471471
while True:
472472
if self._transaction is None:
473+
add_span_event(span, "Creating Transaction")
473474
txn = self.transaction()
474475
txn.transaction_tag = transaction_tag
475476
txn.exclude_txn_from_change_streams = (

google/cloud/spanner_v1/snapshot.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -680,10 +680,13 @@ def partition_read(
680680
)
681681

682682
trace_attributes = {"table_id": table, "columns": columns}
683+
can_include_index = (index != "") and (index is not None)
684+
if can_include_index:
685+
trace_attributes["index"] = index
686+
683687
with trace_call(
684688
f"CloudSpanner.{type(self).__name__}.partition_read",
685-
self._session,
686-
trace_attributes,
689+
extra_attributes=trace_attributes,
687690
observability_options=getattr(database, "observability_options", None),
688691
):
689692
method = functools.partial(
@@ -784,7 +787,7 @@ def partition_query(
784787

785788
trace_attributes = {"db.statement": sql}
786789
with trace_call(
787-
"CloudSpanner.PartitionReadWriteTransaction",
790+
f"CloudSpanner.{type(self).__name__}.partition_query",
788791
self._session,
789792
trace_attributes,
790793
observability_options=getattr(database, "observability_options", None),

tests/_helpers.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ def assertSpanAttributes(
8686
):
8787
if HAS_OPENTELEMETRY_INSTALLED:
8888
if not span:
89-
span_list = self.ot_exporter.get_finished_spans()
89+
span_list = self.get_finished_spans()
9090
self.assertEqual(len(span_list) > 0, True)
9191
span = span_list[0]
9292

tests/system/test_observability_options.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
not HAS_OTEL_INSTALLED, reason="OpenTelemetry is necessary to test traces."
3838
)
3939
@pytest.mark.skipif(
40-
not _helpers.USE_EMULATOR, reason="mulator is necessary to test traces."
40+
not _helpers.USE_EMULATOR, reason="Emulator is necessary to test traces."
4141
)
4242
def test_observability_options_propagation():
4343
PROJECT = _helpers.EMULATOR_PROJECT
@@ -108,6 +108,7 @@ def test_propagation(enable_extended_tracing):
108108
wantNames = [
109109
"CloudSpanner.CreateSession",
110110
"CloudSpanner.Snapshot.execute_streaming_sql",
111+
"CloudSpanner.Database.snapshot",
111112
]
112113
assert gotNames == wantNames
113114

0 commit comments

Comments
 (0)