From 7d28a73d4705adc7be0f9aa934f38695ad19887e Mon Sep 17 00:00:00 2001 From: Khoa Duy Nguyen <88959106+khoaguin@users.noreply.github.com> Date: Tue, 8 Jul 2025 11:52:00 +0700 Subject: [PATCH 1/2] initial notebooks for different runtimes --- .../notebooks/enclave/enclave.ipynb | 10 + .../notebooks/high-low/high_low.ipynb | 38 +- .../notebooks/python/python.ipynb | 398 ++++++++++++++++++ .../syft-runtimes/notebooks/python/utils.py | 103 +++++ 4 files changed, 534 insertions(+), 15 deletions(-) create mode 100644 packages/syft-runtimes/notebooks/enclave/enclave.ipynb create mode 100644 packages/syft-runtimes/notebooks/python/python.ipynb create mode 100644 packages/syft-runtimes/notebooks/python/utils.py diff --git a/packages/syft-runtimes/notebooks/enclave/enclave.ipynb b/packages/syft-runtimes/notebooks/enclave/enclave.ipynb new file mode 100644 index 00000000..21c26798 --- /dev/null +++ b/packages/syft-runtimes/notebooks/enclave/enclave.ipynb @@ -0,0 +1,10 @@ +{ + "cells": [], + "metadata": { + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/packages/syft-runtimes/notebooks/high-low/high_low.ipynb b/packages/syft-runtimes/notebooks/high-low/high_low.ipynb index 1dd47c9f..57fac068 100644 --- a/packages/syft-runtimes/notebooks/high-low/high_low.ipynb +++ b/packages/syft-runtimes/notebooks/high-low/high_low.ipynb @@ -24,7 +24,7 @@ "id": "1", "metadata": {}, "source": [ - "# Initializing the high datasite" + "## Initializing the high datasite and connect to it" ] }, { @@ -110,8 +110,6 @@ "outputs": [], "source": [ "# TODO - ensure the sync folders exist on the lowside (over ssh?)\n", - "\n", - "\n", "lowside_sync_folders = [\n", " sync_config.jobs_dir(Side.LOW),\n", " sync_config.outputs_dir(Side.LOW),\n", @@ -137,7 +135,7 @@ "id": "9", "metadata": {}, "source": [ - "# Create a high-side dataset" + "## Create a high-side dataset" ] }, { @@ -287,7 +285,7 @@ "id": "19", "metadata": {}, "source": [ - "# Switch to low-side" + "## Switch to low-side" ] }, { @@ -365,18 +363,28 @@ "id": "25", "metadata": {}, "source": [ - "# NEXT STEPS:\n", - "\n", - "- Rewrite syft-runtimes to the folder-based job runner (see https://docs.google.com/document/d/1ORYC-Z55t07dcu45mn2AL1RiMGWOlCD2mYb8kKUk0SY/edit?tab=t.0)\n", - "\n", - "High side:\n", - "- Run job runner on high-side as daemon process, watching `private/job_runners/highside-1234/jobs` for new jobs\n", - "- Manually sync pending jobs and results (just in a notebook for now)\n", - "\n", - "Low side:\n", + "## High side Job Runner\n", + "- Run job runner on high-side as daemon process, watching the folder `private/job_runners/highside-1234/jobs` for new jobs\n", + "- Manually sync pending jobs and results (just in a notebook for now)" + ] + }, + { + "cell_type": "markdown", + "id": "26", + "metadata": {}, + "source": [ + "## Low side submits jobs to the correct runner on the high side. 
Check for results in the outputs folder\n", "- Low side: submit job to the correct runner (according to the dataset location: `private/job_runners//jobs`)\n", - "- watch for results in the corresponding outputs dir\n" + "- watch for results in the corresponding outputs dir" ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "27", + "metadata": {}, + "outputs": [], + "source": [] } ], "metadata": { diff --git a/packages/syft-runtimes/notebooks/python/python.ipynb b/packages/syft-runtimes/notebooks/python/python.ipynb new file mode 100644 index 00000000..e599b441 --- /dev/null +++ b/packages/syft-runtimes/notebooks/python/python.ipynb @@ -0,0 +1,398 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "0", + "metadata": {}, + "outputs": [], + "source": [ + "from syft_rds.orchestra import setup_rds_server, remove_rds_stack_dir" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1", + "metadata": {}, + "outputs": [], + "source": [ + "remove_rds_stack_dir(key=\"rds_stack\", root_dir=\"./\")" + ] + }, + { + "cell_type": "markdown", + "id": "2", + "metadata": {}, + "source": [ + "## DO1 Uploads Dataset" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3", + "metadata": {}, + "outputs": [], + "source": [ + "do_stack_1 = setup_rds_server(email=\"do1@openmined.org\", key=\"rds_stack\", root_dir=\"./\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4", + "metadata": {}, + "outputs": [], + "source": [ + "do_client_1 = do_stack_1.init_session(host=\"do1@openmined.org\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5", + "metadata": {}, + "outputs": [], + "source": [ + "from utils import generate_crop_data, create_readme\n", + "\n", + "DATASET_FILE_NAME = \"crop_stock_data.csv\"\n", + "DATASET_PATH = \"datasets/part_2\"\n", + "DATASET_PRIVATE_PATH = f\"{DATASET_PATH}/private\"\n", + "DATASET_MOCK_PATH = f\"{DATASET_PATH}/mock\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6", + "metadata": {}, + "outputs": [], + "source": [ + "# Private Data\n", + "generate_crop_data(\n", + " num_rows=10, output_path=f\"{DATASET_PRIVATE_PATH}/{DATASET_FILE_NAME}\"\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7", + "metadata": {}, + "outputs": [], + "source": [ + "# Mock Data\n", + "generate_crop_data(num_rows=10, output_path=f\"{DATASET_MOCK_PATH}/{DATASET_FILE_NAME}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8", + "metadata": {}, + "outputs": [], + "source": [ + "README_PATH = f\"{DATASET_PATH}/README.md\"\n", + "create_readme(output_path=README_PATH)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9", + "metadata": {}, + "outputs": [], + "source": [ + "from syft_notebook_ui import show_dir\n", + "from pathlib import Path\n", + "\n", + "# Now we can see the dataset is in our sync folder\n", + "show_dir(Path(\"datasets/part_2\"))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "10", + "metadata": {}, + "outputs": [], + "source": [ + "try:\n", + " dataset = do_client_1.dataset.create(\n", + " name=\"Organic Crop Stock Data\",\n", + " summary=\"This dataset contains information about organic crop stock.\",\n", + " readme_path=README_PATH,\n", + " private_path=DATASET_PRIVATE_PATH,\n", + " mock_path=DATASET_MOCK_PATH,\n", + " # auto_approval = [\"ds@openmined.org\"]\n", + " )\n", + " dataset.describe()\n", + "except Exception as e:\n", 
+ " print(f\"Error: {e}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "11", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "markdown", + "id": "12", + "metadata": {}, + "source": [ + "## DO2 Upload dataset" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "13", + "metadata": {}, + "outputs": [], + "source": [ + "do_stack_2 = setup_rds_server(email=\"do2@openmined.org\", key=\"rds_stack\", root_dir=\".\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "14", + "metadata": {}, + "outputs": [], + "source": [ + "do_client_2 = do_stack_2.init_session(host=\"do2@openmined.org\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "15", + "metadata": {}, + "outputs": [], + "source": [ + "DATASET_FILE_NAME = \"crop_stock_data.csv\"\n", + "DATASET_PATH = \"datasets/part_1\"\n", + "DATASET_PRIVATE_PATH = f\"{DATASET_PATH}/private\"\n", + "DATASET_MOCK_PATH = f\"{DATASET_PATH}/mock\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "16", + "metadata": {}, + "outputs": [], + "source": [ + "# Private Data\n", + "generate_crop_data(\n", + " num_rows=10, output_path=f\"{DATASET_PRIVATE_PATH}/{DATASET_FILE_NAME}\"\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "17", + "metadata": {}, + "outputs": [], + "source": [ + "# Mock Data\n", + "generate_crop_data(num_rows=10, output_path=f\"{DATASET_MOCK_PATH}/{DATASET_FILE_NAME}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "18", + "metadata": {}, + "outputs": [], + "source": [ + "try:\n", + " dataset = do_client_2.dataset.create(\n", + " name=\"Organic Crop Stock Data\",\n", + " summary=\"This dataset contains information about organic crop stock.\",\n", + " readme_path=README_PATH,\n", + " private_path=DATASET_PRIVATE_PATH,\n", + " mock_path=DATASET_MOCK_PATH,\n", + " # auto_approval = [\"ds@openmined.org\"]\n", + " )\n", + " dataset.describe()\n", + "except Exception as e:\n", + " print(f\"Error: {e}\")" + ] + }, + { + "cell_type": "markdown", + "id": "19", + "metadata": {}, + "source": [ + "## DS submits jobs" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "20", + "metadata": {}, + "outputs": [], + "source": [ + "ds_stack = setup_rds_server(email=\"ds@openmined.org\", key=\"rds_stack\", root_dir=\".\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "21", + "metadata": {}, + "outputs": [], + "source": [ + "DO1 = \"do1@openmined.org\"\n", + "DO2 = \"do2@openmined.org\"\n", + "\n", + "do_client_1 = ds_stack.init_session(host=DO1)\n", + "print(\"Logged into: \", do_client_1.host)\n", + "\n", + "do_client_2 = ds_stack.init_session(host=DO2)\n", + "print(\"Logged into: \", do_client_2.host)\n", + "\n", + "assert not do_client_1.is_admin\n", + "assert not do_client_2.is_admin" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "22", + "metadata": {}, + "outputs": [], + "source": [ + "DATASET_NAME = \"Organic Crop Stock Data\"\n", + "\n", + "dataset1 = do_client_1.dataset.get(name=DATASET_NAME)\n", + "dataset1.describe()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "23", + "metadata": {}, + "outputs": [], + "source": [ + "dataset2 = do_client_2.dataset.get(name=DATASET_NAME)\n", + "dataset2.describe()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "24", + "metadata": {}, + "outputs": [], + "source": [ + "do_client_1.runtime.get_all()" + ] + }, 
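+  {
+   "cell_type": "markdown",
+   "id": "24-mock-preview",
+   "metadata": {},
+   "source": [
+    "Before writing the analysis script, we can sanity-check the mock data. (Illustrative cell: reading `datasets/part_2/mock` directly only works here because this demo keeps every datasite under the same local root; the path and the use of pandas match the entrypoint script written below.)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "24-mock-preview-code",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import pandas as pd\n",
+    "\n",
+    "# Peek at DO1's mock CSV: the same columns the entrypoint script below will read\n",
+    "pd.read_csv(\"datasets/part_2/mock/crop_stock_data.csv\").head()"
+   ]
+  },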
+ { + "cell_type": "code", + "execution_count": null, + "id": "25", + "metadata": {}, + "outputs": [], + "source": [ + "CODE_DIR = \"./code\"\n", + "Path(CODE_DIR).mkdir(parents=True, exist_ok=True)\n", + "\n", + "script_content = \"\"\"import os\n", + "from pathlib import Path\n", + "from sys import exit\n", + "\n", + "import pandas as pd\n", + "\n", + "DATA_DIR = os.environ[\"DATA_DIR\"]\n", + "OUTPUT_DIR = os.environ[\"OUTPUT_DIR\"]\n", + "\n", + "dataset_paths = [ Path(dataset_path) for dataset_path in DATA_DIR.split(\",\")]\n", + "total_carrots = 0\n", + "total_tomatoes = 0\n", + "\n", + "for dataset_path in dataset_paths:\n", + " if not dataset_path.exists():\n", + " print(\"Warning: Dataset path does not exist:\", dataset_path)\n", + " exit(1)\n", + " df = pd.read_csv(dataset_path / \"crop_stock_data.csv\")\n", + " total_carrots += df[df[\"Product name\"] == \"Carrots\"][\"Quantity\"].sum()\n", + " total_tomatoes += df[df[\"Product name\"] == \"Tomatoes\"][\"Quantity\"].sum()\n", + "\n", + "with open(os.path.join(OUTPUT_DIR, \"output.txt\"), \"w\") as f:\n", + " f.write(f\"Total Carrots: {total_carrots}\\n\")\n", + " f.write(f\"Total Tomatoes: {total_tomatoes}\\n\")\n", + "\"\"\"\n", + "\n", + "\n", + "# Write the script to the entrypoint file\n", + "entrypoint_path = Path(CODE_DIR) / \"entrypoint.py\"\n", + "with open(entrypoint_path, \"w\") as f:\n", + " f.write(script_content)\n", + "\n", + "print(f\"Successfully wrote entrypoint script to: {entrypoint_path}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "26", + "metadata": {}, + "outputs": [], + "source": [ + "datasites = [do_client_1, do_client_2]\n", + "\n", + "for client in datasites:\n", + " job = client.job.submit(\n", + " name=\"Crop Avg Experiment\",\n", + " description=\"Farming Coop Avg Experiment\",\n", + " user_code_path=\"./code\",\n", + " dataset_name=DATASET_NAME,\n", + " tags=[\"enclave\", \"syft\"],\n", + " entrypoint=\"entrypoint.py\",\n", + " )\n", + " print(job)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "27", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.9" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/packages/syft-runtimes/notebooks/python/utils.py b/packages/syft-runtimes/notebooks/python/utils.py new file mode 100644 index 00000000..935441d7 --- /dev/null +++ b/packages/syft-runtimes/notebooks/python/utils.py @@ -0,0 +1,103 @@ +import csv +import random +import string +from pathlib import Path + + +def generate_crop_data(num_rows: int, output_path: str): + """ + Generates a CSV file with mock organic crop stock data. + Each crop appears only once with a unique fixed ID. + All units are in 'kgs'. + + Args: + num_rows (int): Number of unique crop rows to generate (max limited to number of crops). + output_path (str): Path to save the CSV file. 
+ """ + crops = [ + "Carrots", + "Spinach", + "Kale", + "Tomatoes", + "Zucchini", + "Potatoes", + "Onions", + "Beets", + "Radishes", + "Garlic", + "Ginger", + "Cabbage", + "Cauliflower", + "Broccoli", + "Peas", + ] + + # Limit rows to number of available crops + if num_rows > len(crops): + raise ValueError( + f"Can only generate up to {len(crops)} unique crops, got {num_rows}." + ) + + unit = "kgs" + + # Assign fixed unique IDs to each crop + crop_id_map = { + crop: "U" + "".join(random.choices(string.digits, k=4)) for crop in crops + } + + # Select a non-repeating subset of crops + selected_crops = random.sample(crops, num_rows) + + # Ensure output directory exists + Path(output_path).parent.mkdir(parents=True, exist_ok=True) + + with open(output_path, mode="w", newline="") as file: + writer = csv.writer(file) + writer.writerow(["ID", "Product name", "Quantity", "Price ($)", "Unit"]) + + for crop in selected_crops: + crop_id = crop_id_map[crop] + quantity = random.randint(1, 500) + price = f"{round(random.uniform(1.0, 10.0) * quantity, 2)}" + writer.writerow([crop_id, crop, quantity, price, unit]) + + print(f"Crop data with {num_rows} unique crops saved to {output_path}") + + +def create_readme(output_path: str): + """ + Creates a short README.md file describing the organic crop stock dataset. + + Args: + output_path (str): The path where the README.md file should be created. + """ + readme_content = """# Organic Crop Stock Dataset + +This dataset contains data representing organic crop inventory for various products. + +### Schema + +| Column | Description | +|--------------|------------------------------------| +| ID | Unique ID for each crop | +| Product name | Name of the organic crop | +| Quantity | Quantity in stock | +| Price | Total price for the given quantity | +| Unit | Unit of measurement (always 'kgs') | + +### Notes + +- Each crop has a unique and consistent ID. +- All units are in **kilograms (kgs)**. 
+ +""" + + # Ensure the directory exists + readme_path = Path(output_path) + readme_path.parent.mkdir(parents=True, exist_ok=True) + + # Write README.md + with open(readme_path, "w") as f: + f.write(readme_content) + + print(f"README written to {readme_path}") From 7c9d7c46a586486ae4a30ddf04f04ba5abbbe68b Mon Sep 17 00:00:00 2001 From: Khoa Duy Nguyen <88959106+khoaguin@users.noreply.github.com> Date: Thu, 17 Jul 2025 17:45:17 +0700 Subject: [PATCH 2/2] new job config for folder based runtime --- .../notebooks/python/python.ipynb | 172 +++++++------ .../src/syft_runtimes/__init__.py | 7 +- .../syft-runtimes/src/syft_runtimes/consts.py | 1 + .../src/syft_runtimes/high_low/setup.py | 2 +- .../syft-runtimes/src/syft_runtimes/models.py | 84 +++---- .../syft_runtimes/{runners.py => runtimes.py} | 235 ++++-------------- 6 files changed, 181 insertions(+), 320 deletions(-) create mode 100644 packages/syft-runtimes/src/syft_runtimes/consts.py rename packages/syft-runtimes/src/syft_runtimes/{runners.py => runtimes.py} (69%) diff --git a/packages/syft-runtimes/notebooks/python/python.ipynb b/packages/syft-runtimes/notebooks/python/python.ipynb index e599b441..b4ae50e0 100644 --- a/packages/syft-runtimes/notebooks/python/python.ipynb +++ b/packages/syft-runtimes/notebooks/python/python.ipynb @@ -126,24 +126,15 @@ " readme_path=README_PATH,\n", " private_path=DATASET_PRIVATE_PATH,\n", " mock_path=DATASET_MOCK_PATH,\n", - " # auto_approval = [\"ds@openmined.org\"]\n", " )\n", " dataset.describe()\n", "except Exception as e:\n", " print(f\"Error: {e}\")" ] }, - { - "cell_type": "code", - "execution_count": null, - "id": "11", - "metadata": {}, - "outputs": [], - "source": [] - }, { "cell_type": "markdown", - "id": "12", + "id": "11", "metadata": {}, "source": [ "## DO2 Upload dataset" @@ -152,67 +143,27 @@ { "cell_type": "code", "execution_count": null, - "id": "13", - "metadata": {}, - "outputs": [], - "source": [ - "do_stack_2 = setup_rds_server(email=\"do2@openmined.org\", key=\"rds_stack\", root_dir=\".\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "14", - "metadata": {}, - "outputs": [], - "source": [ - "do_client_2 = do_stack_2.init_session(host=\"do2@openmined.org\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "15", + "id": "12", "metadata": {}, "outputs": [], "source": [ + "do_stack_2 = setup_rds_server(email=\"do2@openmined.org\", key=\"rds_stack\", root_dir=\".\")\n", + "\n", + "do_client_2 = do_stack_2.init_session(host=\"do2@openmined.org\")\n", + "\n", "DATASET_FILE_NAME = \"crop_stock_data.csv\"\n", "DATASET_PATH = \"datasets/part_1\"\n", "DATASET_PRIVATE_PATH = f\"{DATASET_PATH}/private\"\n", - "DATASET_MOCK_PATH = f\"{DATASET_PATH}/mock\"" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "16", - "metadata": {}, - "outputs": [], - "source": [ + "DATASET_MOCK_PATH = f\"{DATASET_PATH}/mock\"\n", + "\n", "# Private Data\n", "generate_crop_data(\n", " num_rows=10, output_path=f\"{DATASET_PRIVATE_PATH}/{DATASET_FILE_NAME}\"\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "17", - "metadata": {}, - "outputs": [], - "source": [ + ")\n", + "\n", "# Mock Data\n", - "generate_crop_data(num_rows=10, output_path=f\"{DATASET_MOCK_PATH}/{DATASET_FILE_NAME}\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "18", - "metadata": {}, - "outputs": [], - "source": [ + "generate_crop_data(num_rows=10, output_path=f\"{DATASET_MOCK_PATH}/{DATASET_FILE_NAME}\")\n", + "\n", 
"try:\n", " dataset = do_client_2.dataset.create(\n", " name=\"Organic Crop Stock Data\",\n", @@ -229,7 +180,7 @@ }, { "cell_type": "markdown", - "id": "19", + "id": "13", "metadata": {}, "source": [ "## DS submits jobs" @@ -238,7 +189,7 @@ { "cell_type": "code", "execution_count": null, - "id": "20", + "id": "14", "metadata": {}, "outputs": [], "source": [ @@ -248,7 +199,7 @@ { "cell_type": "code", "execution_count": null, - "id": "21", + "id": "15", "metadata": {}, "outputs": [], "source": [ @@ -268,7 +219,7 @@ { "cell_type": "code", "execution_count": null, - "id": "22", + "id": "16", "metadata": {}, "outputs": [], "source": [ @@ -281,7 +232,7 @@ { "cell_type": "code", "execution_count": null, - "id": "23", + "id": "17", "metadata": {}, "outputs": [], "source": [ @@ -292,7 +243,7 @@ { "cell_type": "code", "execution_count": null, - "id": "24", + "id": "18", "metadata": {}, "outputs": [], "source": [ @@ -302,7 +253,7 @@ { "cell_type": "code", "execution_count": null, - "id": "25", + "id": "19", "metadata": {}, "outputs": [], "source": [ @@ -335,7 +286,6 @@ " f.write(f\"Total Tomatoes: {total_tomatoes}\\n\")\n", "\"\"\"\n", "\n", - "\n", "# Write the script to the entrypoint file\n", "entrypoint_path = Path(CODE_DIR) / \"entrypoint.py\"\n", "with open(entrypoint_path, \"w\") as f:\n", @@ -344,27 +294,79 @@ "print(f\"Successfully wrote entrypoint script to: {entrypoint_path}\")" ] }, + { + "cell_type": "markdown", + "id": "20", + "metadata": {}, + "source": [ + "## New runtimes interface (folder-based)\n", + "\n", + "Let's interact with it from the DOs point of view for now, since the runtimes stay in the `private` folder (`SyftBox/private//syft_runtimes`)" + ] + }, { "cell_type": "code", "execution_count": null, - "id": "26", + "id": "21", "metadata": {}, "outputs": [], "source": [ - "datasites = [do_client_1, do_client_2]\n", + "from syft_runtimes import PythonRuntime, JobConfig, DEFAULT_RUNTIME\n", "\n", - "for client in datasites:\n", - " job = client.job.submit(\n", - " name=\"Crop Avg Experiment\",\n", - " description=\"Farming Coop Avg Experiment\",\n", - " user_code_path=\"./code\",\n", - " dataset_name=DATASET_NAME,\n", - " tags=[\"enclave\", \"syft\"],\n", - " entrypoint=\"entrypoint.py\",\n", - " )\n", - " print(job)" + "python_runtime = PythonRuntime(\n", + " syftbox_client=do_stack_1.client, runtime_name=DEFAULT_RUNTIME\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "22", + "metadata": {}, + "outputs": [], + "source": [ + "python_runtime.init_runtime_dir()" ] }, + { + "cell_type": "code", + "execution_count": null, + "id": "23", + "metadata": {}, + "outputs": [], + "source": [ + "job_config = JobConfig(\n", + " code_path=\"./code\",\n", + " entrypoint=\"entrypoint.py\",\n", + ")\n", + "\n", + "job_config.model_dump()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "24", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "25", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "26", + "metadata": {}, + "outputs": [], + "source": [] + }, { "cell_type": "code", "execution_count": null, @@ -372,6 +374,14 @@ "metadata": {}, "outputs": [], "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "28", + "metadata": {}, + "outputs": [], + "source": [] } ], "metadata": { diff --git a/packages/syft-runtimes/src/syft_runtimes/__init__.py 
b/packages/syft-runtimes/src/syft_runtimes/__init__.py
index 65f76e25..5c645501 100644
--- a/packages/syft-runtimes/src/syft_runtimes/__init__.py
+++ b/packages/syft-runtimes/src/syft_runtimes/__init__.py
@@ -1,8 +1,9 @@
-from syft_runtimes.runners import (
+from syft_runtimes.runtimes import (
     DockerRunner,
     PythonRunner,
     FolderBasedRuntime,
     HighLowRuntime,
+    PythonRuntime,
     get_runner_cls,
 )
 from syft_runtimes.output_handler import (
@@ -11,12 +12,15 @@
     TextUI,
 )
 from syft_runtimes.models import JobConfig, JobStatusUpdate, JobErrorKind, JobStatus
+from syft_runtimes.consts import DEFAULT_RUNTIME
+
 
 __all__ = [
     "DockerRunner",
     "PythonRunner",
     "FolderBasedRuntime",
     "HighLowRuntime",
+    "PythonRuntime",
     "FileOutputHandler",
     "RichConsoleUI",
     "TextUI",
@@ -25,4 +29,5 @@
     "JobStatusUpdate",
     "JobErrorKind",
     "JobStatus",
+    "DEFAULT_RUNTIME",
 ]
diff --git a/packages/syft-runtimes/src/syft_runtimes/consts.py b/packages/syft-runtimes/src/syft_runtimes/consts.py
new file mode 100644
index 00000000..a4872d2a
--- /dev/null
+++ b/packages/syft-runtimes/src/syft_runtimes/consts.py
@@ -0,0 +1 @@
+DEFAULT_RUNTIME = "default_python_runtime"
diff --git a/packages/syft-runtimes/src/syft_runtimes/high_low/setup.py b/packages/syft-runtimes/src/syft_runtimes/high_low/setup.py
index 354bfaca..a9e62a75 100644
--- a/packages/syft-runtimes/src/syft_runtimes/high_low/setup.py
+++ b/packages/syft-runtimes/src/syft_runtimes/high_low/setup.py
@@ -20,7 +20,7 @@
 )
 from syft_runtimes.high_low.consts import DEFAULT_HIGH_SIDE_DATA_DIR
 from syft_runtimes.models import BaseRuntimeConfig
-from syft_runtimes.runners import HighLowRuntime
+from syft_runtimes.runtimes import HighLowRuntime
 
 
 class HighLowRuntimeConfig(BaseRuntimeConfig):
diff --git a/packages/syft-runtimes/src/syft_runtimes/models.py b/packages/syft-runtimes/src/syft_runtimes/models.py
index c0e1af6f..c3eff281 100644
--- a/packages/syft-runtimes/src/syft_runtimes/models.py
+++ b/packages/syft-runtimes/src/syft_runtimes/models.py
@@ -4,8 +4,8 @@
 import hashlib
 from typing import Any, TypeAlias, Union, Literal, Optional
 import os
+from uuid import UUID, uuid4
 
 from IPython.display import HTML, display
-from datetime import datetime
 from loguru import logger
 from pydantic import (
@@ -16,6 +16,7 @@
 )
 from syft_notebook_ui.pydantic_html_repr import create_html_repr
+from syft_runtimes.consts import DEFAULT_RUNTIME
 
 import yaml
 
 PathLike: TypeAlias = Union[str, os.PathLike, Path]
@@ -200,56 +201,49 @@ class RuntimeUpdate(BaseModel):
 
 
 class JobConfig(BaseModel):
-    """Configuration for a job run"""
-
-    function_folder: Path
-    args: list[str]
-    data_path: Path
-    runtime: Runtime
-    job_folder: Optional[Path] = Field(
-        default_factory=lambda: Path("jobs") / datetime.now().strftime("%Y%m%d_%H%M%S")
-    )
-    timeout: int = 60
-    data_mount_dir: str = "/app/data"
-    extra_env: dict[str, str] = {}
-    blocking: bool = Field(default=True)
+    """Configuration for a folder-based job run."""
+
+    # Job identification
+    job_id: UUID = Field(default_factory=uuid4)
 
-    @property
-    def job_path(self) -> Path:
-        """Derived path for job folder"""
-        return Path(self.job_folder)
-
-    @property
-    def logs_dir(self) -> Path:
-        """Derived path for logs directory"""
-        return self.job_path / "logs"
+    # Core execution parameters
+    dataset_name: str | None = None  # Name of the dataset to use
+    code_path: PathLike  # Path to the code file/folder to execute
+    entrypoint: str | None = None
+    runtime_name: str = DEFAULT_RUNTIME  # Which runtime to use
 
-    @property
-    def output_dir(self) -> Path:
-        """Derived path for output directory"""
-        return self.job_path / "output"
+    # Execution parameters
+    timeout: int = 300  # Timeout in seconds (5 minutes default)
+    args: list[str] = Field(default_factory=list)  # Command line arguments
+    env: dict[str, str] = Field(default_factory=dict)  # Environment variables
 
-    def get_env(self) -> dict[str, str]:
-        return self.extra_env | self._base_env
+    # Status and metadata
+    status: str = "pending_code_review"
+    created_at: Optional[float] = None
+    completed_at: Optional[float] = None
 
-    def get_env_as_docker_args(self) -> list[str]:
-        return [f"-e {k}={v}" for k, v in self.get_env().items()]
+    @model_validator(mode="after")
+    def validate_code_path(self):
+        if self.code_path is not None:
+            code_path = Path(self.code_path).expanduser().resolve()
+            if not code_path.exists():
+                raise FileNotFoundError(f"Code path '{self.code_path}' does not exist")
+        return self
 
-    def get_extra_env_as_docker_args(self) -> list[str]:
-        return [f"-e {k}={v}" for k, v in self.extra_env.items()]
+    def save_to_yaml(self, config_path: PathLike) -> None:
+        """Save the config to a YAML file."""
+        yaml_dump = yaml.safe_dump(
+            self.model_dump(mode="json", exclude_none=True),
+            indent=2,
+            sort_keys=False,
+        )
+        Path(config_path).write_text(yaml_dump)
 
-    @property
-    def _base_env(self) -> dict[str, str]:
-        interpreter = " ".join(self.runtime.cmd)
-        # interpreter_str = f"'{interpreter}'" if " " in interpreter else interpreter
-        return {
-            "OUTPUT_DIR": str(self.output_dir.absolute()),
-            "DATA_DIR": str(self.data_path.absolute()),
-            "CODE_DIR": str(self.function_folder.absolute()),
-            "TIMEOUT": str(self.timeout),
-            "INPUT_FILE": str(self.function_folder / self.args[0]),
-            "INTERPRETER": interpreter,
-        }
+    @classmethod
+    def from_yaml(cls, config_path: Path) -> "JobConfig":
+        """Create instance from YAML file."""
+        with open(config_path, "r") as f:
+            config_dict = yaml.safe_load(f)
+        return cls(**config_dict)
 
 
 class JobResults(BaseModel):
diff --git a/packages/syft-runtimes/src/syft_runtimes/runners.py b/packages/syft-runtimes/src/syft_runtimes/runtimes.py
similarity index 69%
rename from packages/syft-runtimes/src/syft_runtimes/runners.py
rename to packages/syft-runtimes/src/syft_runtimes/runtimes.py
index 2dc2f22b..26858122 100644
--- a/packages/syft-runtimes/src/syft_runtimes/runners.py
+++ b/packages/syft-runtimes/src/syft_runtimes/runtimes.py
@@ -1,13 +1,9 @@
 import os
 import subprocess
 import time
-import threading
-import json
-import shutil
-from abc import abstractmethod
 from pathlib import Path
 from typing import Callable, Optional, Type
-from uuid import uuid4
+from uuid import UUID
 
 from loguru import logger
 from syft_core import Client as SyftBoxClient
@@ -395,6 +391,7 @@ def _prepare_run_command(self, job_config: JobConfig) -> list[str]:
 # ------- new runtime classes -------
 class FolderBasedRuntime:
     """Base class for folder-based syft runtimes that has the following structure:
+    Suppose we are looking from the perspective of 'do1@openmined.org':
     └── SyftBox
         ├── datasites/
         │   ├── do1@openmined.org
@@ -461,198 +458,46 @@ def update_config(self, **kwargs) -> None:
         self.config.update(**kwargs)
 
     def submit_job(self, job_config: JobConfig) -> str:
-        """Submit a job to the execution queue"""
-        job_id = str(uuid4())
-        job_file = self.jobs_dir / f"{job_id}.json"
-
-        # Serialize job config to JSON
-        job_data = {
-            "job_id": job_id,
-            "config": job_config.model_dump_json(),
-            "status": JobStatus.pending_code_review.value,
-            "created_at": time.time(),
-        }
-
-        job_file.write_text(json.dumps(job_data, indent=2))
-
-        logger.info(f"Job {job_id} submitted to {job_file}")
-        return job_id
-
-    def get_job_status(self, job_id: str) -> JobStatus:
-        """Get the current status of a job"""
-        # Check in execution queue
-        job_file = self.jobs_dir / f"{job_id}.json"
-        if job_file.exists():
-            job_data = json.loads(job_file.read_text())
-            return JobStatus(job_data["status"])
-
-        # Check in done folder
-        done_file = self.done_dir / f"{job_id}.json"
-        if done_file.exists():
-            job_data = json.loads(done_file.read_text())
-            return JobStatus(job_data["status"])
-
-        # Check status updates
-        status_file = self.running_dir / f"{job_id}_status.json"
-        if status_file.exists():
-            status_data = json.loads(status_file.read_text())
-            return JobStatus(status_data["status"])
-
-        raise ValueError(f"Job {job_id} not found")
-
-    def get_job_results(self, job_id: str) -> JobResults:
-        """Get the results of a completed job"""
-        done_file = self.done_dir / f"{job_id}.json"
-        if not done_file.exists():
-            raise ValueError(f"Job {job_id} is not completed or not found")
-
-        job_data = json.loads(done_file.read_text())
-        if job_data["status"] != JobStatus.job_run_finished.value:
-            raise ValueError(f"Job {job_id} has not finished successfully")
-
-        results_dir = self.done_dir / f"{job_id}_results"
-        if not results_dir.exists():
-            raise ValueError(f"Results directory for job {job_id} not found")
-
-        return JobResults(results_dir=results_dir)
-
-    def watch_folders(self) -> None:
-        """Start watching the folders for new jobs (non-blocking)"""
-        if self._running:
-            logger.warning("Folder watching is already running")
-            return
+        """Submit a job to the jobs queue (syft_runtimes/<runtime_name>/jobs)"""
+        raise NotImplementedError
 
-        self._running = True
-        self._watch_thread = threading.Thread(target=self._watch_loop, daemon=True)
-        self._watch_thread.start()
-        logger.info("Started folder watching thread")
-
-    def stop_watching(self) -> None:
-        """Stop watching folders"""
-        self._running = False
-        if self._watch_thread:
-            self._watch_thread.join(timeout=5)
-        logger.info("Stopped folder watching")
-
-    def _watch_loop(self) -> None:
-        """Main loop for watching and processing jobs"""
-        while self._running:
-            try:
-                self.process_job_queue()
-                time.sleep(1)  # Check every second
-            except Exception as e:
-                logger.error(f"Error in watch loop: {e}")
-                time.sleep(5)  # Wait longer on error
-
-    def process_job_queue(self) -> None:
-        """Process all pending jobs in the queue"""
-        job_files = list(self.jobs_dir.glob("*.json"))
-
-        for job_file in job_files:
-            try:
-                job_data = json.loads(job_file.read_text())
-                job_id = job_data["job_id"]
-
-                # Skip if not pending
-                if job_data["status"] != JobStatus.pending_code_review.value:
-                    continue
-
-                logger.info(f"Processing job {job_id}")
-
-                # Load job config
-                job_config = JobConfig.model_validate_json(job_data["config"])
-
-                # Execute the job
-                self._execute_job(job_id, job_config, job_data)
-
-            except Exception as e:
-                logger.error(f"Error processing job file {job_file}: {e}")
-
-    def _execute_job(self, job_id: str, job_config: JobConfig, job_data: dict) -> None:
-        """Execute a single job"""
-        try:
-            # Update status to in progress
-            job_data["status"] = JobStatus.job_in_progress.value
-            job_file = self.jobs_dir / f"{job_id}.json"
-            job_file.write_text(json.dumps(job_data, indent=2))
-
-            # Run the job
-            result = self.run(job_config)
-
-            if isinstance(result, tuple):
-                return_code, error_message = result
-            else:
-                # Handle non-blocking case - for folder-based runner, we'll make it blocking
-                return_code = result.wait()
-                error_message = None
-                if return_code != 0:
-                    stderr_output = result.stderr.read() if result.stderr else ""
-                    error_message = stderr_output
-
-            # Update job status based on result
-            if return_code == 0:
-                job_data["status"] = JobStatus.job_run_finished.value
-                job_data["error"] = JobErrorKind.no_error.value
-                job_data["error_message"] = None
-            else:
-                job_data["status"] = JobStatus.job_run_failed.value
-                job_data["error"] = JobErrorKind.execution_failed.value
-                job_data["error_message"] = error_message
-
-            job_data["completed_at"] = time.time()
-
-            # Move to done folder
-            self.move_to_done(job_id, job_data, job_config)
+    def get_job_status(self, job_id: UUID) -> JobStatus:
+        """Get the current status of a job, which can be pending (in `jobs/`),
+        in progress (in `running/`), or finished (in `done/`).
+        """
+        raise NotImplementedError
 
-        except Exception as e:
-            logger.error(f"Error executing job {job_id}: {e}")
-            job_data["status"] = JobStatus.job_run_failed.value
-            job_data["error"] = JobErrorKind.execution_failed.value
-            job_data["error_message"] = str(e)
-            job_data["completed_at"] = time.time()
-            self.move_to_done(job_id, job_data, job_config)
-
-    def move_to_done(self, job_id: str, job_data: dict, job_config: JobConfig) -> None:
-        """Move a completed job to the done folder"""
-        # Save job metadata to done folder
-        done_file = self.done_dir
-        done_file.write_text(json.dumps(job_data, indent=2))
-
-        # Copy job results (output and logs) to done folder if they exist
-        if job_config.output_dir.exists():
-            results_dir = self.done_jobs_dir / f"{job_id}_results"
-            if results_dir.exists():
-                shutil.rmtree(results_dir)
-            shutil.copytree(job_config.job_path, results_dir)
-
-        # Remove from execution queue
-        job_file = self.jobs_to_execute_dir / f"{job_id}.json"
-        if job_file.exists():
-            job_file.unlink()
-
-        logger.info(
-            f"Job {job_id} moved to done folder with status {job_data['status']}"
-        )
+    def get_job_results(self, job_id: UUID) -> JobResults:
+        """Get the results of a completed job (in the `done/` folder)"""
+        raise NotImplementedError
 
-    @abstractmethod
-    def _prepare_job_folders(self, job_config: JobConfig) -> None:
-        """Prepare job-specific folders - to be implemented by subclasses"""
-        pass
+    def _get_job_config(self, job_id: UUID) -> JobConfig:
+        """Get the job config for a particular job"""
+        raise NotImplementedError
 
-    @abstractmethod
-    def _validate_paths(self, job_config: JobConfig) -> None:
-        """Validate job paths - to be implemented by subclasses"""
-        pass
+    def _polling_jobs(self) -> None:
+        """Poll the `jobs/` folder for new jobs"""
+        raise NotImplementedError
 
-    @abstractmethod
-    def _run_subprocess(
-        self,
-        cmd: list[str],
-        job_config: JobConfig,
-        env: dict | None = None,
-        blocking: bool = True,
-    ) -> tuple[int, str | None] | subprocess.Popen:
-        """Run subprocess - to be implemented by subclasses"""
-        pass
+    def _stop_polling_jobs(self) -> None:
+        """Stop polling the `jobs/` folder for new jobs"""
+        raise NotImplementedError
+
+    def _move_to_running(self, job_id: UUID) -> None:
+        """Move a pending job from `jobs/` to the `running/` folder"""
+        raise NotImplementedError
+
+    def _process_job_queue(self) -> None:
+        """Process all pending jobs in the `jobs/` folder"""
+        raise NotImplementedError
+
+    def _execute_job(self, job_id: UUID) -> None:
+        """Execute a single job"""
+        raise NotImplementedError
+
+    def _move_to_done(self, job_id: UUID) -> None:
+        """Move a completed or failed job from the `running/` to the `done/` folder"""
+        raise NotImplementedError
 
 
 class HighLowRuntime:
@@ -672,6 +517,12 @@ def init_runtime_dir(self) -> None:
         self.lowside_runtime.init_runtime_dir()
 
 
+class PythonRuntime(FolderBasedRuntime):
+    """Python runtime that runs a Python job in a local subprocess."""
+
+    pass
+
+
 def get_runner_cls(job_config: JobConfig) -> Type[SyftRuntime]:
     """Factory to get the appropriate runner class for a job config."""
     runtime_kind = job_config.runtime.kind
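
The second patch leaves `PythonRuntime` as a placeholder and the `FolderBasedRuntime` methods as `NotImplementedError` stubs. Below is a minimal standalone sketch of the intended contract (queue a job as a folder under `jobs/`, move it to `running/` while it executes, then to `done/` with its outputs and logs). The `config.yaml` layout follows `JobConfig.save_to_yaml` and the `DATA_DIR`/`OUTPUT_DIR` environment variables follow the entrypoint script in `python.ipynb`; the class name, directory details, and method signatures here are assumptions for illustration, not the syft-runtimes API.

```python
# Illustrative sketch only -- not part of this patch and not the real API.
import os
import shutil
import subprocess
import sys
from pathlib import Path
from uuid import UUID, uuid4

import yaml


class MiniPythonRuntime:
    """Standalone sketch of the folder-based jobs/ -> running/ -> done/ contract."""

    def __init__(self, runtime_dir: Path) -> None:
        self.jobs_dir = runtime_dir / "jobs"
        self.running_dir = runtime_dir / "running"
        self.done_dir = runtime_dir / "done"
        for d in (self.jobs_dir, self.running_dir, self.done_dir):
            d.mkdir(parents=True, exist_ok=True)

    def submit_job(self, code_path: Path, entrypoint: str, data_dir: Path) -> UUID:
        """Queue a job: copy the user code and drop a config.yaml under jobs/<job_id>/."""
        job_id = uuid4()
        job_dir = self.jobs_dir / str(job_id)
        shutil.copytree(code_path, job_dir / "code")
        config = {
            "job_id": str(job_id),
            "entrypoint": entrypoint,
            "data_dir": str(Path(data_dir).resolve()),
            "timeout": 300,
        }
        (job_dir / "config.yaml").write_text(yaml.safe_dump(config, sort_keys=False))
        return job_id

    def _execute_job(self, job_id: UUID) -> int:
        """Move jobs/<id> to running/, run the entrypoint in a subprocess,
        then move the whole job folder (output and logs included) to done/."""
        job_dir = (self.jobs_dir / str(job_id)).rename(self.running_dir / str(job_id))
        config = yaml.safe_load((job_dir / "config.yaml").read_text())
        output_dir = job_dir / "output"
        output_dir.mkdir(exist_ok=True)

        # Same env contract as the entrypoint script in python.ipynb
        env = {
            "PATH": os.environ.get("PATH", ""),
            "DATA_DIR": config["data_dir"],
            "OUTPUT_DIR": str(output_dir.resolve()),
        }
        proc = subprocess.run(
            [sys.executable, config["entrypoint"]],
            cwd=job_dir / "code",
            env=env,
            capture_output=True,
            text=True,
            timeout=config["timeout"],
        )
        (job_dir / "logs.txt").write_text(proc.stdout + proc.stderr)

        job_dir.rename(self.done_dir / str(job_id))
        return proc.returncode
```

A data-owner-side daemon would then poll `jobs_dir` (the `_polling_jobs` stub) and call `_execute_job` for each queued job ID.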