diff --git a/README.md b/README.md index 30cbe72e..e22f5975 100644 --- a/README.md +++ b/README.md @@ -73,9 +73,12 @@ Cromshell is a CLI for submitting workflows to a Cromwell server and monitoring/ #### Logs * `logs [workflow-id] [[workflow-id]...]` - * List the log files produced by a workflow. - * [COMING SOON] `fetch-logs [workflow-id] [[workflow-id]...]` - * Download all logs produced by a workflow. + * List the log files produced by a workflow, Defaults to print `Failed` status only. + * `-f` Download the log files produced by a workflow. + * `-p` Print the log files produced by a workflow. + * `-des` Don't expand the subworkflows. + * `-j` Print the log files produced by a workflow in JSON format. + * `-s [STATUS]` Only print logs for jobs with the given `[STATUS]`. #### Job Outputs * `list-outputs [workflow-id] [[workflow-id]...]` diff --git a/requirements.txt b/requirements.txt index 0d9de70a..6eedb85c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,5 @@ +azure-identity>=1.12.0 +azure-storage-blob>=12.16.0 gcsfs>=2022.3.0 google-cloud-bigquery>=3.5.0 termcolor>=1.1.0 diff --git a/src/cromshell/logs/command.py b/src/cromshell/logs/command.py index b2187803..972ad1ec 100644 --- a/src/cromshell/logs/command.py +++ b/src/cromshell/logs/command.py @@ -1,34 +1,40 @@ +import json import logging import os +import sys +from pathlib import Path import click -import gcsfs from termcolor import colored +import cromshell.utilities.http_utils as http_utils +import cromshell.utilities.io_utils as io_utils from cromshell.metadata import command as metadata_command -from cromshell.utilities import command_setup_utils, http_utils -from cromshell.utilities.io_utils import get_color_for_status_key +from cromshell.utilities import command_setup_utils LOGGER = logging.getLogger(__name__) @click.command(name="logs") -@click.argument("workflow_id") -@click.option( - "-s", - "--status", - default="Failed", - help="Return a list with links to the logs with the indicated status. " - "Separate multiple keys by comma or use 'ALL' to print all logs. " - "Some standard Cromwell status options are 'ALL', 'Done', 'RetryableFailure', 'Running', and 'Failed'.", -) +@click.argument("workflow_ids", required=True, nargs=-1) @click.option( "-p", "--print-logs", is_flag=True, default=False, help="Print the contents of the logs to stdout if true. " - "Note: This assumes GCS bucket logs with default permissions otherwise this may not work", + "Note: This assumes GCS or Azure stored logs with default permissions otherwise this will not work", +) +@click.option( + "-d", + "--fetch-logs", + "--download-logs", + is_flag=False, + flag_value=Path.cwd(), + default=None, + show_default=True, + type=click.Path(exists=True), + help="Download the logs to the current directory or provided directory path. ", ) @click.option( "-des", @@ -37,18 +43,42 @@ default=False, help="Do not expand subworkflow info in metadata", ) +@click.option( + "-j", + "--json-summary", + is_flag=True, + default=False, + help="Print a json summary of logs, including non-file types.", +) +@click.option( + "-s", + "--status", + default="Failed", + show_default=True, + help="Return a list with links to the task logs with the indicated status. " + "Separate multiple keys by comma or use 'ALL' to print all logs. " + "Some standard Cromwell status options are 'Done', 'RetryableFailure', 'Running', and 'Failed'.", +) @click.pass_obj def main( config, - workflow_id: str, + workflow_ids: list, + json_summary: bool, status: list, dont_expand_subworkflows: bool, print_logs: bool, + fetch_logs, ): - """Get a subset of the workflow metadata.""" + """Get the logs for a workflow. + + Note: + By default, only failed tasks are returned unless + otherwise specified using -s/--status.""" LOGGER.info("logs") + return_code = 0 + # If no keys were provided then set key_param to empty else # strip trailing comma from keys and split keys by comma status_param = ( @@ -57,79 +87,67 @@ def main( else str(status).strip(",").split(",") ) - command_setup_utils.resolve_workflow_id_and_server( - workflow_id=workflow_id, cromshell_config=config - ) - LOGGER.info("Status keys set to %s", status_param) - # To grab the logs we only need a subset of the metadata from the server - obtain_and_print_logs( - config=config, - metadata_param=[ - "id", - "executionStatus", - "backendLogs", - "subWorkflowMetadata", - "subWorkflowId", - "failures", - ], - status_params=status_param, - dont_expand_subworkflows=dont_expand_subworkflows, - print_logs=print_logs, - ) - - return 0 - + for workflow_id in workflow_ids: + command_setup_utils.resolve_workflow_id_and_server( + workflow_id=workflow_id, cromshell_config=config + ) -def check_workflow_for_calls(workflow_status_json: dict) -> None: - """Check if the workflow has any calls""" + task_logs = get_task_level_logs( + config, + requested_status=status_param, + expand_subworkflows=not dont_expand_subworkflows, + ) - if not workflow_status_json.get("calls"): - if workflow_status_json.get("failures"): - LOGGER.error( - "Empty 'calls' key found in workflow metadata. " - "Workflow failed with the following error(s): %s" - % workflow_status_json["failures"], - ) - raise KeyError( - "Empty 'calls' key found in workflow metadata. " - "Workflow failed with the following error(s): %s" - % workflow_status_json["failures"], + if fetch_logs: + download_task_level_logs( + all_task_log_metadata=task_logs, path_to_download=fetch_logs ) + + if json_summary: + io_utils.pretty_print_json(format_json=task_logs) else: - LOGGER.error( - "Empty 'calls' key found in workflow metadata. " - "This may indicate no tasks were run by the workflow, " - "the workflow has yet to run any tasks, or " - "a failure occurred before the workflow started." - ) - raise KeyError( - "Empty 'calls' key found in workflow metadata." - "This may indicate no tasks were run by the workflow, " - "the workflow has yet to run any tasks, or " - "a failure occurred before the workflow started." - ) + print_task_level_logs(all_task_log_metadata=task_logs, cat_logs=print_logs) + return return_code -def obtain_and_print_logs( - config, - metadata_param: list, - print_logs: bool, - status_params: list, - dont_expand_subworkflows: bool, -) -> None: - """Format metadata parameters and obtains metadata from cromwell server""" - # Combine keys and flags into a dictionary +def get_task_level_logs(config, expand_subworkflows, requested_status) -> dict: + """Get the task level logs from the workflow metadata + + Note: This command isn't using Cromwell's 'log' api to obtain the logs. + Instead, the logs is extracted from the metadata, this allows us to filter + logs by task status, also retrieve subworkflows logs of a workflow. + + Args: + config (object): The cromshell config object + requested_status (list): The list of requested status + expand_subworkflows (bool) : Whether to expand subworkflows + """ + + metadata_keys = [ + "id", + "executionStatus", + "subWorkflowMetadata", + "subWorkflowId", + "failures", + "attempt", + "backendLogs", + "backend", + "shardIndex", + "stderr", + "stdout", + ] + + # Get metadata formatted_metadata_parameter = metadata_command.format_metadata_params( - list_of_keys=metadata_param, + list_of_keys=metadata_keys, exclude_keys=False, - expand_subworkflows=not dont_expand_subworkflows, # Invert variable + expand_subworkflows=expand_subworkflows, ) - # Request workflow metadata - workflow_status_json = metadata_command.get_workflow_metadata( + workflow_metadata = metadata_command.get_workflow_metadata( meta_params=formatted_metadata_parameter, api_workflow_id=config.cromwell_api_workflow_id, timeout=config.requests_connect_timeout, @@ -137,142 +155,215 @@ def obtain_and_print_logs( headers=http_utils.generate_headers(config), ) - check_workflow_for_calls(workflow_status_json) - - # Parse the metadata for logs and print them to the output - found_logs = print_workflow_logs( - workflow_metadata=workflow_status_json, - indent="", - expand_sub_workflows=not dont_expand_subworkflows, - status_keys=status_params, - cat_logs=print_logs, + return filter_task_logs_from_workflow_metadata( + workflow_metadata=workflow_metadata, requested_status=requested_status ) - if not found_logs: - print( - f"No logs with status {status_params} found for workflow, try adding " - f"the argument '-s ALL' to list logs with any status" - ) +def filter_task_logs_from_workflow_metadata( + workflow_metadata: dict, requested_status: list +) -> dict: + """Get the logs from the workflow metadata -def print_workflow_logs( - workflow_metadata: dict, - indent: str, - expand_sub_workflows: bool, - status_keys: list, - cat_logs: bool, -) -> bool: + Args: + workflow_metadata (dict): The workflow metadata + requested_status (list): The list of requested status """ - Recursively runs through each task of a workflow metadata and calls function to - call out to the helper in order to print found logs - :param workflow_metadata: Metadata of the workflow to process - :param indent: Indent string given as "\t", used to indent print out - :param expand_sub_workflows: Boolean, whether to print subworkflows - :return: true if any logs matching the parameters were found + calls_metadata = workflow_metadata["calls"] + all_task_logs = {} + + for call, index_list in calls_metadata.items(): + if "subWorkflowMetadata" in calls_metadata[call][0]: + all_task_logs[call] = [] + for scatter in calls_metadata[call]: + all_task_logs[call].append( + filter_task_logs_from_workflow_metadata( + scatter["subWorkflowMetadata"], + requested_status=requested_status, + ) + ) + else: + all_task_logs[call] = [] + for index in index_list: + if ( + "ALL" in requested_status + or index.get("executionStatus") in requested_status + ): + all_task_logs[call].append( + { + "attempt": index.get("attempt"), + "backendLogs": get_backend_logs(task_instance=index), + "backend": index.get("backend"), + "executionStatus": index.get("executionStatus"), + "shardIndex": index.get("shardIndex"), + "stderr": index.get("stderr"), + "stdout": index.get("stdout"), + }, + ) + + check_for_empty_logs( + workflow_logs=all_task_logs, + workflow_id=workflow_metadata["id"], + requested_status=requested_status, + ) + + return all_task_logs + + +def print_task_level_logs(all_task_log_metadata: dict, cat_logs: bool) -> None: + """Print the logs from the workflow metadata + task_logs_metadata: {call_name:[index1{task_log_name: taskvalue}, index2{...}, ...], call_name:[], ...} + + Args: + all_task_log_metadata (dict): All task logs metadata from the workflow + cat_logs (bool): Whether to print the logs """ - did_print = False - - tasks = list(workflow_metadata["calls"].keys()) - - for task in tasks: # For each task in given metadata - # If task has a key called 'subworkflowMetadata' in its first (zero) element - # (shard) and expand_sub_workflow parameter is set to true then rerun this - # function on that subworkflow - if ( - "subWorkflowMetadata" in workflow_metadata["calls"][task][0] - and expand_sub_workflows - ): - sub_workflow_name = task - task_shards = workflow_metadata["calls"][sub_workflow_name] - print(f"{indent}SubWorkflow {sub_workflow_name}") - - # For each element in total number of subworkflow calls get the subworkflow - # metadata. This loop will go through each shard if task is scattered - for i in range(len(task_shards) - 1): - sub_workflow_metadata = task_shards[i]["subWorkflowMetadata"] - - print_workflow_logs( - workflow_metadata=sub_workflow_metadata, - indent=indent + "\t", - expand_sub_workflows=expand_sub_workflows, - status_keys=status_keys, - cat_logs=cat_logs, + + for call, list_of_call_instances in all_task_log_metadata.items(): + print(f"{call}:") + for call_instance in list_of_call_instances: + if call_instance is not None: + print_file_like_value_in_dict( + task_log_metadata=call_instance, indent=1, cat_logs=cat_logs ) - # If no subworkflow is found then print status summary for the task - else: - did_print = ( - print_task_logs( - task=task, - indent=indent, - workflow_metadata=workflow_metadata, - status_keys=status_keys, + +def print_file_like_value_in_dict( + task_log_metadata: dict, indent: int, cat_logs: bool +) -> None: + """Print the file like values in the output metadata dictionary + + Args: + task_log_metadata (dict): The output metadata + indent (int): The number of tabs to indent the output + cat_logs (bool): Whether to print the logs + """ + + i = "\t" * indent + + task_status = task_log_metadata.get("executionStatus") + task_status_font = ( + io_utils.get_color_for_status_key(task_status) if task_status else None + ) + + print(colored(f"{i}status: {task_status}", color=task_status_font)) + + for log_name, log_value in task_log_metadata.items(): + if isinstance(log_value, str): + print_output_name_and_file( + output_name=log_name, + output_value=log_value, + indent=indent, + txt_color=task_status_font, + cat_logs=cat_logs, + backend=task_log_metadata.get("backend"), + ) + elif isinstance(log_value, dict): + print_file_like_value_in_dict(log_value, indent=indent, cat_logs=cat_logs) + elif isinstance(log_value, list): # Lists are subworkflows, an item is a task + print(f"{i}{log_name}:\t") # Print the subworkflow task name + for output_value_item in log_value: + print_file_like_value_in_dict( + task_log_metadata=output_value_item, + indent=indent + 1, cat_logs=cat_logs, ) - or did_print + + +def print_output_name_and_file( + output_name: str, + output_value: str, + indent: int = 0, + txt_color: str = None, + cat_logs: bool = False, + backend: str = None, +) -> None: + """Print the task name and the file name + + Args: + output_name (str): The task output name + output_value (str): The task output value + indent (bool): Whether to indent the output + cat_logs (bool): Whether to cat the log file + txt_color (str): The color to use for printing the output. Default is None. + backend: The backend to use for printing the output. Default is None. + """ + + i = "\t" * indent + + if isinstance(output_value, str) and io_utils.is_path_or_url_like(output_value): + if cat_logs: + print_log_file_content( + output_name=output_name, + output_value=output_value, + txt_color=txt_color, + backend=backend, ) + else: + print(colored(f"{i}{output_name}: {output_value}", color=txt_color)) - return did_print +def print_log_file_content( + output_name: str, + output_value: str, + txt_color: None or str = "blue", + backend: str = None, +) -> None: + """Prints output logs and cat the file if possible. -def print_task_logs( - task: str, - indent: str, - workflow_metadata: dict, - status_keys: list, - cat_logs: bool, -) -> bool: - """ - Prints the backend logs from the workflow - :param task: Name of the task - :param indent: Indent string given as a string of "\t" characters, - used to indent print out - :param workflow_metadata: Metadata of the workflow to process - :param status_keys: Determines what logs to show based on call status - :param cat_logs: Will use GCS to attempt to print the logs - :return: true if any logs were printed + Args: + output_name (str): The name of the output log. + output_value (str): The value of the output log. + txt_color (str): The color to use for printing the output. Default is "blue". + backend (str): The backend to used to run workflow. Default is None. """ + term_size = os.get_terminal_size().columns if sys.stdout.isatty() else 80 + print( + colored( + f"{'=' * term_size}\n{output_name}: {output_value}\n{'=' * term_size}", + color=txt_color, + ) + ) - did_print = False + file_contents = io_utils.cat_file(output_value, backend=backend) + if file_contents is None: + print(f"Unable to locate logs at {output_value}.") + else: + print(file_contents) - shard_list = workflow_metadata["calls"][task] + print("\n") # Add some space between logs - sharded = workflow_metadata["calls"][task][0]["shardIndex"] != -1 - for i in range(len(shard_list)): - status = shard_list[i]["executionStatus"] - if "ALL" in status_keys or status in status_keys: - task_status_font = get_color_for_status_key(status) +def check_for_empty_logs( + workflow_logs: dict, workflow_id: str, requested_status +) -> None: + """Check if the workflow logs are empty - shardstring = ( - "" if not sharded else "-shard-" + str(shard_list[i]["shardIndex"]) + Args: + :param requested_status: The status requested to be filtered + :param workflow_logs: The workflow logs + :param workflow_id: The workflow id + """ + if not workflow_logs: + LOGGER.error(f"No calls found for workflow: {workflow_id}") + raise Exception(f"No calls found for workflow: {workflow_id}") + + if "log" not in json.dumps(workflow_logs): + substrings = io_utils.BackendType.AZURE.value + io_utils.BackendType.LOCAL.value + if any(substring in json.dumps(workflow_logs) for substring in substrings): + # Cromwell does not return backendlogs for TES backend at the moment. + pass + else: + LOGGER.error( + f"No logs found for workflow: {workflow_id} with status: " + f"{requested_status}. Try adding the argument '-s ALL' to " + f"list logs with any status." + ) + raise Exception( + f"No logs found for workflow: {workflow_id} with status: " + f"{requested_status}. Try adding the argument '-s ALL' to " + f"list logs with any status." ) - - logs = get_backend_logs(shard_list[i]) - - if cat_logs: - print( - colored( - f"\n\n\n{'=' * os.get_terminal_size().columns}\n{indent}{task}{shardstring}:\t{status}\t {logs}\n{'=' * os.get_terminal_size().columns}", - color=task_status_font, - ) - ) - fs = gcsfs.GCSFileSystem() - if fs.exists(logs): - with fs.open(logs, "r") as f: - print(f.read()) - else: - print(f"Unable to locate logs at {logs}.") - - else: - print( - colored( - f"{indent}{task}{shardstring}:\t{status}\t {logs}", - color=task_status_font, - ) - ) - did_print = True - return did_print def get_backend_logs(task_instance: dict) -> str: @@ -294,5 +385,58 @@ def get_backend_logs(task_instance: dict) -> str: return backend_logs.get("log") -if __name__ == "__main__": - main() +def download_file_like_value_in_dict( + task_log_metadata: dict, path_to_download: Path or str +) -> None: + """Download the file like values in the output metadata dictionary + + :param task_log_metadata: The task log metadata + :return: None + """ + + files_to_download = [] + + for log_name, log_value in task_log_metadata.items(): + if isinstance(log_value, str): + if io_utils.is_path_or_url_like(log_value): + files_to_download.append(log_value) + elif isinstance(log_value, dict): + download_file_like_value_in_dict(log_value) + elif isinstance(log_value, list): # Lists are subworkflows, an item is a task + for output_value_item in log_value: + download_file_like_value_in_dict(task_log_metadata=output_value_item) + + path_to_downloaded_files = path_to_download + if task_log_metadata.get("backend") in io_utils.BackendType.GCP.value: + io_utils.download_gcs_files( + file_paths=files_to_download, local_dir=path_to_downloaded_files + ) + print(f"Downloaded files to: {path_to_downloaded_files}") + elif task_log_metadata.get("backend") in io_utils.BackendType.AZURE.value: + io_utils.download_azure_files( + file_paths=files_to_download, local_dir=path_to_downloaded_files + ) + print(f"Downloaded files to: {path_to_downloaded_files}") + else: + print( + f"Downloading items is unsupported for backend : {task_log_metadata.get('backend')}" + ) + + +def download_task_level_logs( + all_task_log_metadata: dict, path_to_download: Path or str +): + """Download the logs from the workflow metadata + task_logs_metadata: {call_name:[index1{task_log_name: taskvalue}, index2{...}, ...], call_name:[], ...} + + Args: + all_task_log_metadata (dict): All task logs metadata from the workflow + path_to_download: Path to download log files + """ + + for call, index_list in all_task_log_metadata.items(): + for call_index in index_list: + if call_index is not None: + download_file_like_value_in_dict( + task_log_metadata=call_index, path_to_download=path_to_download + ) diff --git a/src/cromshell/utilities/config_options_file_utils.py b/src/cromshell/utilities/config_options_file_utils.py index 85a3d4e0..23aa888d 100644 --- a/src/cromshell/utilities/config_options_file_utils.py +++ b/src/cromshell/utilities/config_options_file_utils.py @@ -12,6 +12,7 @@ "gcloud_token_email": "str", "referer_header_url": "str", "bq_cost_table": "str", + "azure_storage_account": "str", } diff --git a/src/cromshell/utilities/io_utils.py b/src/cromshell/utilities/io_utils.py index 022380aa..5ba2d62f 100644 --- a/src/cromshell/utilities/io_utils.py +++ b/src/cromshell/utilities/io_utils.py @@ -3,11 +3,16 @@ import re import shutil from contextlib import nullcontext +from enum import Enum from io import BytesIO from pathlib import Path from typing import BinaryIO, List, Union from zipfile import ZIP_DEFLATED, ZipFile +import azure.core.exceptions +from azure.identity import DefaultAzureCredential +from azure.storage.blob import BlobServiceClient +from google.cloud import storage from pygments import formatters, highlight, lexers from termcolor import colored @@ -230,6 +235,213 @@ def copy_files_to_directory( shutil.copy(inputs, directory) +def cat_file(file_path: str or Path, backend: str = None) -> str: + """Prints the contents of a file to stdout. The path can either be a + local file path, GCP file path, Azure file path, or AWS file path.""" + + # Check if the file path is a local path + if backend in BackendType.LOCAL.value: + with open(file_path, "r") as file: + file_contents = file.read() + # Check if the file path is a GCP path + elif backend in BackendType.GCP.value: + file_contents = get_gcp_file_content(file_path) + # Check if the file path is an Azure path + elif backend in BackendType.AZURE.value: + file_contents = get_azure_file_content(file_path) + else: + raise ValueError("Invalid file path") + return file_contents + + +def get_gcp_file_content(file_path: str) -> str or None: + """Returns the contents of a file located on GCP""" + + bucket_name, blob_path = file_path.split("//")[-1].split("/", 1) + storage_client = storage.Client() + bucket = storage_client.bucket(bucket_name) + blob = bucket.blob(blob_path) + + if not blob.exists(): + LOGGER.warning( + "Unable to find file '%s' in bucket '%s'", blob_path, bucket_name + ) + return None + else: + return blob.download_as_string().decode("utf-8") + + +def get_azure_file_content(file_path: str) -> str or None: + """Returns the contents of a file located on Azure + + file_path: full blob path to file on Azure example: + "/cromwell-executions/HelloWorld/5dd14f5c-4bf5-413a-9641-b6498a1778c3/call-HelloWorldTask/execution/stdout" + """ + + blob_service_client = BlobServiceClient( + account_url=f"https://{get_az_storage_account()}.blob.core.windows.net", + credential=DefaultAzureCredential(), + ) + container_name, blob_path = file_path.split("/", 2)[1:] + blob_client = blob_service_client.get_blob_client( + container=container_name, blob=blob_path + ) + blob_client.download_blob() + + try: + if blob_client.exists(): + return blob_client.download_blob().readall().decode("utf-8") + else: + LOGGER.warning( + "Unable to find file '%s' in container '%s'", blob_path, container_name + ) + return None + except azure.core.exceptions.HttpResponseError as e: + if "AuthorizationPermissionMismatch" in str(e): + LOGGER.error( + "Caught an AuthorizationPermissionMismatch error, check that" + "the Azure Storage Container has your account listed to have" + "Storage Blob Data Contributor" + ) + else: + LOGGER.error( + "Caught an error while trying to download the file from Azure: %s", e + ) + raise e + + +def get_az_storage_account() -> str: + """Returns the account name of the Azure storage account""" + + import cromshell.utilities.cromshellconfig as config + + try: + return config.cromshell_config_options["azure_storage_account"] + except KeyError: + LOGGER.error( + "An 'azure_storage_account' is required for this action but" + "was not found in Cromshell configuration file. " + ) + raise KeyError("Missing 'azure_storage_account' in Cromshell configuration") + + +def is_path_or_url_like(in_string: str) -> bool: + """Check if the string is a path or url + + Args: + in_string (str): The string to check for path or url like-ness + Returns: + bool: True if the string is a path or URL, False otherwise. + """ + prefixes = ("gs://", "/", "http://", "https://", "s3://") + return any(in_string.startswith(prefix) for prefix in prefixes) + + +def create_local_subdirectory(local_dir: str or Path, blob_path: str or Path) -> Path: + """ + Creates a local subdirectory for a given blob path. + A blob path is a path to a file in a GCS bucket. + + :param local_dir: Path to local directory + :param blob_path: Path to blob in GCS bucket + :return: + """ + + LOGGER.debug("Creating local subdirectory %s", blob_path) + + local_subdir = Path(local_dir) / Path(blob_path).parent + Path.mkdir(local_subdir, exist_ok=True, parents=True) + + return local_subdir + + +def download_gcs_files(file_paths: list, local_dir: str or Path) -> None: + """ + Downloads GCS files to local_dir while preserving directory structure + + Args: + file_paths: list of GCS file paths to download + local_dir: local directory to download files to + """ + storage_client = storage.Client() + + for file_path in file_paths: + # Extract bucket and blob path from file path + LOGGER.debug("Downloading file %s", file_path) + bucket_name, blob_path = file_path.split("//")[-1].split("/", 1) + + # Create local subdirectory if it doesn't exist + local_subdir = create_local_subdirectory( + local_dir=local_dir, blob_path=blob_path + ) + + # Download file to local subdirectory + LOGGER.debug("Downloading file %s to %s", file_path, local_subdir) + bucket = storage_client.bucket(bucket_name) + blob = bucket.blob(blob_path) + if blob.exists(): + local_path = Path(local_subdir) / Path(blob_path).name + blob.download_to_filename(local_path) + + LOGGER.debug("Downloaded file %s to %s", file_path, local_subdir) + else: + LOGGER.warning("File %s does not exist", file_path) + + +def download_azure_files(file_paths: list, local_dir: str or Path) -> None: + """ + Downloads Azure files to local_dir while preserving directory structure + + Args: + file_paths (list): List of Azure file paths to download + local_dir (str): Local directory to download files to + + """ + + default_credential = DefaultAzureCredential() + account_url = f"https://{get_az_storage_account()}.blob.core.windows.net" + + for file_path in file_paths: + # Extract container and blob path from file path + LOGGER.debug("Downloading file %s", file_path) + blob_service_client = BlobServiceClient( + account_url=account_url, credential=default_credential + ) + container_name, blob_path = file_path.split("/", 2)[1:] + blob_client = blob_service_client.get_blob_client( + container=container_name, blob=blob_path + ) + + # Create local subdirectory if it doesn't exist + local_subdir = create_local_subdirectory( + local_dir=local_dir, blob_path=blob_path + ) + + # Download file to local subdirectory + LOGGER.debug("Downloading file %s to %s", file_path, local_subdir) + if blob_client.exists(): + local_path = Path(local_subdir) / Path(blob_path).name + with open(local_path, "wb") as file: + file.write(blob_client.download_blob().readall()) + + LOGGER.debug("Downloaded file %s to %s", file_path, local_subdir) + else: + LOGGER.warning("File %s does not exist", file_path) + + +class BackendType(Enum): + """Enum to hold supported backend types""" + + # Backends listed here: https://cromwell.readthedocs.io/en/latest/backends/Backends/ + + AWS = ("AWSBatch", "AWSBatchOld", "AWSBatchOld_Single", "AWSBatch_Single") + AZURE = ("TES", "AzureBatch", "AzureBatch_Single") + GA4GH = ("TES",) + GCP = ("PAPIv2", "PAPIv2alpha1", "PAPIv2beta", "PAPIv2alpha") + LOCAL = ("Local",) + HPC = ("SGE", "SLURM", "LSF", "SunGridEngine", "HtCondor") + + class TextStatusesColor: """Holds stdout formatting per workflow status""" @@ -247,14 +459,16 @@ class TextStatusesColor: TASK_COLOR_FAILED = "red" -def get_color_for_status_key(status: str) -> str: +def get_color_for_status_key(status: str) -> str or None: """ Helper method for getting the correct font color for a given execution status for a job (or none for unrecognized statuses) """ - task_status_font = None + from cromshell.utilities.cromshellconfig import color_output + if not color_output: + return None if "Done" in status: task_status_font = TextStatusesColor.TASK_COLOR_SUCCEEDED elif "Running" in status: diff --git a/tests/integration/test_logs.py b/tests/integration/test_logs.py index 4b3316a0..aa701d22 100644 --- a/tests/integration/test_logs.py +++ b/tests/integration/test_logs.py @@ -9,17 +9,21 @@ class TestLogs: @pytest.mark.parametrize( - "wdl, json_file, expected_logs", + "wdl, json_file, status, expected_logs, exit_code", [ ( "tests/workflows/helloWorld.wdl", "tests/workflows/helloWorld.json", - "No logs with status ['Failed'] found for workflow, try adding the argument '-s ALL' to list logs with any status\n", + "ALL", + """HelloWorld.HelloWorldTask:\n\tstatus: Done\n\tstderr: /cromwell-executions/HelloWorld/2686fb3f-d2e6-4a4c-aa66-5dede568310f/call-HelloWorldTask/execution/stderr\n\tstdout: /cromwell-executions/HelloWorld/2686fb3f-d2e6-4a4c-aa66-5dede568310f/call-HelloWorldTask/execution/stdout\n""", + 0, ), ( "tests/workflows/helloWorldFail.wdl", "tests/workflows/helloWorld.json", - "HelloWorld.HelloWorldTask:\tFailed\t Backend Logs Not Found\n", + "Done", + "No logs found for workflow: 2686fb3f-d2e6-4a4c-aa66-5dede568310f with status: ['Done']. Try adding the argument '-s ALL' to list logs with any status.", + 1, ), ], ) @@ -28,7 +32,9 @@ def test_logs( local_cromwell_url: str, wdl: str, json_file: str, + status: str, expected_logs: str, + exit_code: int, ansi_escape, ): test_workflow_id = utility_test_functions.submit_workflow( @@ -44,13 +50,23 @@ def test_logs( # run logs logs_result = utility_test_functions.run_cromshell_command( - command=["logs", test_workflow_id], - exit_code=0, + command=["logs", "-s", status, test_workflow_id], + exit_code=exit_code, ) - print("Print workflow counts results:") + print("Print workflow logs results:") print(logs_result.stdout) + print(logs_result.stderr) + print(logs_result.exception) + + workflow_logs = ( + ansi_escape.sub("", logs_result.stdout) + if exit_code == 0 + else str(logs_result.exception) + ) - workflow_logs = ansi_escape.sub("", logs_result.stdout) + id_updated_expected_logs = utility_test_functions.replace_uuids( + expected_logs, test_workflow_id + ) - assert workflow_logs == expected_logs + assert workflow_logs == id_updated_expected_logs diff --git a/tests/integration/utility_test_functions.py b/tests/integration/utility_test_functions.py index c628f0fe..16d5c2e8 100644 --- a/tests/integration/utility_test_functions.py +++ b/tests/integration/utility_test_functions.py @@ -1,4 +1,5 @@ import json +import re from importlib import reload from pathlib import Path from traceback import print_exception @@ -144,3 +145,22 @@ def run_cromshell_submit( f"\n{print_exception(*result.exc_info)}" ) return result + + +def replace_uuids(input_string: str, replacement_uuid: str): + """ + Replace all UUIDs in a string with a given UUID + :param input_string: the string to replace UUIDs in + :param replacement_uuid: the UUID to replace all UUIDs in the string with + :return: + """ + # Define the pattern to match 128-bit UUIDs + uuid_pattern = r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}" + + # Generate the replacement UUID + new_uuid = str(replacement_uuid) + + # Use regular expressions to find and replace UUIDs in the string + output_string = re.sub(uuid_pattern, new_uuid, input_string) + + return output_string diff --git a/tests/unit/test_io_utils.py b/tests/unit/test_io_utils.py index ce9c86b2..99208f68 100644 --- a/tests/unit/test_io_utils.py +++ b/tests/unit/test_io_utils.py @@ -328,6 +328,64 @@ def test_update_all_workflow_database_tsv( ): assert row[column_to_update] == update_value + @pytest.mark.parametrize( + "file_path, backend, should_fail", + [ + [ + "", + "Local", + False, + ], + [ + "gs://gcs-public-data--genomics/cannabis/README.txt", + "PAPIv2", + False, + ], + ], + ) + def test_cat_file( + self, + file_path: str, + backend: str, + should_fail: bool, + mock_workflow_database_tsv, + ) -> None: + if file_path: + io_utils.cat_file(file_path=file_path, backend=backend) + else: + io_utils.cat_file(file_path=mock_workflow_database_tsv, backend=backend) + + @pytest.mark.parametrize( + "file_path, should_fail", + [ + [ + "gs://gcs-public-data--genomics/cannabis/README.txt", + False, + ], + [ + "README.txt", + True, + ], + [ + "http://", + False, + ], + [ + "https://", + False, + ], + [ + "s3://", + False, + ], + ], + ) + def test_is_path_or_url_like(self, file_path: str, should_fail: bool) -> None: + if should_fail: + assert not io_utils.is_path_or_url_like(in_string=file_path) + else: + assert io_utils.is_path_or_url_like(in_string=file_path) + @pytest.fixture def mock_data_path(self): return Path(__file__).parent.joinpath("mock_data/") diff --git a/tests/unit/test_logs.py b/tests/unit/test_logs.py index ea97baee..2c31c1cb 100644 --- a/tests/unit/test_logs.py +++ b/tests/unit/test_logs.py @@ -1,5 +1,6 @@ import json import os +from pathlib import Path import pytest @@ -12,77 +13,406 @@ class TestLogs: @pytest.mark.parametrize( "test_file, status_keys, expect_logs", [ - ("success.json", ["Failed"], False), - ("success.json", ["Done"], True), - ("will_fail.json", ["Failed"], True), - ("will_fail.json", ["Failed", "Done"], True), - ("will_fail.json", ["RetryableFailure"], False), - ("will_fail.json", ["ALL"], True), + ( + "success.json", + ["Done"], + { + "HelloWorld.HelloWorldTask": [ + { + "attempt": 1, + "backend": None, + "backendLogs": "gs://broad-methods-cromwell-exec-bucket-instance-8/HelloWorld/261ee81a-b6c4-4547-8373-4c879eb24858/call-HelloWorldTask/HelloWorldTask.log", + "executionStatus": "Done", + "shardIndex": -1, + "stderr": None, + "stdout": None, + } + ], + }, + ), + ( + "will_fail.json", + ["Failed"], + { + "WillFailTester.FailFastTask": [ + { + "attempt": 1, + "backend": None, + "backendLogs": "gs://broad-methods-cromwell-exec-bucket-instance-8/WillFailTester/019d7962-4c0c-4651-87ac-b90efff26ff6/call-FailFastTask/FailFastTask.log", + "executionStatus": "Failed", + "shardIndex": -1, + "stderr": None, + "stdout": None, + } + ], + "WillFailTester.PassRunsLong": [], + }, + ), + ( + "will_fail.json", + ["Failed", "Done"], + { + "WillFailTester.FailFastTask": [ + { + "attempt": 1, + "backend": None, + "backendLogs": "gs://broad-methods-cromwell-exec-bucket-instance-8/WillFailTester/019d7962-4c0c-4651-87ac-b90efff26ff6/call-FailFastTask/FailFastTask.log", + "executionStatus": "Failed", + "shardIndex": -1, + "stderr": None, + "stdout": None, + } + ], + "WillFailTester.PassRunsLong": [ + { + "attempt": 1, + "backend": None, + "backendLogs": "gs://broad-methods-cromwell-exec-bucket-instance-8/WillFailTester/019d7962-4c0c-4651-87ac-b90efff26ff6/call-PassRunsLong/PassRunsLong.log", + "executionStatus": "Done", + "shardIndex": -1, + "stderr": None, + "stdout": None, + } + ], + }, + ), + # ("will_fail.json", ["RetryableFailure"], "Exception: No logs found for workflow: 019d7962-4c0c-4651-87ac-b90efff26ff6 with status: ['RetryableFailure']"), + ( + "will_fail.json", + ["ALL"], + { + "WillFailTester.FailFastTask": [ + { + "attempt": 1, + "backend": None, + "backendLogs": "gs://broad-methods-cromwell-exec-bucket-instance-8/WillFailTester/019d7962-4c0c-4651-87ac-b90efff26ff6/call-FailFastTask/FailFastTask.log", + "executionStatus": "Failed", + "shardIndex": -1, + "stderr": None, + "stdout": None, + } + ], + "WillFailTester.PassRunsLong": [ + { + "attempt": 1, + "backend": None, + "backendLogs": "gs://broad-methods-cromwell-exec-bucket-instance-8/WillFailTester/019d7962-4c0c-4651-87ac-b90efff26ff6/call-PassRunsLong/PassRunsLong.log", + "executionStatus": "Done", + "shardIndex": -1, + "stderr": None, + "stdout": None, + } + ], + }, + ), ], ) - def test_workflow_that_is_doomed( + def test_filter_task_logs_from_workflow_metadata( self, test_file, status_keys, expect_logs, mock_data_path ): workflow_metadata_path = os.path.join(mock_data_path, test_file) with open(workflow_metadata_path, "r") as f: workflow_metadata = json.load(f) - logs_output = logs_command.print_workflow_logs( + logs_output = logs_command.filter_task_logs_from_workflow_metadata( workflow_metadata=workflow_metadata, - expand_sub_workflows=True, - indent="", - status_keys=status_keys, - cat_logs=False, + requested_status=status_keys, ) assert logs_output == expect_logs @pytest.mark.parametrize( - "test_file, task, expect_logs", + "test_file, status_keys, expect_logs", [ ( - "local_helloworld_metadata.json", - "HelloWorld.HelloWorldTask", - "Backend Logs Not Available Due to Local Execution", + "success.json", + ["Failed"], + "No logs found for workflow: 261ee81a-b6c4-4547-8373-4c879eb24858 with status: ['Failed']. Try adding the argument '-s ALL' to list logs with any status.", ), ( - "PAPIV2_helloworld_metadata.json", - "HelloWorld.HelloWorldTask", - "gs://broad-dsp-lrma-cromwell-central/HelloWorld/9ee4aa2e-7ac5-4c61-88b2-88a4d10f168b/call-HelloWorldTask/HelloWorldTask.log", + "will_fail.json", + ["RetryableFailure"], + "No logs found for workflow: 019d7962-4c0c-4651-87ac-b90efff26ff6 with status: ['RetryableFailure']. Try adding the argument '-s ALL' to list logs with any status.", ), ], ) - def test_get_backend_logs(self, test_file, task, expect_logs, mock_data_path): + def test_filter_task_logs_from_workflow_metadata_failure( + self, test_file, status_keys, expect_logs, mock_data_path + ): workflow_metadata_path = os.path.join(mock_data_path, test_file) - with open(workflow_metadata_path, "r") as f: workflow_metadata = json.load(f) - shard_list = workflow_metadata["calls"][task] + with pytest.raises(Exception) as e: + logs_command.filter_task_logs_from_workflow_metadata( + workflow_metadata=workflow_metadata, + requested_status=status_keys, + ) + + assert str(e.value) == expect_logs + + @pytest.mark.parametrize( + "all_task_log_metadata, expect_logs", + [ + ( + { + "HelloWorld.HelloWorldTask": [ + { + "attempt": 1, + "backend": None, + "backendLogs": "gs://broad-methods-cromwell-exec-bucket-instance-8/HelloWorld/261ee81a-b6c4-4547-8373-4c879eb24858/call-HelloWorldTask/HelloWorldTask.log", + "executionStatus": "Done", + "shardIndex": -1, + "stderr": None, + "stdout": None, + } + ], + }, + "HelloWorld.HelloWorldTask:\n\tstatus: Done\n\tbackendLogs: gs://broad-methods-cromwell-exec-bucket-instance-8/HelloWorld/261ee81a-b6c4-4547-8373-4c879eb24858/call-HelloWorldTask/HelloWorldTask.log\n", + ) + ], + ) + def test_print_task_level_logs(self, all_task_log_metadata, expect_logs, capsys): + logs_command.print_task_level_logs( + all_task_log_metadata=all_task_log_metadata, + cat_logs=False, + ) + captured = capsys.readouterr() + assert captured.out == expect_logs + + @pytest.mark.parametrize( + "task_log_metadata, expect_logs", + [ + ( + { + "attempt": 1, + "backend": None, + "backendLogs": "gs://broad-methods-cromwell-exec-bucket-instance-8/HelloWorld/261ee81a-b6c4-4547-8373-4c879eb24858/call-HelloWorldTask/HelloWorldTask.log", + "executionStatus": "Done", + "shardIndex": -1, + "stderr": None, + "stdout": None, + }, + "\tstatus: Done\n\tbackendLogs: gs://broad-methods-cromwell-exec-bucket-instance-8/HelloWorld/261ee81a-b6c4-4547-8373-4c879eb24858/call-HelloWorldTask/HelloWorldTask.log\n", + ) + ], + ) + def test_print_file_like_value_in_dict( + self, task_log_metadata, expect_logs, capsys + ): + logs_command.print_file_like_value_in_dict( + task_log_metadata=task_log_metadata, + indent=1, + cat_logs=False, + ) + captured = capsys.readouterr() + assert captured.out == expect_logs + + @pytest.mark.parametrize( + "output_name, output_value, indent, expect_logs", + [ + ( + "bla", + "/bla/bla.txt", + 0, + "bla: /bla/bla.txt\n", + ), + ( # Test when output is string but not file like + "bla", + "not a file", + 0, + "", + ), + ( # Test when output is a float + "bla", + 0.0, + 0, + "", + ), + ], + ) + def test_print_output_name_and_file( + self, + output_name, + output_value, + indent, + expect_logs, + capsys, + ): + logs_command.print_output_name_and_file( + output_name=output_name, + output_value=output_value, + indent=indent, + txt_color=None, + ) + captured = capsys.readouterr() + assert captured.out == expect_logs + + @pytest.mark.parametrize( + "output_name, output_value", + [ + ( + "fileName", + "success.json", + ), + ], + ) + def test_print_log_file_content( + self, output_name, output_value, mock_data_path, capsys + ): + abs_output_value = Path(mock_data_path).joinpath(output_value) + + with open(abs_output_value, "r") as f: + file_content = f.read() + + logs_command.print_log_file_content( + output_name=output_name, + output_value=str(abs_output_value), + txt_color=None, + backend="Local", + ) + captured = capsys.readouterr() + assert file_content in captured.out + + @pytest.mark.parametrize( + "workflow_logs, workflow_id, requested_status", + [ + ( + { + "HelloWorld.HelloWorldTask": [ + { + "attempt": 1, + "backend": None, + "backendLogs": "gs://broad-methods-cromwell-exec-bucket-instance-8/HelloWorld/261ee81a-b6c4-4547-8373-4c879eb24858/call-HelloWorldTask/HelloWorldTask.log", + "executionStatus": "Done", + "shardIndex": -1, + "stderr": None, + "stdout": None, + } + ], + }, + "261ee81a-b6c4-4547-8373-4c879eb24858", + "Done", + ), + ( + { + "HelloWorld.HelloWorldTask": [ + { + "attempt": 1, + "backend": "TES", + "executionStatus": "Done", + "shardIndex": -1, + "stderr": None, + "stdout": None, + } + ], + }, + "261ee81a-b6c4-4547-8373-4c879eb24858", + "Done", + ), + ], + ) + def test_check_for_empty_logs( + self, workflow_logs: dict, workflow_id: str, requested_status + ): + logs_command.check_for_empty_logs( + workflow_logs=workflow_logs, + workflow_id=workflow_id, + requested_status=requested_status, + ) + + @pytest.mark.parametrize( + "workflow_logs, workflow_id, requested_status", + [ + ( + { + "HelloWorld.HelloWorldTask": [ + { + "attempt": 1, + "backend": None, + "executionStatus": "Done", + "shardIndex": -1, + "stderr": None, + "stdout": None, + } + ], + }, + "261ee81a-b6c4-4547-8373-4c879eb24858", + "Done", + ), + ({}, "261ee81a-b6c4-4547-8373-4c879eb24858", "Done"), + ], + ) + def test_check_for_empty_logs_failure( + self, workflow_logs: dict, workflow_id: str, requested_status + ): + if workflow_logs: + expected_error = ( + f"No logs found for workflow: {workflow_id} with status: " + f"{requested_status}. Try adding the argument '-s ALL' to " + f"list logs with any status." + ) + else: + expected_error = f"No calls found for workflow: {workflow_id}" + + with pytest.raises(Exception) as e: + logs_command.check_for_empty_logs( + workflow_logs=workflow_logs, + workflow_id=workflow_id, + requested_status=requested_status, + ) - assert logs_command.get_backend_logs(task_instance=shard_list[0]) == expect_logs + assert str(e.value) == expected_error @pytest.mark.parametrize( - "metadata_json", + "task_instance ", [ - { - "backend": "Local", - "calls": {}, - "failures": [{"message": "Runtime validation failed"}], - }, - { - "backend": "Local", - "calls": {"blah": "blah"}, - "failures": [{"message": "Runtime validation failed"}], - }, + ( + { + "attempt": 1, + "backend": "PAPI_V2", + "backendLogs": { + "log": "gs://broad-methods-cromwell-exec-bucket-instance-8/HelloWorld/261ee81a-b6c4-4547-8373-4c879eb24858/call-HelloWorldTask/HelloWorldTask.log" + }, + "executionStatus": "Done", + "shardIndex": -1, + "stderr": None, + "stdout": None, + } + ), + ( + { + "attempt": 1, + "backend": "PAPI_V2", + "executionStatus": "Done", + "shardIndex": -1, + "stderr": None, + "stdout": None, + } + ), + ( + { + "attempt": 1, + "backend": "Local", + "backendLogs": { + "log": "gs://broad-methods-cromwell-exec-bucket-instance-8/HelloWorld/261ee81a-b6c4-4547-8373-4c879eb24858/call-HelloWorldTask/HelloWorldTask.log" + }, + "executionStatus": "Done", + "shardIndex": -1, + "stderr": None, + "stdout": None, + } + ), ], ) - def test_check_workflow_for_calls(self, metadata_json): - if not metadata_json.get("calls"): - with pytest.raises(KeyError): - logs_command.check_workflow_for_calls(metadata_json) + def test_get_backend_logs(self, task_instance: dict): + backend_log = logs_command.get_backend_logs(task_instance=task_instance) + if task_instance["backend"] == "Local": + assert backend_log == "Backend Logs Not Available Due to Local Execution" + elif task_instance["backend"] != "Local" and "backendLogs" not in task_instance: + assert backend_log == "Backend Logs Not Found" else: - logs_command.check_workflow_for_calls(metadata_json) + assert backend_log == task_instance["backendLogs"]["log"] @pytest.fixture def mock_data_path(self):