diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..edf33b9
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,39 @@
+FROM python:3.10-slim
+
+# Install system dependencies
+RUN apt-get update && apt-get install -y \
+    curl \
+    sudo \
+    wget \
+    git \
+    tar \
+    && rm -rf /var/lib/apt/lists/*
+
+# Install nebula
+RUN ARCH=$(uname -m) && \
+    if [ "$ARCH" = "x86_64" ]; then ARCH="amd64"; elif [ "$ARCH" = "aarch64" ]; then ARCH="arm64"; fi && \
+    LATEST_VERSION=$(curl -s https://api.github.com/repos/slackhq/nebula/releases/latest | grep -Po '"tag_name": "\K.*?(?=")') && \
+    curl -L -o /tmp/nebula.tar.gz "https://github.com/slackhq/nebula/releases/download/${LATEST_VERSION}/nebula-linux-${ARCH}.tar.gz" && \
+    mkdir -p /tmp/nebula && \
+    tar -xzf /tmp/nebula.tar.gz -C /tmp/nebula && \
+    cp /tmp/nebula/nebula /usr/local/bin/ && \
+    cp /tmp/nebula/nebula-cert /usr/local/bin/ && \
+    chmod +x /usr/local/bin/nebula && \
+    chmod +x /usr/local/bin/nebula-cert && \
+    rm -rf /tmp/nebula /tmp/nebula.tar.gz
+
+# Install Python dependencies
+RUN pip install --no-cache-dir torch torchvision torchft
+
+# Set working directory
+WORKDIR /app
+
+# Copy test files
+COPY test_multi_node.py /app/
+COPY train_ddp.py /app/
+COPY run_multi_node_test.sh /app/
+
+# Make script executable
+RUN chmod +x /app/run_multi_node_test.sh
+
+ENTRYPOINT ["/app/run_multi_node_test.sh"]
\ No newline at end of file
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..39b2483
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,33 @@
+.PHONY: setup test test-local test-docker clean
+
+# Default target
+all: test
+
+# Setup the environment (recipe is POSIX-sh safe, since make uses /bin/sh)
+setup:
+	pip install torch torchvision torchft
+	if ! command -v nebula >/dev/null 2>&1; then \
+		if [ "$$(uname)" = "Darwin" ]; then \
+			brew install nebula; \
+		else \
+			echo "Please install Nebula manually: https://github.com/slackhq/nebula/releases"; \
+		fi; \
+	fi
+
+# Run the local test (requires sudo)
+test-local:
+	sudo ./run_multi_node_test.sh
+
+# Run the test using Docker
+test-docker:
+	docker-compose build
+	docker-compose up
+
+# Run the test (either local or Docker)
+test: test-docker
+
+# Clean up
+clean:
+	rm -rf test_multi_node_env
+	docker-compose down
+	sudo pkill nebula || true
\ No newline at end of file
diff --git a/README.md b/README.md
index aa9fff1..37c6230 100644
--- a/README.md
+++ b/README.md
@@ -1,12 +1,11 @@
 # KernelSwarm
 
-## Lightouse details
+## Lighthouse details
 
 Right now the Lighthouse is running on Digital Ocean on a $4/month droplet. If you'd like to be added please let us know and share your public ssh key.
 
 The lighthouse is primarily responsible for keeping track of all nodes in the swarm.
 
-
 ## How to join the swarm
 
 Idea of the setup is that a user
@@ -14,7 +13,6 @@ Idea of the setup is that a user
 2. We share the client with them
 3. They run the client and it connects them to the swarm
 
-
 ## High level infra details
 1. Lighthouse on Digital Ocean
 2. Nebula VPN for swarm communication
@@ -22,6 +20,64 @@ Idea of the setup is that a user
 4. A fault tolerant PyTorch job that is responsible for the actual training
 5. Share results in some public dashboard
 
+## Running the multi-node test
+
+This project includes an end-to-end test that verifies the setup: multiple nodes connect via Nebula and run distributed training with torchft.
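+
+At a high level, the test automates roughly the following manual steps (an illustrative sketch of what `test_multi_node.py` does; the paths and names below match the script's defaults):
+
+```bash
+# 1. Create a throwaway CA and a signed cert/key per simulated node
+nebula-cert ca -name test-swarm
+nebula-cert sign -name test-device1 -ip 192.168.100.1/24 -groups users
+
+# 2. Bring up the Nebula overlay (node 1 doubles as the lighthouse)
+sudo nebula -config test_multi_node_env/nebula/node1/config.yml &
+
+# 3. Launch one training process per replica group
+REPLICA_GROUP_ID=0 NUM_REPLICA_GROUPS=2 python3 train_ddp.py
+```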
+
+### Prerequisites
+
+- Python 3.8+
+- pip
+- Nebula VPN (installed via the test script if not present)
+- Docker and Docker Compose (for container-based testing)
+
+### Running the test
+
+There are two ways to run the test:
+
+#### 1. Using Docker (recommended)
+
+This method uses Docker to set up multiple containers, each representing a node in the swarm:
+
+```bash
+# Build and run using Docker Compose
+make test-docker
+```
+
+#### 2. Local testing
+
+This method simulates multiple nodes on your local machine:
+
+```bash
+# Run the test locally (requires sudo for setting up Nebula interfaces)
+make test-local
+```
+
+### Test options
+
+The test supports several command-line options:
+
+```bash
+# Run with specific options
+sudo ./run_multi_node_test.sh --num-nodes 3 --timeout 600
+```
+
+Available options:
+- `--num-nodes`: Number of nodes to simulate (default: 2)
+- `--torchft-version`: Version of torchft to install (default: latest)
+- `--node-ip-prefix`: IP prefix for node addresses (default: 192.168.100.)
+- `--lighthouse-ip`: IP address of the lighthouse node (default: 192.168.100.1)
+- `--username`: Username for Nebula client creation (default: test)
+- `--timeout`: Timeout in seconds for test completion (default: 300)
+
+### Cleaning up
+
+To clean up after running the tests:
+
+```bash
+make clean
+```
+
 ## TBD
 
 ![swarm](./swarm.png)
\ No newline at end of file
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..7ebb294
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,30 @@
+version: '3'
+
+services:
+  node1:
+    build:
+      context: .
+      dockerfile: Dockerfile
+    privileged: true  # Needed for creating network interfaces
+    network_mode: "host"
+    volumes:
+      - ./:/app
+    command: ["--num-nodes", "2", "--node-ip-prefix", "192.168.100.", "--lighthouse-ip", "192.168.100.1"]
+    environment:
+      - REPLICA_GROUP_ID=0
+      - NUM_REPLICA_GROUPS=2
+
+  node2:
+    build:
+      context: .
+      dockerfile: Dockerfile
+    privileged: true  # Needed for creating network interfaces
+    network_mode: "host"
+    volumes:
+      - ./:/app
+    command: ["--num-nodes", "2", "--node-ip-prefix", "192.168.100.", "--lighthouse-ip", "192.168.100.1"]
+    environment:
+      - REPLICA_GROUP_ID=1
+      - NUM_REPLICA_GROUPS=2
+    depends_on:
+      - node1
\ No newline at end of file
diff --git a/run_multi_node_test.sh b/run_multi_node_test.sh
new file mode 100755
index 0000000..f8870f7
--- /dev/null
+++ b/run_multi_node_test.sh
@@ -0,0 +1,75 @@
+#!/bin/bash
+# Script to run the multi-node test
+
+set -e
+
+# Check if script is run with sudo
+if [ "$EUID" -ne 0 ]; then
+    echo "This script must be run with sudo to set up Nebula VPN interfaces."
+    echo "Please run: sudo $0 $*"
+    exit 1
+fi
+
+# Install nebula if not already installed
+if ! command -v nebula &> /dev/null; then
+    echo "Nebula not found. Installing..."
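+    # The installer below resolves the latest release tag via the GitHub API,
+    # which needs outbound network access. In offline or rate-limited
+    # environments, consider pinning a known release instead, e.g.:
+    #   LATEST_VERSION=v1.9.5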
+
+    # Detect OS
+    if [[ "$OSTYPE" == "linux-gnu"* ]]; then
+        # For Linux (Ubuntu/Debian)
+        if command -v apt-get &> /dev/null; then
+            apt-get update
+            apt-get install -y curl
+
+            # Download latest nebula release
+            ARCH=$(uname -m)
+            if [ "$ARCH" == "x86_64" ]; then
+                ARCH="amd64"
+            elif [ "$ARCH" == "aarch64" ]; then
+                ARCH="arm64"
+            fi
+
+            LATEST_VERSION=$(curl -s https://api.github.com/repos/slackhq/nebula/releases/latest | grep -Po '"tag_name": "\K.*?(?=")')
+            curl -L -o /tmp/nebula.tar.gz "https://github.com/slackhq/nebula/releases/download/${LATEST_VERSION}/nebula-linux-${ARCH}.tar.gz"
+
+            mkdir -p /tmp/nebula
+            tar -xzf /tmp/nebula.tar.gz -C /tmp/nebula
+            cp /tmp/nebula/nebula /usr/local/bin/
+            cp /tmp/nebula/nebula-cert /usr/local/bin/
+
+            chmod +x /usr/local/bin/nebula
+            chmod +x /usr/local/bin/nebula-cert
+
+            rm -rf /tmp/nebula /tmp/nebula.tar.gz
+        else
+            echo "Unsupported Linux distribution. Please install Nebula manually."
+            exit 1
+        fi
+    elif [[ "$OSTYPE" == "darwin"* ]]; then
+        # For macOS
+        if command -v brew &> /dev/null; then
+            brew install nebula
+        else
+            echo "Homebrew not found. Please install Homebrew and then run: brew install nebula"
+            exit 1
+        fi
+    else
+        echo "Unsupported operating system. Please install Nebula manually."
+        exit 1
+    fi
+fi
+
+# Install Python packages if needed
+if ! command -v pip3 &> /dev/null; then
+    if [[ "$OSTYPE" == "linux-gnu"* ]]; then
+        apt-get update
+        apt-get install -y python3-pip
+    elif [[ "$OSTYPE" == "darwin"* ]]; then
+        echo "pip3 not found. Please install Python 3 and pip3."
+        exit 1
+    fi
+fi
+
+# Run the test
+echo "Starting multi-node test..."
+python3 test_multi_node.py "$@"
\ No newline at end of file
diff --git a/test_multi_node.py b/test_multi_node.py
new file mode 100644
index 0000000..465fcdb
--- /dev/null
+++ b/test_multi_node.py
@@ -0,0 +1,295 @@
+#!/usr/bin/env python3
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+# All rights reserved.
+#
+# This source code is licensed under the BSD-style license found in the
+# LICENSE file in the root directory of this source tree.
+
+import argparse
+import logging
+import os
+import subprocess
+import sys
+import time
+from pathlib import Path
+
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s [%(levelname)s] %(message)s',
+    handlers=[logging.StreamHandler(sys.stdout)]
+)
+
+logger = logging.getLogger(__name__)
+
+def parse_args():
+    parser = argparse.ArgumentParser(description='Run multi-node training test')
+    parser.add_argument('--num-nodes', type=int, default=2, help='Number of nodes to simulate')
+    parser.add_argument('--torchft-version', type=str, default='latest', help='Version of torchft to install')
+    parser.add_argument('--node-ip-prefix', type=str, default='192.168.100.', help='IP prefix for node addresses')
+    parser.add_argument('--lighthouse-ip', type=str, default='192.168.100.1', help='IP address of the lighthouse node')
+    parser.add_argument('--username', type=str, default='test', help='Username for nebula client creation')
+    parser.add_argument('--timeout', type=int, default=300, help='Timeout in seconds for test completion')
+    return parser.parse_args()
+
+def check_prerequisites():
+    """Check if all required tools are installed."""
+    required_tools = ['python3', 'pip', 'nebula-cert']
+
+    for tool in required_tools:
+        try:
+            subprocess.run(['which', tool], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        except subprocess.CalledProcessError:
+            logger.error(f"{tool} not found. Please install it before running this test.")
+            sys.exit(1)
+
+    logger.info("All prerequisites are met.")
+
+def setup_virtual_env(venv_path):
+    """Create and set up a virtual environment for testing."""
+    if not venv_path.exists():
+        logger.info(f"Creating virtual environment at {venv_path}")
+        subprocess.run(['python3', '-m', 'venv', str(venv_path)], check=True)
+
+    pip_path = venv_path / 'bin' / 'pip'
+
+    logger.info("Upgrading pip")
+    subprocess.run([str(pip_path), 'install', '--upgrade', 'pip'], check=True)
+
+    logger.info("Installing required packages")
+    subprocess.run([str(pip_path), 'install', 'torch', 'torchvision', 'torchft'], check=True)
+
+    return venv_path
+
+def create_nebula_configs(args, test_dir):
+    """Create nebula configuration files for each node."""
+    nebula_dir = test_dir / 'nebula'
+    nebula_dir.mkdir(exist_ok=True)
+
+    # Create CA certificate
+    ca_key = nebula_dir / 'ca.key'
+    ca_crt = nebula_dir / 'ca.crt'
+
+    if not ca_crt.exists() or not ca_key.exists():
+        logger.info("Creating Nebula CA certificate")
+        subprocess.run(['nebula-cert', 'ca', '-name', 'test-swarm'], cwd=nebula_dir, check=True)
+
+    # Create client configs for each node
+    node_configs = []
+
+    for i in range(1, args.num_nodes + 1):
+        node_dir = nebula_dir / f'node{i}'
+        node_dir.mkdir(exist_ok=True)
+
+        # Node 1 acts as the lighthouse, so it must own the lighthouse IP;
+        # the remaining nodes get addresses further into the subnet.
+        ip = args.lighthouse_ip if i == 1 else f"{args.node_ip_prefix}{i + 10}"
+        name = f"{args.username}-device{i}"
+
+        # Sign certificate for this node
+        if not (node_dir / 'host.crt').exists():
+            logger.info(f"Creating certificate for node {i}")
+            subprocess.run([
+                'nebula-cert', 'sign',
+                '-name', name,
+                '-ip', f"{ip}/24",
+                '-groups', "users",
+                '-out-crt', str(node_dir / 'host.crt'),
+                '-out-key', str(node_dir / 'host.key'),
+                '-ca-crt', str(ca_crt),
+                '-ca-key', str(ca_key)
+            ], check=True)
+
+        # Copy CA certificate
+        with open(ca_crt, 'rb') as f:
+            ca_content = f.read()
+        with open(node_dir / 'ca.crt', 'wb') as f:
+            f.write(ca_content)
+
+        # Create config file. Note: a lighthouse must not list itself under
+        # lighthouse.hosts, so only non-lighthouse nodes get that entry.
+        hosts_block = "" if i == 1 else f'\n  hosts:\n    - "{args.lighthouse_ip}"'
+        config_content = f"""
+pki:
+  ca: {node_dir}/ca.crt
+  cert: {node_dir}/host.crt
+  key: {node_dir}/host.key
+static_host_map:
+  "{args.lighthouse_ip}": ["127.0.0.1:4242"]
+lighthouse:
+  am_lighthouse: {'true' if i == 1 else 'false'}
+  interval: 60{hosts_block}
+listen:
+  host: 0.0.0.0
+  port: {4242 + i - 1}
+punchy:
+  punch: true
+tun:
+  dev: nebula{i}
+  drop_local_broadcast: false
+  drop_multicast: false
+  tx_queue: 500
+  mtu: 1300
+logging:
+  level: info
+  format: text
+firewall:
+  outbound:
+    - port: any
+      proto: any
+      host: any
+  inbound:
+    - port: any
+      proto: any
+      host: any
+"""
+        with open(node_dir / 'config.yml', 'w') as f:
+            f.write(config_content)
+
+        node_configs.append({
+            'dir': node_dir,
+            'ip': ip,
+            'config': node_dir / 'config.yml',
+            'is_lighthouse': i == 1
+        })
+
+    return node_configs
+
+def start_nebula_nodes(node_configs):
+    """Start nebula processes for each node."""
+    processes = []
+
+    for node in node_configs:
+        logger.info(f"Starting Nebula for node at {node['ip']}")
+        process = subprocess.Popen(
+            ['sudo', 'nebula', '-config', str(node['config'])],
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE
+        )
+        processes.append(process)
+
+        # Give some time for the process to start
+        time.sleep(2)
+
+        # Check if process is still running
+        if process.poll() is not None:
+            out, err = process.communicate()
+            logger.error(f"Failed to start Nebula: {err.decode()}")
+            for p in processes:
+                p.terminate()
+            sys.exit(1)
+
+    # Wait a bit for the network to establish
+    logger.info("Waiting for Nebula network to establish...")
+    time.sleep(10)
+
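+    # NOTE: the fixed sleep above is only a heuristic. A more robust readiness
+    # check would poll each node's overlay IP until it answers, for example:
+    #   subprocess.run(['ping', '-c', '1', node_configs[0]['ip']], check=True)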
+    return processes
+
+def stop_nebula_nodes(processes):
+    """Stop all nebula processes."""
+    for process in processes:
+        process.terminate()
+        process.wait()
+
+    # Additional cleanup to make sure all nebula instances are stopped
+    subprocess.run(['sudo', 'pkill', 'nebula'], check=False)
+
+    logger.info("All Nebula processes stopped.")
+
+def run_distributed_training(args, venv_path, node_configs):
+    """Run the distributed training across nodes."""
+    python_path = venv_path / 'bin' / 'python'
+
+    # Prepare environment variables
+    env = os.environ.copy()
+    env["NUM_REPLICA_GROUPS"] = str(args.num_nodes)
+
+    processes = []
+    for i, node in enumerate(node_configs):
+        node_env = env.copy()
+        node_env["REPLICA_GROUP_ID"] = str(i)
+        node_env["NEBULA_IP"] = node['ip']
+
+        logger.info(f"Starting training on node {i+1} with IP {node['ip']}")
+
+        cmd = [
+            str(python_path),
+            "train_ddp.py"
+        ]
+
+        process = subprocess.Popen(
+            cmd,
+            env=node_env,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            text=True
+        )
+        processes.append((process, f"node{i+1}"))
+
+    # Wait for completion or timeout
+    start_time = time.time()
+    completed = [False] * len(processes)
+    outputs = [""] * len(processes)
+
+    while not all(completed) and time.time() - start_time < args.timeout:
+        for i, (process, name) in enumerate(processes):
+            if not completed[i] and process.poll() is not None:
+                stdout, stderr = process.communicate()
+                completed[i] = True
+                outputs[i] = f"===== {name} OUTPUT =====\n{stdout}\n===== {name} ERROR =====\n{stderr}\n"
+                if process.returncode != 0:
+                    logger.error(f"Process on {name} failed with code {process.returncode}")
+                else:
+                    logger.info(f"Process on {name} completed successfully")
+
+        time.sleep(1)
+
+    # Kill any remaining processes
+    for i, (process, name) in enumerate(processes):
+        if not completed[i]:
+            logger.warning(f"Process on {name} did not complete within timeout, terminating")
+            process.terminate()
+            stdout, stderr = process.communicate()
+            outputs[i] = f"===== {name} OUTPUT (TIMEOUT) =====\n{stdout}\n===== {name} ERROR =====\n{stderr}\n"
+
+    # Print all outputs
+    for output in outputs:
+        print(output)
+
+    # The test only passes if every process finished in time and exited cleanly;
+    # checking completion alone would count a crashed process as a success.
+    success = all(completed) and all(p.returncode == 0 for p, _ in processes)
+    return success
+
+def main():
+    args = parse_args()
+
+    # Check prerequisites
+    check_prerequisites()
+
+    # Create test directory
+    test_dir = Path.cwd() / 'test_multi_node_env'
+    test_dir.mkdir(exist_ok=True)
+
+    # Setup virtual environment
+    venv_path = setup_virtual_env(test_dir / 'venv')
+
+    # Create nebula configs
+    node_configs = create_nebula_configs(args, test_dir)
+
+    nebula_processes = []
+    try:
+        # Start nebula nodes
+        nebula_processes = start_nebula_nodes(node_configs)
+
+        # Run distributed training
+        success = run_distributed_training(args, venv_path, node_configs)
+
+        if success:
+            logger.info("Multi-node test completed successfully")
+            return 0
+        else:
+            logger.error("Multi-node test failed")
+            return 1
+    finally:
+        # Clean up nebula nodes
+        stop_nebula_nodes(nebula_processes)
+
+if __name__ == "__main__":
+    sys.exit(main())
\ No newline at end of file
diff --git a/train_ddp.py b/train_ddp.py
new file mode 100644
index 0000000..ea18754
--- /dev/null
+++ b/train_ddp.py
@@ -0,0 +1,100 @@
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+# All rights reserved.
+#
+# This source code is licensed under the BSD-style license found in the
+# LICENSE file in the root directory of this source tree.
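+
+"""Connectivity smoke test for the swarm: build a small CNN, wrap it in
+torchft's DistributedDataParallel, and perform a single fault-tolerant
+allreduce across replica groups (no real training loop)."""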
+
+import logging
+import os
+from datetime import timedelta
+
+import torch
+import torch.nn.functional as F
+from torch import nn
+from torch.distributed.elastic.multiprocessing.errors import record
+
+from torchft import (
+    DistributedDataParallel,
+    Manager,
+    ProcessGroupBabyNCCL,
+    ProcessGroupGloo,
+)
+
+logging.basicConfig(level=logging.INFO)
+
+
+@record
+def main() -> None:
+    REPLICA_GROUP_ID = int(os.environ.get("REPLICA_GROUP_ID", 0))
+    NUM_REPLICA_GROUPS = int(os.environ.get("NUM_REPLICA_GROUPS", 2))
+
+    def load_state_dict(state_dict):
+        m.load_state_dict(state_dict["model"])
+
+    def state_dict():
+        return {
+            "model": m.state_dict(),
+        }
+
+    device = "cuda" if torch.cuda.is_available() else "cpu"
+    pg = (
+        ProcessGroupBabyNCCL(
+            timeout=timedelta(seconds=5),
+        )
+        if torch.cuda.is_available()
+        else ProcessGroupGloo(timeout=timedelta(seconds=5))
+    )
+
+    manager = Manager(
+        pg=pg,
+        min_replica_size=1,
+        load_state_dict=load_state_dict,
+        state_dict=state_dict,
+        replica_id=f"allreduce_example_{REPLICA_GROUP_ID}",
+        timeout=timedelta(seconds=10),
+    )
+
+    class Net(nn.Module):
+        def __init__(self):
+            super().__init__()
+            self.conv1 = nn.Conv2d(3, 6, 5)
+            self.pool = nn.MaxPool2d(2, 2)
+            self.conv2 = nn.Conv2d(6, 16, 5)
+            self.fc1 = nn.Linear(16 * 5 * 5, 120)
+            self.fc2 = nn.Linear(120, 84)
+            self.fc3 = nn.Linear(84, 10)
+
+        def forward(self, x):
+            x = self.pool(F.relu(self.conv1(x)))
+            x = self.pool(F.relu(self.conv2(x)))
+            x = torch.flatten(x, 1)  # flatten all dimensions except batch
+            x = F.relu(self.fc1(x))
+            x = F.relu(self.fc2(x))
+            x = self.fc3(x)
+            return x
+
+    m = Net().to(device)
+    m = DistributedDataParallel(manager, m)
+
+    print(f"Model created: {m}")
+
+    # Create a dummy tensor for allreduce
+    dummy_tensor = torch.randn(100, device=device)
+    print(f"Rank {manager.pg.rank()}: Performing allreduce operation")
+
+    # Call allreduce instead of training; torchft's Manager.allreduce returns
+    # a future, so block on it before inspecting the result tensor.
+    result = manager.allreduce(dummy_tensor).wait()
+
+    print(f"Rank {manager.pg.rank()}: AllReduce completed successfully")
+    print(f"Result tensor shape: {result.shape}")
+
+    # Optionally, perform additional operations with the result;
+    # for example, calculate and print the mean of the reduced tensor
+    print(f"Mean of reduced tensor: {result.mean().item()}")
+
+
+if __name__ == "__main__":
+    main()