From 35b98d4506c805d6034f5e87911630e2c2140d62 Mon Sep 17 00:00:00 2001 From: Anthony Bartoletti Date: Wed, 18 Dec 2024 19:10:42 -0600 Subject: [PATCH] Passed non_blocking variable down call-chain, implemented loop for task_wait on blocking, force globus_transfer to return a string status, added more logging. --- zstash/globus.py | 64 ++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 59 insertions(+), 5 deletions(-) diff --git a/zstash/globus.py b/zstash/globus.py index 964baf9b..3769fc7a 100644 --- a/zstash/globus.py +++ b/zstash/globus.py @@ -11,6 +11,7 @@ from globus_sdk import TransferAPIError, TransferClient, TransferData from globus_sdk.services.transfer.response.iterable import IterableTransferResponse from six.moves.urllib.parse import urlparse +from .utils import ts_utc from .settings import logger @@ -158,7 +159,7 @@ def file_exists(name: str) -> bool: def globus_transfer( - remote_ep: str, remote_path: str, name: str, transfer_type: str + remote_ep: str, remote_path: str, name: str, transfer_type: str, non_blocking: bool ): # noqa: C901 global transfer_client global local_endpoint @@ -167,6 +168,7 @@ def globus_transfer( global task_id global archive_directory_listing + logger.info(f"{ts_utc()}: Entered globus_transfer() for name = {name}") if not transfer_client: globus_activate("globus://" + remote_ep) if not transfer_client: @@ -216,21 +218,37 @@ def globus_transfer( try: if task_id: task = transfer_client.get_task(task_id) + # one of {ACTIVE, SUCCEEDED, FAILED, CANCELED, PENDING, INACTIVE} if task["status"] == "ACTIVE": - return + logger.info(f"{ts_utc()}: Globus task_id {task_id} Still Active") + return "ACTIVE" elif task["status"] == "SUCCEEDED": + logger.info(f"{ts_utc()}: Globus task_id {task_id} status = SUCCEEDED") src_ep = task["source_endpoint_id"] dst_ep = task["destination_endpoint_id"] label = task["label"] + ts = ts_utc() logger.info( - "Globus transfer {}, from {} to {}: {} succeeded".format( - task_id, src_ep, dst_ep, label 
+ "{}:Globus transfer {}, from {} to {}: {} succeeded".format( + ts, task_id, src_ep, dst_ep, label ) ) else: - logger.error("Transfer FAILED") + logger.error(f"{ts_utc()}: Transfer FAILED (task_id = {task_id})") + + # DEBUG: review accumulated items in TransferData + logger.info(f"{ts_utc()}: TransferData: accumulated items:") + attribs = transfer_data.__dict__ + for item in attribs['data']['DATA']: + if item['DATA_TYPE'] == "transfer_item": + print(f" source item: {item['source_path']}") + + # SUBMIT new transfer here + logger.info(f"{ts_utc()}: DIVING: Submit Transfer for {transfer_data['label']}") task = submit_transfer_with_checks(transfer_data) task_id = task.get("task_id") + logger.info(f"{ts_utc()}: SURFACE Submit Transfer returned task_id = {task_id} for label {transfer_data['label']}") + transfer_data = None except TransferAPIError as e: if e.code == "NoCredException": @@ -246,9 +264,45 @@ def globus_transfer( logger.error("Exception: {}".format(e)) sys.exit(1) + # test for blocking + task_status = "UNKNOWN" + if not non_blocking: + logger.info(f"{ts_utc()}: BLOCKING START: invoking task_wait for task_id = {task_id}") + wait_timeout = 5 + max_retries = 3 + retry_count = 0 + while retry_count < max_retries: + try: + # Wait for the task to complete + transfer_client.task_wait(task_id, timeout=wait_timeout, polling_interval=1) + except GlobusHTTPError as e: + logger.error(f"Exception: {e}") + except Exception as e: + logger.error(f"Unexpected Exception: {e}") + else: + curr_task = transfer_client.get_task(task_id) + task_status = curr_task['status'] + if task_status == "SUCCEEDED": + break + finally: + retry_count += 1 + logger.info(f"{ts_utc()}: BLOCKING retry_count = {retry_count} of {max_retries} of timeout {wait_timeout} seconds") + + if retry_count == max_retries: + logger.info(f"{ts_utc()}: BLOCKING EXHAUSTED {max_retries} of timeout 5 seconds") + task_status = "EXHAUSTED_TIMEOUT_RETRIES" + + logger.info(f"{ts_utc()}: BLOCKING ENDS: task_id {task_id} 
returned from task_wait with status {task_status}") + else: + logger.info(f"{ts_utc()}: NO BLOCKING (task_wait) for task_id {task_id}") + + if transfer_type == "put": + return task_status + if transfer_type == "get" and task_id: globus_wait(task_id) + return task_status def globus_wait(task_id: str): global transfer_client