@@ -699,7 +699,8 @@ def execute_partitioned_dml(
699
699
)
700
700
701
701
def execute_pdml ():
702
- with SessionCheckout (self ._pool ) as session :
702
+ def do_execute_pdml (session , span ):
703
+ add_span_event (span , "Starting BeginTransaction" )
703
704
txn = api .begin_transaction (
704
705
session = session .name , options = txn_options , metadata = metadata
705
706
)
@@ -732,6 +733,13 @@ def execute_pdml():
732
733
733
734
return result_set .stats .row_count_lower_bound
734
735
736
+ with trace_call (
737
+ "CloudSpanner.Database.execute_partitioned_pdml" ,
738
+ observability_options = self .observability_options ,
739
+ ) as span :
740
+ with SessionCheckout (self ._pool ) as session :
741
+ return do_execute_pdml (session , span )
742
+
735
743
return _retry_on_aborted (execute_pdml , DEFAULT_RETRY_BACKOFF )()
736
744
737
745
def session (self , labels = None , database_role = None ):
@@ -1349,6 +1357,10 @@ def to_dict(self):
1349
1357
"transaction_id" : snapshot ._transaction_id ,
1350
1358
}
1351
1359
1360
+ @property
1361
+ def observability_options (self ):
1362
+ return getattr (self ._database , "observability_options" , {})
1363
+
1352
1364
def _get_session (self ):
1353
1365
"""Create session as needed.
1354
1366
@@ -1468,27 +1480,32 @@ def generate_read_batches(
1468
1480
mappings of information used perform actual partitioned reads via
1469
1481
:meth:`process_read_batch`.
1470
1482
"""
1471
- partitions = self ._get_snapshot ().partition_read (
1472
- table = table ,
1473
- columns = columns ,
1474
- keyset = keyset ,
1475
- index = index ,
1476
- partition_size_bytes = partition_size_bytes ,
1477
- max_partitions = max_partitions ,
1478
- retry = retry ,
1479
- timeout = timeout ,
1480
- )
1483
+ with trace_call (
1484
+ f"CloudSpanner.{ type (self ).__name__ } .generate_read_batches" ,
1485
+ extra_attributes = dict (table = table , columns = columns ),
1486
+ observability_options = self .observability_options ,
1487
+ ):
1488
+ partitions = self ._get_snapshot ().partition_read (
1489
+ table = table ,
1490
+ columns = columns ,
1491
+ keyset = keyset ,
1492
+ index = index ,
1493
+ partition_size_bytes = partition_size_bytes ,
1494
+ max_partitions = max_partitions ,
1495
+ retry = retry ,
1496
+ timeout = timeout ,
1497
+ )
1481
1498
1482
- read_info = {
1483
- "table" : table ,
1484
- "columns" : columns ,
1485
- "keyset" : keyset ._to_dict (),
1486
- "index" : index ,
1487
- "data_boost_enabled" : data_boost_enabled ,
1488
- "directed_read_options" : directed_read_options ,
1489
- }
1490
- for partition in partitions :
1491
- yield {"partition" : partition , "read" : read_info .copy ()}
1499
+ read_info = {
1500
+ "table" : table ,
1501
+ "columns" : columns ,
1502
+ "keyset" : keyset ._to_dict (),
1503
+ "index" : index ,
1504
+ "data_boost_enabled" : data_boost_enabled ,
1505
+ "directed_read_options" : directed_read_options ,
1506
+ }
1507
+ for partition in partitions :
1508
+ yield {"partition" : partition , "read" : read_info .copy ()}
1492
1509
1493
1510
def process_read_batch (
1494
1511
self ,
@@ -1514,12 +1531,17 @@ def process_read_batch(
1514
1531
:rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet`
1515
1532
:returns: a result set instance which can be used to consume rows.
1516
1533
"""
1517
- kwargs = copy .deepcopy (batch ["read" ])
1518
- keyset_dict = kwargs .pop ("keyset" )
1519
- kwargs ["keyset" ] = KeySet ._from_dict (keyset_dict )
1520
- return self ._get_snapshot ().read (
1521
- partition = batch ["partition" ], ** kwargs , retry = retry , timeout = timeout
1522
- )
1534
+ observability_options = self .observability_options or {}
1535
+ with trace_call (
1536
+ f"CloudSpanner.{ type (self ).__name__ } .process_read_batch" ,
1537
+ observability_options = observability_options ,
1538
+ ):
1539
+ kwargs = copy .deepcopy (batch ["read" ])
1540
+ keyset_dict = kwargs .pop ("keyset" )
1541
+ kwargs ["keyset" ] = KeySet ._from_dict (keyset_dict )
1542
+ return self ._get_snapshot ().read (
1543
+ partition = batch ["partition" ], ** kwargs , retry = retry , timeout = timeout
1544
+ )
1523
1545
1524
1546
def generate_query_batches (
1525
1547
self ,
@@ -1594,34 +1616,39 @@ def generate_query_batches(
1594
1616
mappings of information used perform actual partitioned reads via
1595
1617
:meth:`process_read_batch`.
1596
1618
"""
1597
- partitions = self ._get_snapshot ().partition_query (
1598
- sql = sql ,
1599
- params = params ,
1600
- param_types = param_types ,
1601
- partition_size_bytes = partition_size_bytes ,
1602
- max_partitions = max_partitions ,
1603
- retry = retry ,
1604
- timeout = timeout ,
1605
- )
1619
+ with trace_call (
1620
+ f"CloudSpanner.{ type (self ).__name__ } .generate_query_batches" ,
1621
+ extra_attributes = dict (sql = sql ),
1622
+ observability_options = self .observability_options ,
1623
+ ):
1624
+ partitions = self ._get_snapshot ().partition_query (
1625
+ sql = sql ,
1626
+ params = params ,
1627
+ param_types = param_types ,
1628
+ partition_size_bytes = partition_size_bytes ,
1629
+ max_partitions = max_partitions ,
1630
+ retry = retry ,
1631
+ timeout = timeout ,
1632
+ )
1606
1633
1607
- query_info = {
1608
- "sql" : sql ,
1609
- "data_boost_enabled" : data_boost_enabled ,
1610
- "directed_read_options" : directed_read_options ,
1611
- }
1612
- if params :
1613
- query_info ["params" ] = params
1614
- query_info ["param_types" ] = param_types
1615
-
1616
- # Query-level options have higher precedence than client-level and
1617
- # environment-level options
1618
- default_query_options = self ._database ._instance ._client ._query_options
1619
- query_info ["query_options" ] = _merge_query_options (
1620
- default_query_options , query_options
1621
- )
1634
+ query_info = {
1635
+ "sql" : sql ,
1636
+ "data_boost_enabled" : data_boost_enabled ,
1637
+ "directed_read_options" : directed_read_options ,
1638
+ }
1639
+ if params :
1640
+ query_info ["params" ] = params
1641
+ query_info ["param_types" ] = param_types
1642
+
1643
+ # Query-level options have higher precedence than client-level and
1644
+ # environment-level options
1645
+ default_query_options = self ._database ._instance ._client ._query_options
1646
+ query_info ["query_options" ] = _merge_query_options (
1647
+ default_query_options , query_options
1648
+ )
1622
1649
1623
- for partition in partitions :
1624
- yield {"partition" : partition , "query" : query_info }
1650
+ for partition in partitions :
1651
+ yield {"partition" : partition , "query" : query_info }
1625
1652
1626
1653
def process_query_batch (
1627
1654
self ,
@@ -1646,9 +1673,16 @@ def process_query_batch(
1646
1673
:rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet`
1647
1674
:returns: a result set instance which can be used to consume rows.
1648
1675
"""
1649
- return self ._get_snapshot ().execute_sql (
1650
- partition = batch ["partition" ], ** batch ["query" ], retry = retry , timeout = timeout
1651
- )
1676
+ with trace_call (
1677
+ f"CloudSpanner.{ type (self ).__name__ } .process_query_batch" ,
1678
+ observability_options = self .observability_options ,
1679
+ ):
1680
+ return self ._get_snapshot ().execute_sql (
1681
+ partition = batch ["partition" ],
1682
+ ** batch ["query" ],
1683
+ retry = retry ,
1684
+ timeout = timeout ,
1685
+ )
1652
1686
1653
1687
def run_partitioned_query (
1654
1688
self ,
@@ -1703,18 +1737,23 @@ def run_partitioned_query(
1703
1737
:rtype: :class:`~google.cloud.spanner_v1.merged_result_set.MergedResultSet`
1704
1738
:returns: a result set instance which can be used to consume rows.
1705
1739
"""
1706
- partitions = list (
1707
- self .generate_query_batches (
1708
- sql ,
1709
- params ,
1710
- param_types ,
1711
- partition_size_bytes ,
1712
- max_partitions ,
1713
- query_options ,
1714
- data_boost_enabled ,
1740
+ with trace_call (
1741
+ f"CloudSpanner.{ type (self ).__name__ } .run_partitioned_query" ,
1742
+ extra_attributes = dict (sql = sql ),
1743
+ observability_options = self .observability_options ,
1744
+ ):
1745
+ partitions = list (
1746
+ self .generate_query_batches (
1747
+ sql ,
1748
+ params ,
1749
+ param_types ,
1750
+ partition_size_bytes ,
1751
+ max_partitions ,
1752
+ query_options ,
1753
+ data_boost_enabled ,
1754
+ )
1715
1755
)
1716
- )
1717
- return MergedResultSet (self , partitions , 0 )
1756
+ return MergedResultSet (self , partitions , 0 )
1718
1757
1719
1758
def process (self , batch ):
1720
1759
"""Process a single, partitioned query or read.
0 commit comments