diff --git a/packages/syft-runtimes/notebooks/enclave/enclave.ipynb b/packages/syft-runtimes/notebooks/enclave/enclave.ipynb new file mode 100644 index 00000000..21c26798 --- /dev/null +++ b/packages/syft-runtimes/notebooks/enclave/enclave.ipynb @@ -0,0 +1,10 @@ +{ + "cells": [], + "metadata": { + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/packages/syft-runtimes/notebooks/python/python.ipynb b/packages/syft-runtimes/notebooks/python/python.ipynb new file mode 100644 index 00000000..b4ae50e0 --- /dev/null +++ b/packages/syft-runtimes/notebooks/python/python.ipynb @@ -0,0 +1,408 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "0", + "metadata": {}, + "outputs": [], + "source": [ + "from syft_rds.orchestra import setup_rds_server, remove_rds_stack_dir" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1", + "metadata": {}, + "outputs": [], + "source": [ + "remove_rds_stack_dir(key=\"rds_stack\", root_dir=\"./\")" + ] + }, + { + "cell_type": "markdown", + "id": "2", + "metadata": {}, + "source": [ + "## DO1 Uploads Dataset" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3", + "metadata": {}, + "outputs": [], + "source": [ + "do_stack_1 = setup_rds_server(email=\"do1@openmined.org\", key=\"rds_stack\", root_dir=\"./\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4", + "metadata": {}, + "outputs": [], + "source": [ + "do_client_1 = do_stack_1.init_session(host=\"do1@openmined.org\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5", + "metadata": {}, + "outputs": [], + "source": [ + "from utils import generate_crop_data, create_readme\n", + "\n", + "DATASET_FILE_NAME = \"crop_stock_data.csv\"\n", + "DATASET_PATH = \"datasets/part_2\"\n", + "DATASET_PRIVATE_PATH = f\"{DATASET_PATH}/private\"\n", + "DATASET_MOCK_PATH = f\"{DATASET_PATH}/mock\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6", + "metadata": {}, + "outputs": [], + "source": [ + "# Private Data\n", + "generate_crop_data(\n", + " num_rows=10, output_path=f\"{DATASET_PRIVATE_PATH}/{DATASET_FILE_NAME}\"\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7", + "metadata": {}, + "outputs": [], + "source": [ + "# Mock Data\n", + "generate_crop_data(num_rows=10, output_path=f\"{DATASET_MOCK_PATH}/{DATASET_FILE_NAME}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8", + "metadata": {}, + "outputs": [], + "source": [ + "README_PATH = f\"{DATASET_PATH}/README.md\"\n", + "create_readme(output_path=README_PATH)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9", + "metadata": {}, + "outputs": [], + "source": [ + "from syft_notebook_ui import show_dir\n", + "from pathlib import Path\n", + "\n", + "# Now we can see the dataset is in our sync folder\n", + "show_dir(Path(\"datasets/part_2\"))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "10", + "metadata": {}, + "outputs": [], + "source": [ + "try:\n", + " dataset = do_client_1.dataset.create(\n", + " name=\"Organic Crop Stock Data\",\n", + " summary=\"This dataset contains information about organic crop stock.\",\n", + " readme_path=README_PATH,\n", + " private_path=DATASET_PRIVATE_PATH,\n", + " mock_path=DATASET_MOCK_PATH,\n", + " )\n", + " dataset.describe()\n", + "except Exception as e:\n", + " print(f\"Error: {e}\")" + ] + }, + { + "cell_type": "markdown", + "id": 
"11", + "metadata": {}, + "source": [ + "## DO2 Upload dataset" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "12", + "metadata": {}, + "outputs": [], + "source": [ + "do_stack_2 = setup_rds_server(email=\"do2@openmined.org\", key=\"rds_stack\", root_dir=\".\")\n", + "\n", + "do_client_2 = do_stack_2.init_session(host=\"do2@openmined.org\")\n", + "\n", + "DATASET_FILE_NAME = \"crop_stock_data.csv\"\n", + "DATASET_PATH = \"datasets/part_1\"\n", + "DATASET_PRIVATE_PATH = f\"{DATASET_PATH}/private\"\n", + "DATASET_MOCK_PATH = f\"{DATASET_PATH}/mock\"\n", + "\n", + "# Private Data\n", + "generate_crop_data(\n", + " num_rows=10, output_path=f\"{DATASET_PRIVATE_PATH}/{DATASET_FILE_NAME}\"\n", + ")\n", + "\n", + "# Mock Data\n", + "generate_crop_data(num_rows=10, output_path=f\"{DATASET_MOCK_PATH}/{DATASET_FILE_NAME}\")\n", + "\n", + "try:\n", + " dataset = do_client_2.dataset.create(\n", + " name=\"Organic Crop Stock Data\",\n", + " summary=\"This dataset contains information about organic crop stock.\",\n", + " readme_path=README_PATH,\n", + " private_path=DATASET_PRIVATE_PATH,\n", + " mock_path=DATASET_MOCK_PATH,\n", + " # auto_approval = [\"ds@openmined.org\"]\n", + " )\n", + " dataset.describe()\n", + "except Exception as e:\n", + " print(f\"Error: {e}\")" + ] + }, + { + "cell_type": "markdown", + "id": "13", + "metadata": {}, + "source": [ + "## DS submits jobs" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "14", + "metadata": {}, + "outputs": [], + "source": [ + "ds_stack = setup_rds_server(email=\"ds@openmined.org\", key=\"rds_stack\", root_dir=\".\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "15", + "metadata": {}, + "outputs": [], + "source": [ + "DO1 = \"do1@openmined.org\"\n", + "DO2 = \"do2@openmined.org\"\n", + "\n", + "do_client_1 = ds_stack.init_session(host=DO1)\n", + "print(\"Logged into: \", do_client_1.host)\n", + "\n", + "do_client_2 = ds_stack.init_session(host=DO2)\n", + "print(\"Logged into: \", do_client_2.host)\n", + "\n", + "assert not do_client_1.is_admin\n", + "assert not do_client_2.is_admin" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "16", + "metadata": {}, + "outputs": [], + "source": [ + "DATASET_NAME = \"Organic Crop Stock Data\"\n", + "\n", + "dataset1 = do_client_1.dataset.get(name=DATASET_NAME)\n", + "dataset1.describe()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "17", + "metadata": {}, + "outputs": [], + "source": [ + "dataset2 = do_client_2.dataset.get(name=DATASET_NAME)\n", + "dataset2.describe()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "18", + "metadata": {}, + "outputs": [], + "source": [ + "do_client_1.runtime.get_all()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "19", + "metadata": {}, + "outputs": [], + "source": [ + "CODE_DIR = \"./code\"\n", + "Path(CODE_DIR).mkdir(parents=True, exist_ok=True)\n", + "\n", + "script_content = \"\"\"import os\n", + "from pathlib import Path\n", + "from sys import exit\n", + "\n", + "import pandas as pd\n", + "\n", + "DATA_DIR = os.environ[\"DATA_DIR\"]\n", + "OUTPUT_DIR = os.environ[\"OUTPUT_DIR\"]\n", + "\n", + "dataset_paths = [ Path(dataset_path) for dataset_path in DATA_DIR.split(\",\")]\n", + "total_carrots = 0\n", + "total_tomatoes = 0\n", + "\n", + "for dataset_path in dataset_paths:\n", + " if not dataset_path.exists():\n", + " print(\"Warning: Dataset path does not exist:\", dataset_path)\n", + " exit(1)\n", + " df = 
pd.read_csv(dataset_path / \"crop_stock_data.csv\")\n", + " total_carrots += df[df[\"Product name\"] == \"Carrots\"][\"Quantity\"].sum()\n", + " total_tomatoes += df[df[\"Product name\"] == \"Tomatoes\"][\"Quantity\"].sum()\n", + "\n", + "with open(os.path.join(OUTPUT_DIR, \"output.txt\"), \"w\") as f:\n", + " f.write(f\"Total Carrots: {total_carrots}\\n\")\n", + " f.write(f\"Total Tomatoes: {total_tomatoes}\\n\")\n", + "\"\"\"\n", + "\n", + "# Write the script to the entrypoint file\n", + "entrypoint_path = Path(CODE_DIR) / \"entrypoint.py\"\n", + "with open(entrypoint_path, \"w\") as f:\n", + " f.write(script_content)\n", + "\n", + "print(f\"Successfully wrote entrypoint script to: {entrypoint_path}\")" + ] + }, + { + "cell_type": "markdown", + "id": "20", + "metadata": {}, + "source": [ + "## New runtimes interface (folder-based)\n", + "\n", + "Let's interact with it from the DOs point of view for now, since the runtimes stay in the `private` folder (`SyftBox/private//syft_runtimes`)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "21", + "metadata": {}, + "outputs": [], + "source": [ + "from syft_runtimes import PythonRuntime, JobConfig, DEFAULT_RUNTIME\n", + "\n", + "python_runtime = PythonRuntime(\n", + " syftbox_client=do_stack_1.client, runtime_name=DEFAULT_RUNTIME\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "22", + "metadata": {}, + "outputs": [], + "source": [ + "python_runtime.init_runtime_dir()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "23", + "metadata": {}, + "outputs": [], + "source": [ + "job_config = JobConfig(\n", + " code_path=\"./code\",\n", + " entrypoint=\"entrypoint.py\",\n", + ")\n", + "\n", + "job_config.model_dump()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "24", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "25", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "26", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "27", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "28", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.9" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/packages/syft-runtimes/notebooks/python/utils.py b/packages/syft-runtimes/notebooks/python/utils.py new file mode 100644 index 00000000..935441d7 --- /dev/null +++ b/packages/syft-runtimes/notebooks/python/utils.py @@ -0,0 +1,103 @@ +import csv +import random +import string +from pathlib import Path + + +def generate_crop_data(num_rows: int, output_path: str): + """ + Generates a CSV file with mock organic crop stock data. + Each crop appears only once with a unique fixed ID. + All units are in 'kgs'. + + Args: + num_rows (int): Number of unique crop rows to generate (max limited to number of crops). + output_path (str): Path to save the CSV file. 
+ """ + crops = [ + "Carrots", + "Spinach", + "Kale", + "Tomatoes", + "Zucchini", + "Potatoes", + "Onions", + "Beets", + "Radishes", + "Garlic", + "Ginger", + "Cabbage", + "Cauliflower", + "Broccoli", + "Peas", + ] + + # Limit rows to number of available crops + if num_rows > len(crops): + raise ValueError( + f"Can only generate up to {len(crops)} unique crops, got {num_rows}." + ) + + unit = "kgs" + + # Assign fixed unique IDs to each crop + crop_id_map = { + crop: "U" + "".join(random.choices(string.digits, k=4)) for crop in crops + } + + # Select a non-repeating subset of crops + selected_crops = random.sample(crops, num_rows) + + # Ensure output directory exists + Path(output_path).parent.mkdir(parents=True, exist_ok=True) + + with open(output_path, mode="w", newline="") as file: + writer = csv.writer(file) + writer.writerow(["ID", "Product name", "Quantity", "Price ($)", "Unit"]) + + for crop in selected_crops: + crop_id = crop_id_map[crop] + quantity = random.randint(1, 500) + price = f"{round(random.uniform(1.0, 10.0) * quantity, 2)}" + writer.writerow([crop_id, crop, quantity, price, unit]) + + print(f"Crop data with {num_rows} unique crops saved to {output_path}") + + +def create_readme(output_path: str): + """ + Creates a short README.md file describing the organic crop stock dataset. + + Args: + output_path (str): The path where the README.md file should be created. + """ + readme_content = """# Organic Crop Stock Dataset + +This dataset contains data representing organic crop inventory for various products. + +### Schema + +| Column | Description | +|--------------|------------------------------------| +| ID | Unique ID for each crop | +| Product name | Name of the organic crop | +| Quantity | Quantity in stock | +| Price | Total price for the given quantity | +| Unit | Unit of measurement (always 'kgs') | + +### Notes + +- Each crop has a unique and consistent ID. +- All units are in **kilograms (kgs)**. 
+ +""" + + # Ensure the directory exists + readme_path = Path(output_path) + readme_path.parent.mkdir(parents=True, exist_ok=True) + + # Write README.md + with open(readme_path, "w") as f: + f.write(readme_content) + + print(f"README written to {readme_path}") diff --git a/packages/syft-runtimes/src/syft_runtimes/__init__.py b/packages/syft-runtimes/src/syft_runtimes/__init__.py index 65f76e25..5c645501 100644 --- a/packages/syft-runtimes/src/syft_runtimes/__init__.py +++ b/packages/syft-runtimes/src/syft_runtimes/__init__.py @@ -1,8 +1,9 @@ -from syft_runtimes.runners import ( +from syft_runtimes.runtimes import ( DockerRunner, PythonRunner, FolderBasedRuntime, HighLowRuntime, + PythonRuntime, get_runner_cls, ) from syft_runtimes.output_handler import ( @@ -11,12 +12,15 @@ TextUI, ) from syft_runtimes.models import JobConfig, JobStatusUpdate, JobErrorKind, JobStatus +from syft_runtimes.consts import DEFAULT_RUNTIME + __all__ = [ "DockerRunner", "PythonRunner", "FolderBasedRuntime", "HighLowRuntime", + "PythonRuntime", "FileOutputHandler", "RichConsoleUI", "TextUI", @@ -25,4 +29,5 @@ "JobStatusUpdate", "JobErrorKind", "JobStatus", + "DEFAULT_RUNTIME", ] diff --git a/packages/syft-runtimes/src/syft_runtimes/consts.py b/packages/syft-runtimes/src/syft_runtimes/consts.py new file mode 100644 index 00000000..a4872d2a --- /dev/null +++ b/packages/syft-runtimes/src/syft_runtimes/consts.py @@ -0,0 +1 @@ +DEFAULT_RUNTIME = "default_python_runtime" diff --git a/packages/syft-runtimes/src/syft_runtimes/high_low/setup.py b/packages/syft-runtimes/src/syft_runtimes/high_low/setup.py index 354bfaca..a9e62a75 100644 --- a/packages/syft-runtimes/src/syft_runtimes/high_low/setup.py +++ b/packages/syft-runtimes/src/syft_runtimes/high_low/setup.py @@ -20,7 +20,7 @@ ) from syft_runtimes.high_low.consts import DEFAULT_HIGH_SIDE_DATA_DIR from syft_runtimes.models import BaseRuntimeConfig -from syft_runtimes.runners import HighLowRuntime +from syft_runtimes.runtimes import HighLowRuntime class HighLowRuntimeConfig(BaseRuntimeConfig): diff --git a/packages/syft-runtimes/src/syft_runtimes/models.py b/packages/syft-runtimes/src/syft_runtimes/models.py index c0e1af6f..c3eff281 100644 --- a/packages/syft-runtimes/src/syft_runtimes/models.py +++ b/packages/syft-runtimes/src/syft_runtimes/models.py @@ -4,8 +4,8 @@ import hashlib from typing import Any, TypeAlias, Union, Literal, Optional import os +from uuid import UUID, uuid4 from IPython.display import HTML, display -from datetime import datetime from loguru import logger from pydantic import ( @@ -16,6 +16,7 @@ ) from syft_notebook_ui.pydantic_html_repr import create_html_repr +from syft_runtimes.consts import DEFAULT_RUNTIME import yaml PathLike: TypeAlias = Union[str, os.PathLike, Path] @@ -200,56 +201,49 @@ class RuntimeUpdate(BaseModel): class JobConfig(BaseModel): - """Configuration for a job run""" - - function_folder: Path - args: list[str] - data_path: Path - runtime: Runtime - job_folder: Optional[Path] = Field( - default_factory=lambda: Path("jobs") / datetime.now().strftime("%Y%m%d_%H%M%S") - ) - timeout: int = 60 - data_mount_dir: str = "/app/data" - extra_env: dict[str, str] = {} - blocking: bool = Field(default=True) + # Job identification + job_id: UUID = Field(default_factory=uuid4) - @property - def job_path(self) -> Path: - """Derived path for job folder""" - return Path(self.job_folder) - - @property - def logs_dir(self) -> Path: - """Derived path for logs directory""" - return self.job_path / "logs" + # Core execution parameters + dataset_name: 
str | None = None # Name of the dataset to use + code_path: PathLike # Path to the code file/folder to execute + entrypoint: str | None = None + runtime_name: str = DEFAULT_RUNTIME # Which runtime to use - @property - def output_dir(self) -> Path: - """Derived path for output directory""" - return self.job_path / "output" + # Execution parameters + timeout: int = 300 # Timeout in seconds (5 minutes default) + args: list[str] = Field(default_factory=list) # Command line arguments + env: dict[str, str] = Field(default_factory=dict) # Environment variables - def get_env(self) -> dict[str, str]: - return self.extra_env | self._base_env + # Status and metadata + status: str = "pending_code_review" + created_at: Optional[float] = None + completed_at: Optional[float] = None - def get_env_as_docker_args(self) -> list[str]: - return [f"-e {k}={v}" for k, v in self.get_env().items()] + @model_validator(mode="after") + def validate_code_path(self): + if self.code_path is not None: + code_path = Path(self.code_path).expanduser().resolve() + if not code_path.exists(): + raise FileNotFoundError(f"Code path '{self.code_path}' does not exist") + return self - def get_extra_env_as_docker_args(self) -> list[str]: - return [f"-e {k}={v}" for k, v in self.extra_env.items()] + def save_to_yaml(self, config_path: PathLike) -> None: + """Save the config to a YAML file.""" + yaml_dump = yaml.safe_dump( + self.model_dump(mode="json", exclude_none=True), + indent=2, + sort_keys=False, + ) + Path(config_path).write_text(yaml_dump) - @property - def _base_env(self) -> dict[str, str]: - interpreter = " ".join(self.runtime.cmd) - # interpreter_str = f"'{interpreter}'" if " " in interpreter else interpreter - return { - "OUTPUT_DIR": str(self.output_dir.absolute()), - "DATA_DIR": str(self.data_path.absolute()), - "CODE_DIR": str(self.function_folder.absolute()), - "TIMEOUT": str(self.timeout), - "INPUT_FILE": str(self.function_folder / self.args[0]), - "INTERPRETER": interpreter, - } + @classmethod + def from_yaml(cls, config_path: Path) -> "JobConfig": + """Create instance from YAML file.""" + with open(config_path, "r") as f: + config_dict = yaml.safe_load(f) + config_dict["config_path"] = config_path + return cls(**config_dict) class JobResults(BaseModel): diff --git a/packages/syft-runtimes/src/syft_runtimes/runners.py b/packages/syft-runtimes/src/syft_runtimes/runtimes.py similarity index 69% rename from packages/syft-runtimes/src/syft_runtimes/runners.py rename to packages/syft-runtimes/src/syft_runtimes/runtimes.py index 2dc2f22b..26858122 100644 --- a/packages/syft-runtimes/src/syft_runtimes/runners.py +++ b/packages/syft-runtimes/src/syft_runtimes/runtimes.py @@ -1,13 +1,9 @@ import os import subprocess import time -import threading -import json -import shutil -from abc import abstractmethod from pathlib import Path from typing import Callable, Optional, Type -from uuid import uuid4 +from uuid import UUID from loguru import logger from syft_core import Client as SyftBoxClient @@ -395,6 +391,7 @@ def _prepare_run_command(self, job_config: JobConfig) -> list[str]: # ------- new runtime classes ------- class FolderBasedRuntime: """Base class for folder-based syft runtimes that has the following structure: + Suppose that we are looking from the perspective of the 'do1@openmined.org' └── SyftBox ├── datasites/ │ ├── do1@openmined.org @@ -461,198 +458,46 @@ def update_config(self, **kwargs) -> None: self.config.update(**kwargs) def submit_job(self, job_config: JobConfig) -> str: - """Submit a job to the execution 
queue""" - job_id = str(uuid4()) - job_file = self.jobs_dir / f"{job_id}.json" - - # Serialize job config to JSON - job_data = { - "job_id": job_id, - "config": job_config.model_dump_json(), - "status": JobStatus.pending_code_review.value, - "created_at": time.time(), - } - - job_file.write_text(json.dumps(job_data, indent=2)) - logger.info(f"Job {job_id} submitted to {job_file}") - return job_id - - def get_job_status(self, job_id: str) -> JobStatus: - """Get the current status of a job""" - # Check in execution queue - job_file = self.jobs_dir / f"{job_id}.json" - if job_file.exists(): - job_data = json.loads(job_file.read_text()) - return JobStatus(job_data["status"]) - - # Check in done folder - done_file = self.done_dir / f"{job_id}.json" - if done_file.exists(): - job_data = json.loads(done_file.read_text()) - return JobStatus(job_data["status"]) - - # Check status updates - status_file = self.running_dir / f"{job_id}_status.json" - if status_file.exists(): - status_data = json.loads(status_file.read_text()) - return JobStatus(status_data["status"]) - - raise ValueError(f"Job {job_id} not found") - - def get_job_results(self, job_id: str) -> JobResults: - """Get the results of a completed job""" - done_file = self.done_dir / f"{job_id}.json" - if not done_file.exists(): - raise ValueError(f"Job {job_id} is not completed or not found") - - job_data = json.loads(done_file.read_text()) - if job_data["status"] != JobStatus.job_run_finished.value: - raise ValueError(f"Job {job_id} has not finished successfully") - - results_dir = self.done_dir / f"{job_id}_results" - if not results_dir.exists(): - raise ValueError(f"Results directory for job {job_id} not found") - - return JobResults(results_dir=results_dir) - - def watch_folders(self) -> None: - """Start watching the folders for new jobs (non-blocking)""" - if self._running: - logger.warning("Folder watching is already running") - return + """Submit a job to the jobs (syft_runtimes//jobs) queue""" + raise NotImplementedError - self._running = True - self._watch_thread = threading.Thread(target=self._watch_loop, daemon=True) - self._watch_thread.start() - logger.info("Started folder watching thread") - - def stop_watching(self) -> None: - """Stop watching folders""" - self._running = False - if self._watch_thread: - self._watch_thread.join(timeout=5) - logger.info("Stopped folder watching") - - def _watch_loop(self) -> None: - """Main loop for watching and processing jobs""" - while self._running: - try: - self.process_job_queue() - time.sleep(1) # Check every second - except Exception as e: - logger.error(f"Error in watch loop: {e}") - time.sleep(5) # Wait longer on error - - def process_job_queue(self) -> None: - """Process all pending jobs in the queue""" - job_files = list(self.jobs_dir.glob("*.json")) - - for job_file in job_files: - try: - job_data = json.loads(job_file.read_text()) - job_id = job_data["job_id"] - - # Skip if not pending - if job_data["status"] != JobStatus.pending_code_review.value: - continue - - logger.info(f"Processing job {job_id}") - - # Load job config - job_config = JobConfig.model_validate_json(job_data["config"]) - - # Execute the job - self._execute_job(job_id, job_config, job_data) - - except Exception as e: - logger.error(f"Error processing job file {job_file}: {e}") - - def _execute_job(self, job_id: str, job_config: JobConfig, job_data: dict) -> None: - """Execute a single job""" - try: - # Update status to in progress - job_data["status"] = JobStatus.job_in_progress.value - job_file = self.jobs_dir / 
f"{job_id}.json" - job_file.write_text(json.dumps(job_data, indent=2)) - - # Run the job - result = self.run(job_config) - - if isinstance(result, tuple): - return_code, error_message = result - else: - # Handle non-blocking case - for folder-based runner, we'll make it blocking - return_code = result.wait() - error_message = None - if return_code != 0: - stderr_output = result.stderr.read() if result.stderr else "" - error_message = stderr_output - - # Update job status based on result - if return_code == 0: - job_data["status"] = JobStatus.job_run_finished.value - job_data["error"] = JobErrorKind.no_error.value - job_data["error_message"] = None - else: - job_data["status"] = JobStatus.job_run_failed.value - job_data["error"] = JobErrorKind.execution_failed.value - job_data["error_message"] = error_message - - job_data["completed_at"] = time.time() - - # Move to done folder - self.move_to_done(job_id, job_data, job_config) + def get_job_status(self, job_id: UUID) -> JobStatus: + """Get the current status of a job which can be pending (in `jobs/`), + in progress (in `running/`), or finished (in `done/`). + """ + raise NotImplementedError - except Exception as e: - logger.error(f"Error executing job {job_id}: {e}") - job_data["status"] = JobStatus.job_run_failed.value - job_data["error"] = JobErrorKind.execution_failed.value - job_data["error_message"] = str(e) - job_data["completed_at"] = time.time() - self.move_to_done(job_id, job_data, job_config) - - def move_to_done(self, job_id: str, job_data: dict, job_config: JobConfig) -> None: - """Move a completed job to the done folder""" - # Save job metadata to done folder - done_file = self.done_dir - done_file.write_text(json.dumps(job_data, indent=2)) - - # Copy job results (output and logs) to done folder if they exist - if job_config.output_dir.exists(): - results_dir = self.done_jobs_dir / f"{job_id}_results" - if results_dir.exists(): - shutil.rmtree(results_dir) - shutil.copytree(job_config.job_path, results_dir) - - # Remove from execution queue - job_file = self.jobs_to_execute_dir / f"{job_id}.json" - if job_file.exists(): - job_file.unlink() - - logger.info( - f"Job {job_id} moved to done folder with status {job_data['status']}" - ) + def get_job_results(self, job_id: UUID) -> JobResults: + """Get the results of a completed job (in the `done/` folder)""" + raise NotImplementedError - @abstractmethod - def _prepare_job_folders(self, job_config: JobConfig) -> None: - """Prepare job-specific folders - to be implemented by subclasses""" - pass + def _get_job_config(self, job_id: UUID) -> JobConfig: + """Get the job config for a particular job""" + raise NotImplementedError - @abstractmethod - def _validate_paths(self, job_config: JobConfig) -> None: - """Validate job paths - to be implemented by subclasses""" - pass + def _polling_jobs(self) -> None: + """Polling for new jobs by watching the `jobs/` folder""" + raise NotImplementedError - @abstractmethod - def _run_subprocess( - self, - cmd: list[str], - job_config: JobConfig, - env: dict | None = None, - blocking: bool = True, - ) -> tuple[int, str | None] | subprocess.Popen: - """Run subprocess - to be implemented by subclasses""" - pass + def _stop_polling_jobs(self) -> None: + """Stop polling for jobs by watching the `jobs/` folder""" + raise NotImplementedError + + def _move_to_running(self, job_id: UUID) -> None: + """Move a pending job from `jobs/` to the `running/` folder""" + raise NotImplementedError + + def _process_job_queue(self) -> None: + """Process all pending jobs in 
the `jobs/` folder""" + raise NotImplementedError + + def _execute_job(self, job_id: UUID) -> None: + """Execute a single job""" + raise NotImplementedError + + def _move_to_done(self, job_id: UUID) -> None: + """Move a completed or errored job from the `running/` to the `done/` folder""" + raise NotImplementedError class HighLowRuntime: @@ -672,6 +517,12 @@ def init_runtime_dir(self) -> None: self.lowside_runtime.init_runtime_dir() +class PythonRuntime(FolderBasedRuntime): + """Python runtime that runs a Python job in a local subprocess.""" + + pass + + def get_runner_cls(job_config: JobConfig) -> Type[SyftRuntime]: """Factory to get the appropriate runner class for a job config.""" runtime_kind = job_config.runtime.kind