From f2a32db630d592bd07ea50494f9808c29784079d Mon Sep 17 00:00:00 2001 From: Josh McVey Date: Tue, 8 Oct 2024 09:55:45 -0500 Subject: [PATCH] feat(abt): analyze protocols faster --- .../citools/generate_analyses.py | 304 ++++++------------ 1 file changed, 96 insertions(+), 208 deletions(-) diff --git a/analyses-snapshot-testing/citools/generate_analyses.py b/analyses-snapshot-testing/citools/generate_analyses.py index f1335b102ae..a6ce27c399d 100644 --- a/analyses-snapshot-testing/citools/generate_analyses.py +++ b/analyses-snapshot-testing/citools/generate_analyses.py @@ -1,13 +1,13 @@ +import concurrent import json import os -import signal import time -from contextlib import contextmanager +from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from datetime import datetime, timezone from enum import Enum, auto from pathlib import Path -from typing import Any, Dict, Generator, List, Optional +from typing import Any, Dict, List, Optional import docker # type: ignore from automation.data.protocol import Protocol @@ -15,40 +15,27 @@ from rich.traceback import install install(show_locals=True) -IMAGE = "opentrons-analysis" -CONTAINER_LABWARE = "/var/lib/ot" -HOST_LABWARE = Path(Path(__file__).parent.parent, "files", "labware") -HOST_PROTOCOLS_ROOT = Path(Path(__file__).parent.parent, "files", "protocols") -CONTAINER_PROTOCOLS_ROOT = "/var/lib/ot/protocols" -CONTAINER_RESULTS = "/var/lib/ot/analysis_results" -HOST_RESULTS = Path(Path(__file__).parent.parent, "analysis_results") -ANALYSIS_SUFFIX = "analysis.json" +IMAGE: str = "opentrons-analysis" +CONTAINER_LABWARE: str = "/var/lib/ot" +HOST_LABWARE: Path = Path(Path(__file__).parent.parent, "files", "labware") +HOST_PROTOCOLS_ROOT: Path = Path(Path(__file__).parent.parent, "files", "protocols") +CONTAINER_PROTOCOLS_ROOT: str = "/var/lib/ot/protocols" +CONTAINER_RESULTS: str = "/var/lib/ot/analysis_results" +HOST_RESULTS: Path = Path(Path(__file__).parent.parent, "analysis_results") +ANALYSIS_SUFFIX: str = "analysis.json" +ANALYSIS_TIMEOUT_SECONDS: int = 30 +ANALYSIS_CONTAINER_INSTANCES: int = 6 console = Console() -@contextmanager -def timeout(seconds: int) -> Generator[None, None, None]: - # Signal handler function - def raise_timeout(signum, frame) -> None: # type: ignore[no-untyped-def] - raise TimeoutError - - # Set the signal handler for the alarm signal - signal.signal(signal.SIGALRM, raise_timeout) - signal.alarm(seconds) # Set the alarm - try: - yield - finally: - signal.alarm(0) # Disable the alarm - - class ProtocolType(Enum): PROTOCOL_DESIGNER = auto() PYTHON = auto() @dataclass -class AnalyzedProtocol: +class TargetProtocol: host_protocol_file: Path container_protocol_file: Path host_analysis_file: Path @@ -111,7 +98,7 @@ def set_analysis_execution_time(self, analysis_execution_time: float) -> None: self.analysis_execution_time = analysis_execution_time -def stop_and_restart_container(image_name: str, timeout: int = 60) -> docker.models.containers.Container: +def start_containers(image_name: str, num_containers: int, timeout: int = 60) -> List[docker.models.containers.Container]: client = docker.from_env() volumes = { str(HOST_LABWARE): {"bind": CONTAINER_LABWARE, "mode": "rw"}, @@ -119,64 +106,55 @@ def stop_and_restart_container(image_name: str, timeout: int = 60) -> docker.mod str(HOST_PROTOCOLS_ROOT): {"bind": CONTAINER_PROTOCOLS_ROOT, "mode": "rw"}, } - # Find the running container using the specified image - containers = client.containers.list(filters={"ancestor": image_name, "status": "running"}) - + # Stop and remove existing containers + containers: List[docker.models.containers.Container] = client.containers.list(filters={"ancestor": image_name}) if containers: - console.print("Stopping the running container(s)...") + console.print("Stopping and removing existing container(s)...") for container in containers: container.stop(timeout=10) + container.remove() - # Start a new container with the specified volume - console.print("Starting a new container.") - container = client.containers.run(image_name, detach=True, volumes=volumes) + # Start new containers + console.print(f"Starting {num_containers} new container(s).") + containers = [] + for _ in range(num_containers): + container = client.containers.run(image_name, detach=True, volumes=volumes) + containers.append(container) - # Wait for the container to be ready if a readiness command is provided + # Wait for containers to be ready start_time = time.time() while time.time() - start_time < timeout: - exit_code, output = container.exec_run(f"ls -al {CONTAINER_LABWARE}") - if exit_code == 0: - console.print("Container is ready.") + all_ready = True + for container in containers: + exit_code, _ = container.exec_run(f"ls -al {CONTAINER_LABWARE}") + if exit_code != 0: + all_ready = False + break + if all_ready: + console.print("All containers are ready.") break else: - console.print("Waiting for container to be ready...") - time.sleep(5) + console.print("Waiting for containers to be ready...") + time.sleep(5) else: - console.print("Timeout waiting for container to be ready. Proceeding anyway.") - return container + console.print("Timeout waiting for containers to be ready. Proceeding anyway.") + return containers def stop_and_remove_containers(image_name: str) -> None: client = docker.from_env() - - # Find all containers created from the specified image containers = client.containers.list(all=True, filters={"ancestor": image_name}) - for container in containers: try: - # Stop the container if it's running if container.status == "running": console.print(f"Stopping container {container.short_id}...") container.stop(timeout=10) - - # Remove the container console.print(f"Removing container {container.short_id}...") container.remove() - except docker.errors.ContainerError as e: + except Exception as e: console.print(f"Error stopping/removing container {container.short_id}: {e}") -def has_designer_application(json_file_path: Path) -> bool: - try: - with open(json_file_path, "r", encoding="utf-8") as file: - data = json.load(file) - return "designerApplication" in data - except json.JSONDecodeError: - # Handle the exception if the file is not a valid JSON - console.print(f"Invalid JSON file: {json_file_path}") - return False - - def host_analysis_path(protocol_file: Path, tag: str) -> Path: return Path(HOST_RESULTS, f"{protocol_file.stem}_{tag}_{ANALYSIS_SUFFIX}") @@ -185,79 +163,6 @@ def container_analysis_path(protocol_file: Path, tag: str) -> Path: return Path(CONTAINER_RESULTS, f"{protocol_file.stem}_{tag}_{ANALYSIS_SUFFIX}") -def generate_protocols(tag: str) -> List[AnalyzedProtocol]: - - # Since we do not have the specification for which labware to use - # we will use all labware in the host labware directory - all_custom_labware_paths = [str(host_path.relative_to(CONTAINER_LABWARE)) for host_path in list(Path(HOST_LABWARE).rglob("*.json"))] - - def find_pd_protocols() -> List[AnalyzedProtocol]: - # Check if the provided path is a valid directory - if not HOST_PROTOCOLS_ROOT.is_dir(): - raise NotADirectoryError(f"The path {HOST_PROTOCOLS_ROOT} is not a valid directory.") - - nonlocal all_custom_labware_paths - - # Recursively find all .json files - json_files = list(HOST_PROTOCOLS_ROOT.rglob("*.json")) - filtered_json_files = [file for file in json_files if has_designer_application(file)] - pd_protocols: List[AnalyzedProtocol] = [] - for path in filtered_json_files: - relative_path = path.relative_to(HOST_PROTOCOLS_ROOT) - updated_path = Path(CONTAINER_PROTOCOLS_ROOT, relative_path) - pd_protocols.append( - AnalyzedProtocol( - host_protocol_file=path, - container_protocol_file=updated_path, - host_analysis_file=host_analysis_path(path, tag), - container_analysis_file=container_analysis_path(path, tag), - tag=tag, - custom_labware_paths=all_custom_labware_paths, - ) - ) - return pd_protocols - - def find_python_protocols() -> List[AnalyzedProtocol]: - # Check if the provided path is a valid directory - if not HOST_PROTOCOLS_ROOT.is_dir(): - raise NotADirectoryError(f"The path {HOST_PROTOCOLS_ROOT} is not a valid directory.") - - nonlocal all_custom_labware_paths - - # Recursively find all .py files - python_files = list(HOST_PROTOCOLS_ROOT.rglob("*.py")) - py_protocols: List[AnalyzedProtocol] = [] - - for path in python_files: - relative_path = path.relative_to(HOST_PROTOCOLS_ROOT) - container_path = Path(CONTAINER_PROTOCOLS_ROOT, relative_path) - py_protocols.append( - AnalyzedProtocol( - host_protocol_file=path, - container_protocol_file=container_path, - host_analysis_file=host_analysis_path(path, tag), - container_analysis_file=container_analysis_path(path, tag), - tag=tag, - custom_labware_paths=all_custom_labware_paths, - ) - ) - return py_protocols - - return find_pd_protocols() + find_python_protocols() - - -def remove_all_files_in_directory(directory: Path) -> None: - for filename in os.listdir(directory): - file_path = os.path.join(directory, filename) - try: - if os.path.isfile(file_path) or os.path.islink(file_path): - os.unlink(file_path) - elif os.path.isdir(file_path): - pass # Currently, subdirectories are not removed - except Exception as e: - console.print(f"Failed to delete {file_path}. Reason: {e}") - - def protocol_custom_labware_paths_in_container(protocol: Protocol) -> List[str]: if not HOST_LABWARE.is_dir() or protocol.custom_labware is None: return [] @@ -269,98 +174,81 @@ def protocol_custom_labware_paths_in_container(protocol: Protocol) -> List[str]: ] -def analyze(protocol: AnalyzedProtocol, container: docker.models.containers.Container) -> bool: - # Run the analyze command - command = f"python -I -m opentrons.cli analyze --json-output {protocol.container_analysis_file} {protocol.container_protocol_file} {' '.join(protocol.custom_labware_paths)}" # noqa: E501 +def analyze(protocol: TargetProtocol, container: docker.models.containers.Container) -> bool: + command = ( + f"python -I -m opentrons.cli analyze --json-output {protocol.container_analysis_file} " + f"{protocol.container_protocol_file} {' '.join(protocol.custom_labware_paths)}" + ) start_time = time.time() - timeout_duration = 30 # seconds + result = None + exit_code = None try: - with timeout(timeout_duration): - command_result = container.exec_run(cmd=command) - exit_code = command_result.exit_code - result = command_result.output - protocol.command_output = result.decode("utf-8") - protocol.command_exit_code = exit_code - protocol.set_analysis() - protocol.set_analysis_execution_time(time.time() - start_time) - return True - except TimeoutError: - console.print(f"Command execution exceeded {timeout_duration} seconds and was aborted.") - logs = container.logs() - # Decode and print the logs - console.print(f"container logs{logs.decode('utf-8')}") - except KeyboardInterrupt: - console.print("Execution was interrupted by the user.") - raise + command_result = container.exec_run(cmd=command) + exit_code = command_result.exit_code + result = command_result.output + protocol.command_output = result.decode("utf-8") if result else "" + protocol.command_exit_code = exit_code + protocol.set_analysis() + return True except Exception as e: console.print(f"An unexpected error occurred: {e}") - protocol.command_output = result.decode("utf-8") - console.print(f"Command output: {protocol.command_output}") - protocol.command_exit_code = exit_code - console.print(f"Exit code: {protocol.command_exit_code}") + protocol.command_output = result.decode("utf-8") if result else str(e) + protocol.command_exit_code = exit_code if exit_code is not None else -1 protocol.set_analysis() return False - protocol.command_output = None - protocol.command_exit_code = None - protocol.analysis = None - protocol.set_analysis_execution_time(time.time() - start_time) - return False - + finally: + protocol.set_analysis_execution_time(time.time() - start_time) + console.print(f"Analysis of {protocol.host_protocol_file.name} completed in {protocol.analysis_execution_time:.2f} seconds.") + + +def analyze_many(protocol_files: List[TargetProtocol], containers: List[docker.models.containers.Container]) -> None: + num_containers = len(containers) + with ThreadPoolExecutor(max_workers=num_containers) as executor: + futures = [] + for i, protocol in enumerate(protocol_files): + container = containers[i % num_containers] + future = executor.submit(analyze, protocol, container) + futures.append((future, protocol)) + for future, protocol in futures: + try: + future.result(timeout=ANALYSIS_TIMEOUT_SECONDS) + except concurrent.futures.TimeoutError: + console.print(f"Analysis of {protocol.host_protocol_file} exceeded {ANALYSIS_TIMEOUT_SECONDS} seconds and was aborted.") + # Handle timeout (e.g., mark as failed) + except Exception as e: + console.print(f"An error occurred during analysis: {e}") -def analyze_many(protocol_files: List[AnalyzedProtocol], container: docker.models.containers.Container) -> None: - for file in protocol_files: - analyze(file, container) accumulated_time = sum(protocol.analysis_execution_time for protocol in protocol_files if protocol.analysis_execution_time is not None) console.print(f"{len(protocol_files)} protocols with total analysis time of {accumulated_time:.2f} seconds.\n") -def analyze_against_image(tag: str) -> List[AnalyzedProtocol]: +def analyze_against_image(tag: str, protocols: List[TargetProtocol], num_containers: int = 1) -> List[TargetProtocol]: image_name = f"{IMAGE}:{tag}" - protocols = generate_protocols(tag) - protocols_to_process = protocols - # protocols_to_process = protocols[:1] # For testing try: - console.print(f"Analyzing {len(protocols_to_process)} protocol(s) against {image_name}...") - container = stop_and_restart_container(image_name) - analyze_many(protocols_to_process, container) + console.print(f"\nAnalyzing {len(protocols)} protocol(s) against {image_name} using {num_containers} container(s)...") + containers = start_containers(image_name, num_containers) + analyze_many(protocols, containers) finally: stop_and_remove_containers(image_name) - return protocols_to_process + return protocols def generate_analyses_from_test(tag: str, protocols: List[Protocol]) -> None: """Generate analyses from the tests.""" - try: - image_name = f"{IMAGE}:{tag}" - protocols_to_process: List[AnalyzedProtocol] = [] - # convert the protocols to AnalyzedProtocol - for test_protocol in protocols: - host_protocol_file = Path(test_protocol.file_path) - container_protocol_file = Path(CONTAINER_PROTOCOLS_ROOT, host_protocol_file.relative_to(HOST_PROTOCOLS_ROOT)) - host_analysis_file = host_analysis_path(host_protocol_file, tag) - container_analysis_file = container_analysis_path(host_protocol_file, tag) - protocols_to_process.append( - AnalyzedProtocol( - host_protocol_file, - container_protocol_file, - host_analysis_file, - container_analysis_file, - tag, - protocol_custom_labware_paths_in_container(test_protocol), - ) + protocols_to_process: List[TargetProtocol] = [] + for test_protocol in protocols: + host_protocol_file = Path(test_protocol.file_path) + container_protocol_file = Path(CONTAINER_PROTOCOLS_ROOT, host_protocol_file.relative_to(HOST_PROTOCOLS_ROOT)) + host_analysis_file = host_analysis_path(host_protocol_file, tag) + container_analysis_file = container_analysis_path(host_protocol_file, tag) + protocols_to_process.append( + TargetProtocol( + host_protocol_file, + container_protocol_file, + host_analysis_file, + container_analysis_file, + tag, + protocol_custom_labware_paths_in_container(test_protocol), ) - console.print(f"Analyzing {len(protocols_to_process)} protocol(s) against {tag}...") - container = stop_and_restart_container(image_name) - # Analyze the protocols - for protocol_to_analyze in protocols_to_process: - console.print(f"Analyzing {protocol_to_analyze.host_protocol_file}...") - analyzed = analyze(protocol_to_analyze, container) - if not analyzed: # Fail fast - console.print("Analysis failed. Exiting.") - stop_and_remove_containers(image_name) - accumulated_time = sum( - protocol.analysis_execution_time for protocol in protocols_to_process if protocol.analysis_execution_time is not None ) - console.print(f"{len(protocols_to_process)} protocols with total analysis time of {accumulated_time:.2f} seconds.\n") - finally: - stop_and_remove_containers(image_name) + analyze_against_image(tag, protocols_to_process, ANALYSIS_CONTAINER_INSTANCES)