16
16
17
17
import datetime
18
18
import queue
19
+ import time
19
20
20
21
from google .cloud .exceptions import NotFound
21
22
from google .cloud .spanner_v1 import BatchCreateSessionsRequest
24
25
_metadata_with_prefix ,
25
26
_metadata_with_leader_aware_routing ,
26
27
)
28
+ from google .cloud .spanner_v1 ._opentelemetry_tracing import (
29
+ get_current_span ,
30
+ )
27
31
from warnings import warn
28
32
29
33
_NOW = datetime .datetime .utcnow # unit tests may replace
@@ -199,13 +203,32 @@ def bind(self, database):
199
203
_metadata_with_leader_aware_routing (database ._route_to_leader_enabled )
200
204
)
201
205
self ._database_role = self ._database_role or self ._database .database_role
206
+ requested_session_count = self .size - self ._sessions .qsize ()
202
207
request = BatchCreateSessionsRequest (
203
208
database = database .name ,
204
- session_count = self . size - self . _sessions . qsize () ,
209
+ session_count = requested_session_count ,
205
210
session_template = Session (creator_role = self .database_role ),
206
211
)
207
212
213
+ current_span = get_current_span ()
214
+ if requested_session_count > 0 :
215
+ current_span .add_event (
216
+ f"Requesting { requested_session_count } sessions" ,
217
+ {"kind" : "fixed_size_pool" },
218
+ )
219
+
220
+ if self ._sessions .full ():
221
+ current_span .add_event (
222
+ "Session pool is already full" , {"kind" : "fixed_size_pool" }
223
+ )
224
+ return
225
+
226
+ returned_session_count = 0
208
227
while not self ._sessions .full ():
228
+ current_span .add_event (
229
+ f"Creating { request .session_count } sessions" ,
230
+ {"kind" : "fixed_size_pool" },
231
+ )
209
232
resp = api .batch_create_sessions (
210
233
request = request ,
211
234
metadata = metadata ,
@@ -214,6 +237,12 @@ def bind(self, database):
214
237
session = self ._new_session ()
215
238
session ._session_id = session_pb .name .split ("/" )[- 1 ]
216
239
self ._sessions .put (session )
240
+ returned_session_count += 1
241
+
242
+ current_span .add_event (
243
+ f"Requested for { requested_session_count } , returned { returned_session_count } " ,
244
+ {"kind" : "fixed_size_pool" },
245
+ )
217
246
218
247
def get (self , timeout = None ):
219
248
"""Check a session out from the pool.
@@ -229,12 +258,23 @@ def get(self, timeout=None):
229
258
if timeout is None :
230
259
timeout = self .default_timeout
231
260
261
+ start_time = time .time ()
262
+ current_span = get_current_span ()
263
+ current_span .add_event ("Acquiring session" , {"kind" : type (self ).__name__ })
232
264
session = self ._sessions .get (block = True , timeout = timeout )
233
265
234
266
if not session .exists ():
235
267
session = self ._database .session ()
236
268
session .create ()
237
269
270
+ current_span .add_event (
271
+ "Acquired session" ,
272
+ {
273
+ "time.elapsed" : time .time () - start_time ,
274
+ "session.id" : session .session_id ,
275
+ "kind" : type (self ).__name__ ,
276
+ },
277
+ )
238
278
return session
239
279
240
280
def put (self , session ):
@@ -307,6 +347,10 @@ def get(self):
307
347
:returns: an existing session from the pool, or a newly-created
308
348
session.
309
349
"""
350
+ start_time = time .time ()
351
+ current_span = get_current_span ()
352
+ current_span .add_event ("Acquiring session" , {"kind" : type (self ).__name__ })
353
+
310
354
try :
311
355
session = self ._sessions .get_nowait ()
312
356
except queue .Empty :
@@ -316,6 +360,15 @@ def get(self):
316
360
if not session .exists ():
317
361
session = self ._new_session ()
318
362
session .create ()
363
+ else :
364
+ current_span .add_event (
365
+ "Cache hit: has usable session" ,
366
+ {
367
+ "id" : session .session_id ,
368
+ "kind" : type (self ).__name__ ,
369
+ },
370
+ )
371
+
319
372
return session
320
373
321
374
def put (self , session ):
@@ -422,6 +475,18 @@ def bind(self, database):
422
475
session_template = Session (creator_role = self .database_role ),
423
476
)
424
477
478
+ requested_session_count = request .session_count
479
+ current_span = get_current_span ()
480
+ current_span .add_event (f"Requesting { requested_session_count } sessions" )
481
+
482
+ if created_session_count >= self .size :
483
+ current_span .add_event (
484
+ "Created no new sessions as sessionPool is full" ,
485
+ {"kind" : type (self ).__name__ },
486
+ )
487
+ return
488
+
489
+ returned_session_count = 0
425
490
while created_session_count < self .size :
426
491
resp = api .batch_create_sessions (
427
492
request = request ,
@@ -431,8 +496,17 @@ def bind(self, database):
431
496
session = self ._new_session ()
432
497
session ._session_id = session_pb .name .split ("/" )[- 1 ]
433
498
self .put (session )
499
+ returned_session_count += 1
500
+
434
501
created_session_count += len (resp .session )
435
502
503
+ current_span .add_event (
504
+ "Requested for {requested_session_count} sessions, return {returned_session_count}" ,
505
+ {
506
+ "kind" : "pinging_pool" ,
507
+ },
508
+ )
509
+
436
510
def get (self , timeout = None ):
437
511
"""Check a session out from the pool.
438
512
@@ -447,6 +521,12 @@ def get(self, timeout=None):
447
521
if timeout is None :
448
522
timeout = self .default_timeout
449
523
524
+ start_time = time .time ()
525
+ current_span = get_current_span ()
526
+ current_span .add_event (
527
+ "Waiting for a session to become available" , {"kind" : "pinging_pool" }
528
+ )
529
+
450
530
ping_after , session = self ._sessions .get (block = True , timeout = timeout )
451
531
452
532
if _NOW () > ping_after :
@@ -457,6 +537,14 @@ def get(self, timeout=None):
457
537
session = self ._new_session ()
458
538
session .create ()
459
539
540
+ current_span .add_event (
541
+ "Acquired session" ,
542
+ {
543
+ "time.elapsed" : time .time () - start_time ,
544
+ "session.id" : session .session_id ,
545
+ "kind" : "pinging_pool" ,
546
+ },
547
+ )
460
548
return session
461
549
462
550
def put (self , session ):
0 commit comments