11
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
12
# See the License for the specific language governing permissions and
13
13
# limitations under the License.
14
+
14
15
"""Spanner read-write transaction support."""
15
16
import functools
16
17
import threading
@@ -90,20 +91,18 @@ def _make_txn_selector(self):
90
91
self ._check_state ()
91
92
92
93
if self ._transaction_id is None :
93
- return TransactionSelector (begin = TransactionOptions (
94
- read_write = TransactionOptions .ReadWrite (),
95
- exclude_txn_from_change_streams = self .
96
- exclude_txn_from_change_streams ,
97
- ))
94
+ return TransactionSelector (
95
+ begin = TransactionOptions (
96
+ read_write = TransactionOptions .ReadWrite (),
97
+ exclude_txn_from_change_streams = self .exclude_txn_from_change_streams ,
98
+ )
99
+ )
98
100
else :
99
101
return TransactionSelector (id = self ._transaction_id )
100
102
101
- def _execute_request (self ,
102
- method ,
103
- request ,
104
- trace_name = None ,
105
- session = None ,
106
- attributes = None ):
103
+ def _execute_request (
104
+ self , method , request , trace_name = None , session = None , attributes = None
105
+ ):
107
106
"""Helper method to execute request after fetching transaction selector.
108
107
109
108
:type method: callable
@@ -118,9 +117,7 @@ def _execute_request(self,
118
117
method = functools .partial (method , request = request )
119
118
response = _retry (
120
119
method ,
121
- allowed_exceptions = {
122
- InternalServerError : _check_rst_stream_error
123
- },
120
+ allowed_exceptions = {InternalServerError : _check_rst_stream_error },
124
121
)
125
122
126
123
return response
@@ -147,12 +144,11 @@ def begin(self):
147
144
metadata = _metadata_with_prefix (database .name )
148
145
if database ._route_to_leader_enabled :
149
146
metadata .append (
150
- _metadata_with_leader_aware_routing (
151
- database . _route_to_leader_enabled ) )
147
+ _metadata_with_leader_aware_routing (database . _route_to_leader_enabled )
148
+ )
152
149
txn_options = TransactionOptions (
153
150
read_write = TransactionOptions .ReadWrite (),
154
- exclude_txn_from_change_streams = self .
155
- exclude_txn_from_change_streams ,
151
+ exclude_txn_from_change_streams = self .exclude_txn_from_change_streams ,
156
152
)
157
153
with trace_call ("CloudSpanner.BeginTransaction" , self ._session ):
158
154
method = functools .partial (
@@ -163,9 +159,7 @@ def begin(self):
163
159
)
164
160
response = _retry (
165
161
method ,
166
- allowed_exceptions = {
167
- InternalServerError : _check_rst_stream_error
168
- },
162
+ allowed_exceptions = {InternalServerError : _check_rst_stream_error },
169
163
)
170
164
self ._transaction_id = response .id
171
165
return self ._transaction_id
@@ -181,7 +175,9 @@ def rollback(self):
181
175
if database ._route_to_leader_enabled :
182
176
metadata .append (
183
177
_metadata_with_leader_aware_routing (
184
- database ._route_to_leader_enabled ))
178
+ database ._route_to_leader_enabled
179
+ )
180
+ )
185
181
with trace_call ("CloudSpanner.Rollback" , self ._session ):
186
182
method = functools .partial (
187
183
api .rollback ,
@@ -191,17 +187,14 @@ def rollback(self):
191
187
)
192
188
_retry (
193
189
method ,
194
- allowed_exceptions = {
195
- InternalServerError : _check_rst_stream_error
196
- },
190
+ allowed_exceptions = {InternalServerError : _check_rst_stream_error },
197
191
)
198
192
self .rolled_back = True
199
193
del self ._session ._transaction
200
194
201
- def commit (self ,
202
- return_commit_stats = False ,
203
- request_options = None ,
204
- max_commit_delay = None ):
195
+ def commit (
196
+ self , return_commit_stats = False , request_options = None , max_commit_delay = None
197
+ ):
205
198
"""Commit mutations to the database.
206
199
207
200
:type return_commit_stats: bool
@@ -236,8 +229,8 @@ def commit(self,
236
229
metadata = _metadata_with_prefix (database .name )
237
230
if database ._route_to_leader_enabled :
238
231
metadata .append (
239
- _metadata_with_leader_aware_routing (
240
- database . _route_to_leader_enabled ) )
232
+ _metadata_with_leader_aware_routing (database . _route_to_leader_enabled )
233
+ )
241
234
trace_attributes = {"num_mutations" : len (self ._mutations )}
242
235
243
236
if request_options is None :
@@ -258,18 +251,15 @@ def commit(self,
258
251
max_commit_delay = max_commit_delay ,
259
252
request_options = request_options ,
260
253
)
261
- with trace_call ("CloudSpanner.Commit" , self ._session ,
262
- trace_attributes ):
254
+ with trace_call ("CloudSpanner.Commit" , self ._session , trace_attributes ):
263
255
method = functools .partial (
264
256
api .commit ,
265
257
request = request ,
266
258
metadata = metadata ,
267
259
)
268
260
response = _retry (
269
261
method ,
270
- allowed_exceptions = {
271
- InternalServerError : _check_rst_stream_error
272
- },
262
+ allowed_exceptions = {InternalServerError : _check_rst_stream_error },
273
263
)
274
264
self .committed = response .commit_timestamp
275
265
if return_commit_stats :
@@ -298,10 +288,9 @@ def _make_params_pb(params, param_types):
298
288
If ``params`` is None but ``param_types`` is not None.
299
289
"""
300
290
if params is not None :
301
- return Struct (fields = {
302
- key : _make_value_pb (value )
303
- for key , value in params .items ()
304
- })
291
+ return Struct (
292
+ fields = {key : _make_value_pb (value ) for key , value in params .items ()}
293
+ )
305
294
306
295
return {}
307
296
@@ -363,8 +352,8 @@ def execute_update(
363
352
metadata = _metadata_with_prefix (database .name )
364
353
if database ._route_to_leader_enabled :
365
354
metadata .append (
366
- _metadata_with_leader_aware_routing (
367
- database . _route_to_leader_enabled ) )
355
+ _metadata_with_leader_aware_routing (database . _route_to_leader_enabled )
356
+ )
368
357
api = database .spanner_api
369
358
370
359
seqno , self ._execute_sql_count = (
@@ -375,8 +364,7 @@ def execute_update(
375
364
# Query-level options have higher precedence than client-level and
376
365
# environment-level options
377
366
default_query_options = database ._instance ._client ._query_options
378
- query_options = _merge_query_options (default_query_options ,
379
- query_options )
367
+ query_options = _merge_query_options (default_query_options , query_options )
380
368
381
369
if request_options is None :
382
370
request_options = RequestOptions ()
@@ -416,9 +404,12 @@ def execute_update(
416
404
trace_attributes ,
417
405
)
418
406
# Setting the transaction id because the transaction begin was inlined for first rpc.
419
- if (self ._transaction_id is None and response is not None
420
- and response .metadata is not None
421
- and response .metadata .transaction is not None ):
407
+ if (
408
+ self ._transaction_id is None
409
+ and response is not None
410
+ and response .metadata is not None
411
+ and response .metadata .transaction is not None
412
+ ):
422
413
self ._transaction_id = response .metadata .transaction .id
423
414
else :
424
415
response = self ._execute_request (
@@ -481,16 +472,17 @@ def batch_update(
481
472
dml , params , param_types = statement
482
473
params_pb = self ._make_params_pb (params , param_types )
483
474
parsed .append (
484
- ExecuteBatchDmlRequest .Statement (sql = dml ,
485
- params = params_pb ,
486
- param_types = param_types ))
475
+ ExecuteBatchDmlRequest .Statement (
476
+ sql = dml , params = params_pb , param_types = param_types
477
+ )
478
+ )
487
479
488
480
database = self ._session ._database
489
481
metadata = _metadata_with_prefix (database .name )
490
482
if database ._route_to_leader_enabled :
491
483
metadata .append (
492
- _metadata_with_leader_aware_routing (
493
- database . _route_to_leader_enabled ) )
484
+ _metadata_with_leader_aware_routing (database . _route_to_leader_enabled )
485
+ )
494
486
api = database .spanner_api
495
487
496
488
seqno , self ._execute_sql_count = (
@@ -535,9 +527,11 @@ def batch_update(
535
527
)
536
528
# Setting the transaction id because the transaction begin was inlined for first rpc.
537
529
for result_set in response .result_sets :
538
- if (self ._transaction_id is None
539
- and result_set .metadata is not None
540
- and result_set .metadata .transaction is not None ):
530
+ if (
531
+ self ._transaction_id is None
532
+ and result_set .metadata is not None
533
+ and result_set .metadata .transaction is not None
534
+ ):
541
535
self ._transaction_id = result_set .metadata .transaction .id
542
536
break
543
537
else :
@@ -550,8 +544,7 @@ def batch_update(
550
544
)
551
545
552
546
row_counts = [
553
- result_set .stats .row_count_exact
554
- for result_set in response .result_sets
547
+ result_set .stats .row_count_exact for result_set in response .result_sets
555
548
]
556
549
557
550
return response .status , row_counts
0 commit comments