@@ -305,7 +305,11 @@ class SasTokenStage(PipelineStage):
305
305
306
306
def __init__ (self ):
307
307
super (SasTokenStage , self ).__init__ ()
308
+ # Indicates when token needs to be updated
308
309
self ._token_update_alarm = None
310
+ # Indicates when to retry a failed reauthorization attempt
311
+ # (only used with renewable SAS auth)
312
+ self ._reauth_retry_timer = None
309
313
310
314
@pipeline_thread .runs_on_pipeline_thread
311
315
def _run_op (self , op ):
@@ -335,6 +339,7 @@ def _run_op(self, op):
335
339
self .send_op_down (op )
336
340
elif isinstance (op , pipeline_ops_base .ShutdownPipelineOperation ):
337
341
self ._cancel_token_update_alarm ()
342
+ self ._cancel_reauth_retry_timer ()
338
343
self .send_op_down (op )
339
344
else :
340
345
self .send_op_down (op )
@@ -349,6 +354,16 @@ def _cancel_token_update_alarm(self):
349
354
old_alarm .cancel ()
350
355
old_alarm = None
351
356
357
+ @pipeline_thread .runs_on_pipeline_thread
358
+ def _cancel_reauth_retry_timer (self ):
359
+ """Cancel and delete any pending reauth retry timer"""
360
+ old_reauth_retry_timer = self ._reauth_retry_timer
361
+ self ._reauth_retry_timer = None
362
+ if old_reauth_retry_timer :
363
+ logger .debug ("Cancelling reauthorization retry timer" )
364
+ old_reauth_retry_timer .cancel ()
365
+ old_reauth_retry_timer = None
366
+
352
367
@pipeline_thread .runs_on_pipeline_thread
353
368
def _start_token_update_alarm (self ):
354
369
"""Begin an update alarm.
@@ -370,41 +385,27 @@ def _start_token_update_alarm(self):
370
385
# and then start another alarm.
371
386
if isinstance (self .pipeline_root .pipeline_configuration .sastoken , st .RenewableSasToken ):
372
387
logger .debug (
373
- "Scheduling automatic SAS Token renewal at epoch time: {}" .format (update_time )
388
+ "{}: Scheduling automatic SAS Token renewal at epoch time: {}" .format (
389
+ self .name , update_time
390
+ )
374
391
)
375
392
376
- @pipeline_thread .runs_on_pipeline_thread
377
- def on_reauthorize_complete (op , error ):
378
- this = self_weakref ()
379
- if error :
380
- logger .info (
381
- "{}({}): reauthorize connection operation failed. Error={}" .format (
382
- this .name , op .name , error
383
- )
384
- )
385
- handle_exceptions .handle_background_exception (error )
386
- else :
387
- logger .info (
388
- "{}({}): reauthorize connection operation is complete" .format (
389
- this .name , op .name
390
- )
391
- )
392
-
393
393
@pipeline_thread .invoke_on_pipeline_thread_nowait
394
394
def renew_token ():
395
395
this = self_weakref ()
396
+ # Cancel any token reauth retry timer in progress (from a previous renewal)
397
+ this ._cancel_reauth_retry_timer ()
396
398
logger .info ("Renewing SAS Token..." )
397
399
# Renew the token
398
400
sastoken = this .pipeline_root .pipeline_configuration .sastoken
399
401
sastoken .refresh ()
400
402
# If the pipeline is already connected, send order to reauthorize the connection
401
- # now that token has been renewed
403
+ # now that token has been renewed. If the pipeline is not currently connected,
404
+ # there is no need to do this, as the next connection will be using the new
405
+ # credentials.
402
406
if this .pipeline_root .connected :
403
- this .send_op_down (
404
- pipeline_ops_base .ReauthorizeConnectionOperation (
405
- callback = on_reauthorize_complete
406
- )
407
- )
407
+ this ._reauthorize ()
408
+
408
409
# Once again, start a renewal alarm
409
410
this ._start_token_update_alarm ()
410
411
@@ -428,6 +429,57 @@ def request_new_token():
428
429
self ._token_update_alarm .daemon = True
429
430
self ._token_update_alarm .start ()
430
431
432
+ @pipeline_thread .runs_on_pipeline_thread
433
+ def _reauthorize (self ):
434
+ self_weakref = weakref .ref (self )
435
+
436
+ @pipeline_thread .runs_on_pipeline_thread
437
+ def on_reauthorize_complete (op , error ):
438
+ this = self_weakref ()
439
+ if error :
440
+ logger .info (
441
+ "{}: Connection reauthorization failed. Error={}" .format (this .name , error )
442
+ )
443
+ handle_exceptions .handle_background_exception (error )
444
+ # If connection has not been somehow re-established, we need to keep trying
445
+ # because for the reauthorization to originally have been issued, we were in
446
+ # a connected state.
447
+ # NOTE: we only do this if connection retry is enabled on the pipeline. If it is,
448
+ # we have a contract to maintain a connection. If it has been disabled, we have
449
+ # a contract to not do so.
450
+ # NOTE: We can't rely on the ReconnectStage to do this because 1) the pipeline
451
+ # stages should stand on their own, and 2) if the reauth failed, the ReconnectStage
452
+ # wouldn't know to reconnect, because the expected state of a failed reauth is
453
+ # to be disconnected.
454
+ if (
455
+ not this .pipeline_root .connected
456
+ and this .pipeline_root .pipeline_configuration .connection_retry
457
+ ):
458
+ logger .info ("{}: Retrying connection reauthorization" .format (this .name ))
459
+ # No need to cancel the timer, because if this is running, it has already ended
460
+
461
+ def retry_reauthorize ():
462
+ # We need to check this when the timer expires as well as before creating
463
+ # the timer in case connection has been re-established while timer was
464
+ # running
465
+ if not this .pipeline_root .connected :
466
+ this ._reauthorize ()
467
+
468
+ this ._reauth_retry_timer = threading .Timer (
469
+ this .pipeline_root .pipeline_configuration .connection_retry_interval ,
470
+ retry_reauthorize ,
471
+ )
472
+ this ._reauth_retry_timer .daemon = True
473
+ this ._reauth_retry_timer .start ()
474
+
475
+ else :
476
+ logger .info ("{}: Connection reauthorization successful" .format (this .name ))
477
+
478
+ logger .info ("{}: Starting reauthorization process for new SAS token" .format (self .name ))
479
+ self .send_op_down (
480
+ pipeline_ops_base .ReauthorizeConnectionOperation (callback = on_reauthorize_complete )
481
+ )
482
+
431
483
432
484
class AutoConnectStage (PipelineStage ):
433
485
"""
0 commit comments