Skip to content

Commit

Permalink
Passed non_blocking variable down call-chain, implemented loop for ta…
Browse files Browse the repository at this point in the history
…sk_wait on blocking, force globus_transfer to return a sting status, added more logging.
  • Loading branch information
Anthony Bartoletti committed Dec 19, 2024
1 parent f48a372 commit 35b98d4
Showing 1 changed file with 59 additions and 5 deletions.
64 changes: 59 additions & 5 deletions zstash/globus.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from globus_sdk import TransferAPIError, TransferClient, TransferData
from globus_sdk.services.transfer.response.iterable import IterableTransferResponse
from six.moves.urllib.parse import urlparse
from .utils import ts_utc

from .settings import logger

Expand Down Expand Up @@ -158,7 +159,7 @@ def file_exists(name: str) -> bool:


def globus_transfer(
remote_ep: str, remote_path: str, name: str, transfer_type: str
remote_ep: str, remote_path: str, name: str, transfer_type: str, non_blocking: bool
): # noqa: C901
global transfer_client
global local_endpoint
Expand All @@ -167,6 +168,7 @@ def globus_transfer(
global task_id
global archive_directory_listing

logger.info(f"{ts_utc()}: Entered globus_transfer() for name = {name}")
if not transfer_client:
globus_activate("globus://" + remote_ep)
if not transfer_client:
Expand Down Expand Up @@ -216,21 +218,37 @@ def globus_transfer(
try:
if task_id:
task = transfer_client.get_task(task_id)
# one of {ACTIVE, SUCCEEDED, FAILED, CANCELED, PENDING, INACTIVE}
if task["status"] == "ACTIVE":
return
logger.info(f"{ts_utc()}: Globus task_id {task_id} Still Active")
return "ACTIVE"
elif task["status"] == "SUCCEEDED":
logger.info(f"{ts_utc()}: Globus task_id {task_id} status = SUCCEEDED")
src_ep = task["source_endpoint_id"]
dst_ep = task["destination_endpoint_id"]
label = task["label"]
ts = ts_utc()
logger.info(
"Globus transfer {}, from {} to {}: {} succeeded".format(
task_id, src_ep, dst_ep, label
"{}:Globus transfer {}, from {} to {}: {} succeeded".format(
ts, task_id, src_ep, dst_ep, label
)
)
else:
logger.error("Transfer FAILED")
logger.error(f"{ts_utc()}: Transfer FAILED (task_id = {task_id})")

# DEBUG: review accumulated items in TransferData
logger.info(f"{ts_utc()}: TransferData: accumulated items:")
attribs = transfer_data.__dict__
for item in attribs['data']['DATA']:
if item['DATA_TYPE'] == "transfer_item":
print(f" source item: {item['source_path']}")

# SUBMIT new transfer here
logger.info(f"{ts_utc()}: DIVING: Submit Transfer for {transfer_data['label']}")
task = submit_transfer_with_checks(transfer_data)
task_id = task.get("task_id")
logger.info(f"{ts_utc()}: SURFACE Submit Transfer returned task_id = {task_id} for label {transfer_data['label']}")

transfer_data = None
except TransferAPIError as e:
if e.code == "NoCredException":
Expand All @@ -246,9 +264,45 @@ def globus_transfer(
logger.error("Exception: {}".format(e))
sys.exit(1)

# test for blocking
task_status = "UNKNOWN"
if not non_blocking:
logger.info(f"{ts_utc()}: BLOCKING START: invoking task_wait for task_id = {task_id}")
wait_timeout = 5
max_retries = 3
retry_count = 0
while retry_count < max_retries:
try:
# Wait for the task to complete
transfer_client.task_wait(task_id, timeout=wait_timeout, polling_interval=1)
except GlobusHTTPError as e:
logger.error(f"Exception: {e}")
except Exception as e:
logger.error(f"Unexpected Exception: {e}")
else:
curr_task = transfer_client.get_task(task_id)
task_status = curr_task['status']
if task_status == "SUCCEEDED":
break
finally:
retry_count += 1
logger.info(f"{ts_utc()}: BLOCKING retry_count = {retry_count} of {max_retries} of timeout {wait_timeout} seconds")

if retry_count == max_retries:
logger.info(f"{ts_utc()}: BLOCKING EXHAUSTED {max_retries} of timeout 5 seconds")
task_status = "EXHAUSTED_TIMEOUT_RETRIES"

logger.info(f"{ts_utc()}: BLOCKING ENDS: task_id {task_id} returned from task_wait with status {task_status}")
else:
logger.info(f"{ts_utc()}: NO BLOCKING (task_wait) for task_id {task_id}")

if transfer_type == "put":
return task_status

if transfer_type == "get" and task_id:
globus_wait(task_id)

return task_status

def globus_wait(task_id: str):
global transfer_client
Expand Down

0 comments on commit 35b98d4

Please sign in to comment.