diff --git a/.gitignore b/.gitignore index 90868ab0..2205be7a 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ __pycache__/ *.egg-info/ # Virtual environments +.venv/ .test_venv/ .cvs_venv/ .ruff_venv/ diff --git a/cvs/input/config_file/aorta/aorta_benchmark.yaml b/cvs/input/config_file/aorta/aorta_benchmark.yaml index 17c1360c..73cd1daa 100644 --- a/cvs/input/config_file/aorta/aorta_benchmark.yaml +++ b/cvs/input/config_file/aorta/aorta_benchmark.yaml @@ -68,6 +68,42 @@ analysis: gemm_script: scripts/gemm_analysis/run_tracelens_analysis.sh skip_if_exists: false +# Multi-node disaggregated launch. +# +# When the cluster file has more than one node entry, the runner launches a +# `torchrun` rank-group on every node in parallel (one per node), all +# rendezvous-ing on the head node. Mirrors aorta's own +# scripts/multi_node/local_launch.sh pattern, so a single cluster.json with N +# host entries is enough -- you no longer need N cluster files. +# +# `master_launch_mode: auto` keeps current single-node behavior (delegates to +# experiment_script) for 1-node clusters and switches to disaggregated +# torchrun for >1-node clusters. Force one or the other with `script` / +# `torchrun` if you want to override the auto-detection. +# +# When master_launch_mode resolves to torchrun, `experiment_script` is NOT +# used -- the runner builds: +# torchrun --nnodes --node_rank --nproc_per_node

+#            --master_addr <master_addr> --master_port <master_port>
+#            <container_mount_path>/<train_script>
+#            --config <container_mount_path>/<base_config>
+#            [--override training_overrides...]
'auto' picks 'script' " + "for single-node clusters and 'torchrun' for multi-node clusters. " + "'script' always uses the configured experiment_script (single-node only). " + "'torchrun' always builds a multi-node torchrun command and ignores " + "experiment_script." + ), + ) + nproc_per_node: Optional[int] = Field( + default=None, + ge=1, + description=( + "Processes (GPUs) per node passed to torchrun --nproc_per_node. " + "Defaults to the top-level gpus_per_node when unset." + ), + ) + master_port: Optional[int] = Field( + default=None, + ge=1024, + le=65535, + description=( + "Port used for torchrun rendezvous (--master_port). When unset the " + "runner picks a free ephemeral port on the head node." + ), + ) + master_addr: Optional[str] = Field( + default=None, + description=( + "Override the master address (--master_addr). Defaults to the head node hostname/IP from the cluster file." + ), + ) + train_script: str = Field( + default="train.py", + description=( + "Path to the Aorta training entry script relative to aorta_path. " + "Used when master_launch_mode resolves to 'torchrun'." + ), + ) + extra_torchrun_args: List[str] = Field( + default_factory=list, + description="Additional CLI flags appended to the torchrun command.", + ) + extra_train_args: List[str] = Field( + default_factory=list, + description="Additional CLI flags appended to train.py after --config.", + ) + extra_env: Dict[str, str] = Field( + default_factory=dict, + description=( + "Extra environment variables to export inside the container before " + "torchrun. Useful for NCCL_SOCKET_IFNAME, NCCL_IB_HCA, " + "NCCL_IB_GID_INDEX, and similar transport-tuning knobs." + ), + ) + collect_traces: bool = Field( + default=True, + description=( + "When true, copy each node's torch_profiler artifacts back to the " + "head node under /combined_traces/node_/ so the " + "host parsers see one unified trace tree." 
def combined_traces_in(path: Path, root: Path) -> bool:
    """Return True if ``path`` lives under ``root/combined_traces``.

    Used to skip already-collected traces when rescanning the head node so
    repeated runs do not nest combined_traces inside itself.

    Args:
        path: Candidate path to classify.
        root: Directory that *contains* ``combined_traces`` -- pass the parent
            directory here, not the ``combined_traces`` directory itself,
            otherwise the first relative component is never
            ``combined_traces`` and nothing is ever matched.

    Returns:
        ``True`` when the first component of ``path`` relative to ``root`` is
        ``combined_traces``; ``False`` when ``path`` is outside ``root``, is
        ``root`` itself, or sits under a different top-level entry.
    """
    try:
        rel = path.relative_to(root)
    except ValueError:
        # ``path`` is not located under ``root`` at all.
        return False
    # Compare a one-element slice so an empty ``parts`` tuple (path == root)
    # yields a real ``False`` instead of leaking the empty tuple to callers.
    return rel.parts[:1] == ("combined_traces",)
does not exist: {train_script}") + return errors def _connect_docker(self, node: str) -> docker.DockerClient: @@ -243,6 +287,156 @@ def _cleanup_existing_containers(self, client: docker.DockerClient, node: str): except Exception as e: log.warning(f"Error cleaning up container on {node}: {e}") + def _collect_multi_node_traces(self, nodes: List[str]) -> Optional[Path]: + """ + Collect torch_profiler trees from every node into a single tree on the + head node and return the parent directory. + + Layout:: + + /combined_traces/node_//torch_profiler/... + + The head node is rsynced locally; non-head nodes are pulled with rsync + over SSH (``rsync -az`` with the configured ``priv_key_file``). When + rsync is unavailable we fall back to ``scp -r``. Failures on + individual nodes are logged but do not abort the overall collection; + the returned directory is the best-effort union. + + Returns ``None`` only when nothing could be collected at all. + """ + head = self.head_node + combined_root = self.config.aorta_path / "combined_traces" + try: + combined_root.mkdir(parents=True, exist_ok=True) + except OSError as e: + log.error(f"Cannot create {combined_root}: {e}") + return None + + any_collected = False + for rank, node in enumerate(nodes): + dest = combined_root / f"node_{rank}" + dest.mkdir(parents=True, exist_ok=True) + + try: + # First pass: copy from the orchestrator's local filesystem. This handles + # the head==orchestrator case and any NFS-shared aorta_path. + found = False + if node == head: + found = self._copy_local_torch_profilers(self.config.aorta_path, dest) + # Pull over SSH for non-head nodes, and also for the head when the + # orchestrator's local fs didn't actually have the head's traces (i.e. + # orchestrator is a separate login node from the head). 
def _copy_local_torch_profilers(self, src_root: Path, dest: Path) -> bool:
    """
    Mirror every ``torch_profiler`` directory found under ``src_root`` into
    ``dest``, keeping each directory's path relative to ``src_root``.

    Directories that already live under ``src_root/combined_traces`` are
    skipped. An existing target directory is removed before the copy so the
    result reflects the current source tree. A copy that raises ``OSError``
    is logged as a warning and does not stop the remaining copies.

    Returns ``True`` when at least one directory was copied.
    """
    import shutil

    any_copied = False
    # Lazy generator: directory checks stay interleaved with the copies.
    profiler_dirs = (d for d in src_root.glob("**/torch_profiler") if d.is_dir())
    for tp in profiler_dirs:
        if combined_traces_in(tp, src_root):
            continue
        target = dest / tp.relative_to(src_root)
        target.parent.mkdir(parents=True, exist_ok=True)
        try:
            if target.exists():
                shutil.rmtree(target)
            shutil.copytree(tp, target, symlinks=True, dirs_exist_ok=False)
        except OSError as e:
            log.warning(f"Local copy {tp} -> {target} failed: {e}")
        else:
            any_copied = True
    return any_copied
+ """ + ssh_user = self.config.username + remote_root = str(self.config.aorta_path) + + ssh_opts = ["-o", "StrictHostKeyChecking=no", "-o", "BatchMode=yes", "-o", "ConnectTimeout=15"] + if self.config.pkey: + ssh_opts.extend(["-i", self.config.pkey]) + ssh_cmd = "ssh " + " ".join(shlex.quote(p) for p in ssh_opts) + + list_cmd = [ + "ssh", + *ssh_opts, + f"{ssh_user}@{node}", + f"find {shlex.quote(remote_root)} -type d -name torch_profiler -not -path '*/combined_traces/*'", + ] + try: + r = subprocess.run(list_cmd, capture_output=True, text=True, timeout=120) + except (subprocess.TimeoutExpired, FileNotFoundError) as e: + log.warning(f"Listing remote torch_profiler dirs on {node} failed: {e}") + return False + + if r.returncode != 0: + log.warning(f"find on {node} returned {r.returncode}: {r.stderr.strip()}") + return False + + remote_paths = [p.strip() for p in r.stdout.splitlines() if p.strip()] + if not remote_paths: + return False + + copied = False + rsync_available = ( + subprocess.run( + ["bash", "-lc", "command -v rsync >/dev/null"], + capture_output=True, + ).returncode + == 0 + ) + for rp in remote_paths: + try: + rel = Path(rp).relative_to(remote_root) + except ValueError: + rel = Path(Path(rp).name) + target_parent = dest / rel.parent + target_parent.mkdir(parents=True, exist_ok=True) + + if rsync_available: + cmd = [ + "rsync", + "-az", + "-e", + ssh_cmd, + f"{ssh_user}@{node}:{rp}/", + str(target_parent / rel.name) + "/", + ] + else: + cmd = ["scp", "-r", *ssh_opts, f"{ssh_user}@{node}:{rp}", str(target_parent)] + + log.info(f"[{node}] copying {rp} -> {target_parent / rel.name}") + try: + rr = subprocess.run(cmd, capture_output=True, text=True, timeout=1800) + if rr.returncode == 0: + copied = True + else: + log.warning(f"copy of {rp} from {node} failed (exit {rr.returncode}): {rr.stderr.strip()}") + except (subprocess.TimeoutExpired, FileNotFoundError) as e: + log.warning(f"copy of {rp} from {node} failed: {e}") + + return copied + def 
_get_remote_uid_gid(self, node: str) -> Optional[Tuple[int, int]]: """ Get UID and GID for config.username on the given node via SSH. @@ -308,7 +502,8 @@ def _launch_container(self, client: docker.DockerClient, node: str) -> Container volumes=volumes, devices=devices, working_dir=self.config.container_mount_path, - group_add=["video"], + user="root", + group_add=["video", "render"], cap_add=["SYS_PTRACE"], security_opt=["seccomp=unconfined"], ulimits=[ @@ -553,12 +748,155 @@ def setup(self) -> bool: log.info(f"All {num_nodes} node(s) set up successfully") return True + def _resolve_launch_mode(self) -> str: + """ + Resolve ``multi_node.master_launch_mode`` to a concrete mode. + + - ``script`` keeps the legacy single-node behavior (delegates to + ``experiment_script``); fails if the cluster has more than one node. + - ``torchrun`` builds a multi-node ``torchrun`` command on every node, + rendezvous-ing on the head node. + - ``auto`` picks ``script`` for single-node, ``torchrun`` for >1 node. + """ + mode = self.config.multi_node.master_launch_mode + if mode == "auto": + return "script" if len(self.config.nodes) <= 1 else "torchrun" + return mode + + def _pick_master_port(self) -> int: + """ + Return ``multi_node.master_port`` if set, otherwise a free ephemeral + port on the orchestrator host. + + The bound socket is closed before the port is returned, so there is a + small TOCTOU window. Operators who care can pin ``master_port`` + explicitly in the config. 
+ """ + configured = self.config.multi_node.master_port + if configured: + return int(configured) + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("", 0)) + return int(s.getsockname()[1]) + + def _build_base_env(self) -> Dict[str, str]: + """Build the env dict shared by every node's launch.""" + env = self.config.environment.to_dict() + + rccl_path = self.config.rccl.build_path + env["LD_LIBRARY_PATH"] = ( + f"{rccl_path}/build/release/:/opt/rocm/lib:/opt/rocm/lib64:" + f"/opt/openmpi/lib:/opt/rccl-tests/build:$LD_LIBRARY_PATH" + ) + env["rccl_path"] = rccl_path + + if self.config.training_overrides: + # aorta train.py exposes `--override` with `nargs="*"`; multiple + # `--override` groups collapse to the last group's values. Emit a + # single `--override` followed by all key=value tokens so that + # downstream legacy launch scripts also forward them correctly. + tokens = " ".join(f'{key}="{value}"' for key, value in self.config.training_overrides.items()) + env["AORTA_OVERRIDE_ARGS"] = f"--override {tokens}" + + for k, v in self.config.multi_node.extra_env.items(): + env[str(k)] = str(v) + + return env + + def _build_torchrun_command( + self, + *, + node_rank: int, + nnodes: int, + master_addr: str, + master_port: int, + nproc_per_node: int, + ) -> str: + """ + Build the ``torchrun ... train.py --config `` shell command run + inside each node's container. + + Mirrors ``scripts/multi_node/local_launch.sh`` from Aorta's own + repository: one rank-group per node, rendezvous on ``master_addr``. + Training overrides from the config are appended via ``--override`` so + operators get the same knobs as the single-node ``script`` mode. 
+ """ + mn = self.config.multi_node + mount = self.config.container_mount_path + config_path = f"{mount}/{self.config.base_config}" + train_script = f"{mount}/{mn.train_script}" + + parts: List[str] = [ + "torchrun", + f"--nnodes={nnodes}", + f"--node_rank={node_rank}", + f"--nproc_per_node={nproc_per_node}", + f"--master_addr={shlex.quote(str(master_addr))}", + f"--master_port={int(master_port)}", + ] + parts.extend(str(a) for a in mn.extra_torchrun_args) + parts.append(shlex.quote(train_script)) + parts.extend(["--config", shlex.quote(config_path)]) + # aorta's train.py uses `argparse(--override, nargs="*")` so emitting + # one `--override` per key only keeps the LAST group's values. Pack + # all key=value tokens behind a single `--override` so they all stick. + if self.config.training_overrides: + parts.append("--override") + for key, value in self.config.training_overrides.items(): + parts.append(f"{key}={shlex.quote(str(value))}") + parts.extend(str(a) for a in mn.extra_train_args) + + return "bash -lc " + shlex.quote(" ".join(parts)) + + def _run_single_node( + self, + *, + node: str, + node_rank: int, + launch_cmd: str, + env: Dict[str, str], + ) -> Tuple[str, int, str]: + """ + Execute ``launch_cmd`` inside ``node``'s container. + + Returns ``(node, exit_code, output)``. Used as the parallel worker for + multi-node launches. + """ + container = self._containers.get(node) + if container is None: + return (node, -1, f"No container found for {node}") + + log.info(f"[node {node_rank}/{node}] Launching: {launch_cmd[:200]}...") + exit_code, output = self._exec_in_container( + container, + launch_cmd, + environment=env, + workdir=self.config.container_mount_path, + stream=True, + ) + log.info(f"[node {node_rank}/{node}] Exit code: {exit_code}") + return (node, exit_code, output) + def run(self, **kwargs) -> RunResult: """ Execute the Aorta benchmark. - Runs the experiment script inside the container and collects - profiling artifacts. 
+ Two execution modes are supported, selected by + ``multi_node.master_launch_mode``: + + - ``script`` (single-node, legacy): runs ``experiment_script`` inside + the head node's container. + - ``torchrun`` (disaggregated multi-node): launches ``torchrun`` on + every node in parallel with proper ``--nnodes/--node_rank/ + --master_addr/--master_port`` rendezvous, mirroring Aorta's + ``scripts/multi_node/local_launch.sh`` pattern. ``auto`` picks + ``script`` for single-node clusters and ``torchrun`` for + multi-node clusters. + + Profiling artifacts (``torch_profiler/`` trees) from every node are + collected into ``/combined_traces/node_/`` on the + head node when running multi-node, and exposed via the + ``torch_traces`` artifact for downstream parsers. """ start_time = time.time() stdout_dict: Dict[str, str] = {} @@ -567,103 +905,177 @@ def run(self, **kwargs) -> RunResult: artifacts: Dict[str, Path] = {} try: - # For now, run on head node only (single node v1) - node = self.head_node - container = self._containers.get(node) + launch_mode = self._resolve_launch_mode() + nodes = list(self.config.nodes) - if not container: + if launch_mode == "script" and len(nodes) > 1: return RunResult( status=RunStatus.FAILED, start_time=start_time, end_time=time.time(), - error_message=f"No container found for {node}", + error_message=( + "master_launch_mode='script' but cluster has " + f"{len(nodes)} nodes; either set master_launch_mode='torchrun' " + "or 'auto' in the multi_node block, or shrink the cluster file." 
+ ), ) - # Build environment with computed values - env = self.config.environment.to_dict() - - # Add RCCL library path - rccl_path = self.config.rccl.build_path - env["LD_LIBRARY_PATH"] = ( - f"{rccl_path}/build/release/:/opt/rocm/lib:/opt/rocm/lib64:" - f"/opt/openmpi/lib:/opt/rccl-tests/build:$LD_LIBRARY_PATH" - ) - env["rccl_path"] = rccl_path - - # Build override arguments if any - override_args = "" - if self.config.training_overrides: - for key, value in self.config.training_overrides.items(): - override_args += f' --override {key}="{value}"' - - # Execute experiment script with streaming output for real-time feedback - # Note: override_args is passed via environment if the script supports it - if override_args: - env["AORTA_OVERRIDE_ARGS"] = override_args.strip() - log.info(f"Training overrides: {override_args.strip()}") - - # Pass the base config file to the experiment script - # launch_rocm.sh expects: CONFIG=${1:-default.yaml} - config_path = f"{self.config.container_mount_path}/{self.config.base_config}" - exp_cmd = f"bash {self.config.container_mount_path}/{self.config.experiment_script} {config_path}" - log.info(f"Running experiment: {exp_cmd}") - log.info("Streaming output (this may take several minutes)...") + log.info(f"Launch mode: {launch_mode}; nodes={len(nodes)}; head_node={self.head_node}") + + base_env = self._build_base_env() + if base_env.get("AORTA_OVERRIDE_ARGS"): + log.info(f"Training overrides: {base_env['AORTA_OVERRIDE_ARGS']}") + + if launch_mode == "script": + node = self.head_node + container = self._containers.get(node) + if not container: + return RunResult( + status=RunStatus.FAILED, + start_time=start_time, + end_time=time.time(), + error_message=f"No container found for {node}", + ) - exit_code, output = self._exec_in_container( - container, - exp_cmd, - environment=env, - stream=True, # Stream output for real-time feedback - ) + config_path = f"{self.config.container_mount_path}/{self.config.base_config}" + exp_cmd = f"bash 
{self.config.container_mount_path}/{self.config.experiment_script} {config_path}" + log.info(f"Running experiment: {exp_cmd}") + log.info("Streaming output (this may take several minutes)...") - stdout_dict[node] = output - exit_codes[node] = exit_code + exit_code, output = self._exec_in_container( + container, + exp_cmd, + environment=base_env, + stream=True, + ) + stdout_dict[node] = output + exit_codes[node] = exit_code - if exit_code != 0: - log.error(f"Experiment failed on {node} with exit code {exit_code}") - return RunResult( - status=RunStatus.FAILED, - start_time=start_time, - end_time=time.time(), - stdout=stdout_dict, - exit_codes=exit_codes, - error_message=f"Experiment exited with code {exit_code}", + if exit_code != 0: + log.error(f"Experiment failed on {node} with exit code {exit_code}") + return RunResult( + status=RunStatus.FAILED, + start_time=start_time, + end_time=time.time(), + stdout=stdout_dict, + exit_codes=exit_codes, + error_message=f"Experiment exited with code {exit_code}", + ) + else: + mn = self.config.multi_node + nnodes = len(nodes) + nproc_per_node = mn.nproc_per_node or self.config.gpus_per_node + master_addr = mn.master_addr or self.head_node + master_port = self._pick_master_port() + + log.info( + f"Disaggregated launch: nnodes={nnodes}, " + f"nproc_per_node={nproc_per_node}, " + f"master={master_addr}:{master_port}" ) + futures = {} + with ThreadPoolExecutor(max_workers=max(1, nnodes)) as executor: + for rank, node in enumerate(nodes): + cmd = self._build_torchrun_command( + node_rank=rank, + nnodes=nnodes, + master_addr=master_addr, + master_port=master_port, + nproc_per_node=nproc_per_node, + ) + fut = executor.submit( + self._run_single_node, + node=node, + node_rank=rank, + launch_cmd=cmd, + env=base_env, + ) + futures[fut] = (rank, node) + + for fut in as_completed(futures): + rank, node = futures[fut] + try: + n, ec, out = fut.result() + except Exception as e: + log.exception(f"Node {node} (rank {rank}) raised: {e}") + 
stdout_dict[node] = str(e) + exit_codes[node] = -1 + continue + stdout_dict[n] = out + exit_codes[n] = ec + + failed = {n: c for n, c in exit_codes.items() if c != 0} + if failed: + log.error(f"Disaggregated run failed on {len(failed)}/{nnodes} nodes: {failed}") + return RunResult( + status=RunStatus.FAILED, + start_time=start_time, + end_time=time.time(), + stdout=stdout_dict, + exit_codes=exit_codes, + error_message=(f"Disaggregated experiment failed on nodes: {sorted(failed.keys())}"), + ) + + if mn.collect_traces: + combined = self._collect_multi_node_traces(nodes) + if combined is not None: + artifacts["torch_traces"] = combined + log.info(f"Combined per-node traces collected at {combined}") + # Find torch_profiler directory - Aorta saves traces to output_dir/torch_profiler # The output_dir is configured in the YAML config (e.g., "overlap_debug_repro") # We search for the most recent torch_profiler directory nch = self.config.environment.NCCL_MAX_NCHANNELS compute_ch = 256 - nch - trace_dir = None - output_dir = None + trace_dir: Optional[Path] = None + output_dir: Optional[Path] = None + trace_mtime: float = -1.0 - # Search for torch_profiler directories in aorta_path (handles nested dirs like artifacts/*/torch_profiler) + if "torch_traces" in artifacts: + trace_dir = artifacts["torch_traces"] + output_dir = trace_dir.parent + # Multi-node combined_traces should win unless a fresher single-node tree + # is discovered below; seed mtime from this tree so the comparison is valid. + try: + latest_file = max( + trace_dir.glob("**/*"), + key=lambda p: p.stat().st_mtime if p.is_file() else 0, + default=None, + ) + if latest_file is not None and latest_file.is_file(): + trace_mtime = latest_file.stat().st_mtime + else: + trace_mtime = trace_dir.stat().st_mtime + except (ValueError, OSError): + trace_mtime = trace_dir.stat().st_mtime + + # Search for torch_profiler directories in aorta_path (handles nested dirs like artifacts/*/torch_profiler). 
+ # Skip anything inside the combined_traces tree we just collected so the + # original (older) per-node copies don't shadow the consolidated set. + combined_root = self.config.aorta_path / "combined_traces" for candidate in self.config.aorta_path.glob("**/torch_profiler"): - if candidate.is_dir(): - # Use the most recently modified one (check mtime of rank subdirs or files inside) - try: - # Get mtime of most recent file in the directory - latest_file = max( - candidate.glob("**/*"), key=lambda p: p.stat().st_mtime if p.is_file() else 0, default=None - ) - candidate_mtime = ( - latest_file.stat().st_mtime - if latest_file and latest_file.is_file() - else candidate.stat().st_mtime - ) - except (ValueError, OSError): - candidate_mtime = candidate.stat().st_mtime - - if trace_dir is None: - trace_dir = candidate - output_dir = candidate.parent - trace_mtime = candidate_mtime - elif candidate_mtime > trace_mtime: - trace_dir = candidate - output_dir = candidate.parent - trace_mtime = candidate_mtime + if not candidate.is_dir(): + continue + if combined_traces_in(candidate, combined_root): + continue + try: + latest_file = max( + candidate.glob("**/*"), key=lambda p: p.stat().st_mtime if p.is_file() else 0, default=None + ) + candidate_mtime = ( + latest_file.stat().st_mtime + if latest_file and latest_file.is_file() + else candidate.stat().st_mtime + ) + except (ValueError, OSError): + candidate_mtime = candidate.stat().st_mtime + + if trace_dir is None or candidate_mtime > trace_mtime: + trace_dir = candidate + output_dir = candidate.parent + trace_mtime = candidate_mtime # Required artifact for host-side parsing: torch_traces (parse runs on host, not in container) if trace_dir and trace_dir.exists(): @@ -684,9 +1096,17 @@ def run(self, **kwargs) -> RunResult: # Optional container_analysis_path: run TraceLens in container only if enabled and deps present. # Parsing/validation use host venv by default; container reports are consumed when present. 
- if self.config.analysis.enable_tracelens and trace_dir and trace_dir.exists(): + # In multi-node mode the head node's container is used; the analysis scripts + # operate on traces under aorta_path which (for collected traces) is on the head node. + analysis_container = self._containers.get(self.head_node) + if ( + self.config.analysis.enable_tracelens + and trace_dir + and trace_dir.exists() + and analysis_container is not None + ): log.info("Container TraceLens analysis (optional): attempting in-container report generation") - analysis_result = self._run_tracelens_analysis(container, output_dir) + analysis_result = self._run_tracelens_analysis(analysis_container, output_dir) if analysis_result: artifacts["tracelens_analysis"] = analysis_result log.info(f"Container TraceLens analysis completed: {analysis_result}") @@ -694,16 +1114,23 @@ def run(self, **kwargs) -> RunResult: log.warning("Container TraceLens skipped or failed; host will parse raw traces") # Run GEMM analysis if enabled (optional, same as TraceLens) - if self.config.analysis.enable_gemm_analysis and trace_dir and trace_dir.exists(): - gemm_result = self._run_gemm_analysis(container, output_dir) + if ( + self.config.analysis.enable_gemm_analysis + and trace_dir + and trace_dir.exists() + and analysis_container is not None + ): + gemm_result = self._run_gemm_analysis(analysis_container, output_dir) if gemm_result: artifacts["gemm_analysis"] = gemm_result log.info(f"GEMM analysis completed: {gemm_result}") - # Also collect training logs - log_file = self.config.aorta_path / f"training_{node}.log" - if log_file.exists(): - artifacts["training_log"] = log_file + # Also collect training logs (best-effort, single-node legacy path) + for log_node in self.config.nodes: + log_file = self.config.aorta_path / f"training_{log_node}.log" + if log_file.exists(): + artifacts.setdefault("training_log", log_file) + break return RunResult( status=RunStatus.COMPLETED, @@ -718,6 +1145,7 @@ def run(self, **kwargs) -> 
RunResult: "gpus_per_node": self.config.gpus_per_node, "nccl_channels": nch, "compute_channels": compute_ch, + "launch_mode": launch_mode, }, ) diff --git a/cvs/runners/unittests/__init__.py b/cvs/runners/unittests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/cvs/runners/unittests/test_aorta_multinode.py b/cvs/runners/unittests/test_aorta_multinode.py new file mode 100644 index 00000000..927d4649 --- /dev/null +++ b/cvs/runners/unittests/test_aorta_multinode.py @@ -0,0 +1,357 @@ +""" +Unit tests for the multi-node disaggregated launch path of ``AortaRunner``. + +These tests exercise pure helpers on the runner (command construction, launch +mode resolution, port selection, head-node trace collection). The networked +container/SSH paths are not exercised here; see ``test_aorta.py`` for the +end-to-end pytest suite that runs against a real cluster. +""" + +import socket +import tempfile +import unittest +from pathlib import Path +from unittest.mock import patch + +import cvs.runners.aorta as aorta_mod +from cvs.runners.aorta import ( + AortaConfig, + AortaDockerConfig, + AortaEnvironment, + AortaMultiNodeConfig, + AortaRunner, + RcclConfig, + combined_traces_in, +) + + +def _make_runner( + *, + nodes, + aorta_path, + multi_node=None, + base_config="config/distributed.yaml", + experiment_script="scripts/launch_rocm.sh", +): + cfg = AortaConfig( + nodes=list(nodes), + username="testuser", + pkey="/home/testuser/.ssh/id_rsa", + aorta_path=Path(aorta_path), + base_config=base_config, + docker=AortaDockerConfig(), + rccl=RcclConfig(), + environment=AortaEnvironment(), + multi_node=multi_node or AortaMultiNodeConfig(), + build_script="scripts/launch_rocm.sh", + experiment_script=experiment_script, + gpus_per_node=8, + ) + # The runner's __init__ aborts when the docker SDK is unavailable. None of + # the helpers under test actually call into docker, so flip the module flag + # for the duration of this call. 
This keeps the unit tests runnable in + # minimal CI environments without the docker package. + with patch.object(aorta_mod, "DOCKER_SDK_AVAILABLE", True): + return AortaRunner(cfg) + + +class TestResolveLaunchMode(unittest.TestCase): + def test_auto_resolves_to_script_for_single_node(self): + r = _make_runner(nodes=["10.0.0.1"], aorta_path="/tmp/aorta") + self.assertEqual(r._resolve_launch_mode(), "script") + + def test_auto_resolves_to_torchrun_for_multi_node(self): + r = _make_runner(nodes=["10.0.0.1", "10.0.0.2"], aorta_path="/tmp/aorta") + self.assertEqual(r._resolve_launch_mode(), "torchrun") + + def test_explicit_script_mode_is_respected(self): + r = _make_runner( + nodes=["10.0.0.1", "10.0.0.2"], + aorta_path="/tmp/aorta", + multi_node=AortaMultiNodeConfig(master_launch_mode="script"), + ) + self.assertEqual(r._resolve_launch_mode(), "script") + + def test_explicit_torchrun_mode_is_respected_single_node(self): + r = _make_runner( + nodes=["10.0.0.1"], + aorta_path="/tmp/aorta", + multi_node=AortaMultiNodeConfig(master_launch_mode="torchrun"), + ) + self.assertEqual(r._resolve_launch_mode(), "torchrun") + + +class TestPickMasterPort(unittest.TestCase): + def test_returns_configured_port_when_set(self): + mn = AortaMultiNodeConfig(master_port=29501) + r = _make_runner(nodes=["10.0.0.1", "10.0.0.2"], aorta_path="/tmp/aorta", multi_node=mn) + self.assertEqual(r._pick_master_port(), 29501) + + def test_returns_free_port_in_valid_range_when_unset(self): + r = _make_runner(nodes=["10.0.0.1", "10.0.0.2"], aorta_path="/tmp/aorta") + port = r._pick_master_port() + self.assertIsInstance(port, int) + self.assertGreater(port, 0) + self.assertLess(port, 65536) + # Port should be bindable right after we picked it (best-effort sanity). + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + try: + s.bind(("", port)) + except OSError: + # Race acceptable; we just want the value to look plausible. 
+ pass + + +class TestBuildTorchrunCommand(unittest.TestCase): + def setUp(self): + self.runner = _make_runner( + nodes=["10.0.0.1", "10.0.0.2"], + aorta_path="/tmp/aorta", + multi_node=AortaMultiNodeConfig(), + base_config="config/distributed_multinode.yaml", + ) + + def test_command_contains_required_torchrun_flags(self): + cmd = self.runner._build_torchrun_command( + node_rank=1, + nnodes=2, + master_addr="10.0.0.1", + master_port=29500, + nproc_per_node=8, + ) + self.assertIn("torchrun", cmd) + self.assertIn("--nnodes=2", cmd) + self.assertIn("--node_rank=1", cmd) + self.assertIn("--nproc_per_node=8", cmd) + self.assertIn("--master_addr=10.0.0.1", cmd) + self.assertIn("--master_port=29500", cmd) + + def test_command_uses_container_mount_paths(self): + cmd = self.runner._build_torchrun_command( + node_rank=0, + nnodes=2, + master_addr="10.0.0.1", + master_port=29500, + nproc_per_node=8, + ) + self.assertIn("/mnt/train.py", cmd) + self.assertIn("--config /mnt/config/distributed_multinode.yaml", cmd) + + def test_command_propagates_training_overrides(self): + runner = _make_runner(nodes=["10.0.0.1", "10.0.0.2"], aorta_path="/tmp/aorta") + runner.config.training_overrides = {"training.max_steps": 15, "profiling.active": 6} + cmd = runner._build_torchrun_command( + node_rank=0, + nnodes=2, + master_addr="10.0.0.1", + master_port=29500, + nproc_per_node=8, + ) + self.assertIn("--override", cmd) + self.assertIn("training.max_steps=15", cmd) + self.assertIn("profiling.active=6", cmd) + # All overrides must share a single `--override` group -- aorta's + # argparse(nargs="*") silently drops earlier groups otherwise. 
+ self.assertEqual(cmd.count("--override"), 1) + + def test_extra_torchrun_and_train_args_are_appended(self): + mn = AortaMultiNodeConfig( + extra_torchrun_args=["--rdzv_backend=c10d"], + extra_train_args=["--enable-rocm-metrics"], + ) + runner = _make_runner(nodes=["a", "b"], aorta_path="/tmp/aorta", multi_node=mn) + cmd = runner._build_torchrun_command( + node_rank=0, + nnodes=2, + master_addr="a", + master_port=29500, + nproc_per_node=8, + ) + self.assertIn("--rdzv_backend=c10d", cmd) + self.assertIn("--enable-rocm-metrics", cmd) + + +class TestBuildBaseEnv(unittest.TestCase): + def test_extra_env_is_merged_in(self): + mn = AortaMultiNodeConfig(extra_env={"NCCL_SOCKET_IFNAME": "bond0", "MY_FLAG": "1"}) + runner = _make_runner(nodes=["a", "b"], aorta_path="/tmp/aorta", multi_node=mn) + env = runner._build_base_env() + self.assertEqual(env["NCCL_SOCKET_IFNAME"], "bond0") + self.assertEqual(env["MY_FLAG"], "1") + # Existing NCCL knobs should still be there. + self.assertEqual(env["NCCL_MAX_NCHANNELS"], "112") + self.assertIn("LD_LIBRARY_PATH", env) + + def test_training_overrides_become_env_var(self): + runner = _make_runner(nodes=["a"], aorta_path="/tmp/aorta") + runner.config.training_overrides = {"training.max_steps": 5} + env = runner._build_base_env() + self.assertIn("AORTA_OVERRIDE_ARGS", env) + self.assertIn("training.max_steps", env["AORTA_OVERRIDE_ARGS"]) + + def test_multi_key_overrides_share_one_override_group(self): + # Aorta train.py uses argparse(--override, nargs="*"); multiple + # `--override` groups would silently keep only the last group's values. + # Guarantee a single group regardless of how many keys are configured. 
+ runner = _make_runner(nodes=["a"], aorta_path="/tmp/aorta") + runner.config.training_overrides = { + "training.max_steps": 5, + "training.batch_size": 8, + "profiling.active": 3, + } + env = runner._build_base_env() + self.assertEqual(env["AORTA_OVERRIDE_ARGS"].count("--override"), 1) + for key in runner.config.training_overrides: + self.assertIn(key, env["AORTA_OVERRIDE_ARGS"]) + + +class TestCombinedTracesIn(unittest.TestCase): + def test_returns_true_when_under_combined_traces(self): + root = Path("/aorta") + self.assertTrue(combined_traces_in(root / "combined_traces" / "node_0" / "torch_profiler", root)) + + def test_returns_false_for_real_run_artifacts(self): + root = Path("/aorta") + self.assertFalse(combined_traces_in(root / "artifacts" / "run1" / "torch_profiler", root)) + + def test_returns_false_for_path_outside_root(self): + root = Path("/aorta") + self.assertFalse(combined_traces_in(Path("/elsewhere/torch_profiler"), root)) + + +class TestCopyLocalTorchProfilers(unittest.TestCase): + def test_copies_torch_profiler_trees_and_skips_combined(self): + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + # Real run artifact + (root / "artifacts" / "run1" / "torch_profiler" / "rank_0").mkdir(parents=True) + (root / "artifacts" / "run1" / "torch_profiler" / "rank_0" / "trace.json").write_text("{}") + + # Pre-existing combined traces (must be skipped to avoid recursion) + (root / "combined_traces" / "node_0" / "torch_profiler").mkdir(parents=True) + (root / "combined_traces" / "node_0" / "torch_profiler" / "trace.json").write_text("{}") + + dest = root / "combined_traces" / "node_0_new" + dest.mkdir() + + runner = _make_runner(nodes=["a"], aorta_path=str(root)) + copied = runner._copy_local_torch_profilers(root, dest) + + self.assertTrue(copied) + target = dest / "artifacts" / "run1" / "torch_profiler" / "rank_0" / "trace.json" + self.assertTrue(target.exists(), f"Expected {target} to exist") + # Combined traces tree itself must NOT have been 
re-copied under dest + self.assertFalse((dest / "combined_traces").exists()) + + def test_returns_false_when_no_traces(self): + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + dest = root / "out" + dest.mkdir() + runner = _make_runner(nodes=["a"], aorta_path=str(root)) + self.assertFalse(runner._copy_local_torch_profilers(root, dest)) + + +class TestCollectMultiNodeTracesHeadOnly(unittest.TestCase): + """ + End-to-end happy path for trace collection where every node is the head + (no SSH involved) so we can exercise the directory layout logic without a + real cluster. + """ + + def test_layout_matches_combined_traces_node_rank(self): + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + (root / "artifacts" / "torch_profiler" / "rank_0").mkdir(parents=True) + (root / "artifacts" / "torch_profiler" / "rank_0" / "trace.json").write_text("{}") + + # Single-node "cluster" so the head-node fast path is used for both ranks. + runner = _make_runner(nodes=[socket.gethostname()], aorta_path=str(root)) + result = runner._collect_multi_node_traces([socket.gethostname()]) + + self.assertIsNotNone(result) + self.assertEqual(result, root / "combined_traces") + self.assertTrue( + ( + root / "combined_traces" / "node_0" / "artifacts" / "torch_profiler" / "rank_0" / "trace.json" + ).exists() + ) + + +class TestValidateConfigChecksTrainScriptInTorchrunMode(unittest.TestCase): + def test_torchrun_mode_requires_train_script(self): + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + # Set up a minimal aorta_path layout missing train.py + (root / "config").mkdir() + (root / "config" / "distributed.yaml").write_text("dummy: 1\n") + (root / "scripts").mkdir() + (root / "scripts" / "launch_rocm.sh").write_text("#!/bin/bash\n") + + runner = _make_runner( + nodes=["a", "b"], + aorta_path=str(root), + multi_node=AortaMultiNodeConfig(master_launch_mode="torchrun"), + ) + errors = runner.validate_config() + self.assertTrue( + any("train_script does not 
exist" in e for e in errors), + f"Expected a train_script error, got: {errors}", + ) + + def test_script_mode_does_not_require_train_script(self): + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + (root / "config").mkdir() + (root / "config" / "distributed.yaml").write_text("dummy: 1\n") + (root / "scripts").mkdir() + (root / "scripts" / "launch_rocm.sh").write_text("#!/bin/bash\n") + + runner = _make_runner( + nodes=["a"], + aorta_path=str(root), + multi_node=AortaMultiNodeConfig(master_launch_mode="script"), + ) + errors = runner.validate_config() + self.assertFalse( + any("train_script" in e for e in errors), + f"train_script should not be required in script mode, got: {errors}", + ) + + +class TestSchemaMultiNodeBlock(unittest.TestCase): + def test_extra_keys_under_multi_node_are_rejected(self): + from cvs.parsers.schemas import AortaBenchmarkConfigFile + from pydantic import ValidationError + + raw = { + "aorta_path": "/tmp/aorta", + "multi_node": {"bogus_key": "value"}, + } + with self.assertRaises(ValidationError): + AortaBenchmarkConfigFile.model_validate(raw) + + def test_invalid_master_launch_mode_rejected(self): + from cvs.parsers.schemas import AortaBenchmarkConfigFile + from pydantic import ValidationError + + raw = { + "aorta_path": "/tmp/aorta", + "multi_node": {"master_launch_mode": "magic"}, + } + with self.assertRaises(ValidationError): + AortaBenchmarkConfigFile.model_validate(raw) + + def test_default_multi_node_block_has_auto_mode(self): + from cvs.parsers.schemas import AortaBenchmarkConfigFile + + raw = {"aorta_path": "/tmp/aorta"} + cfg = AortaBenchmarkConfigFile.model_validate(raw) + self.assertEqual(cfg.multi_node.master_launch_mode, "auto") + self.assertTrue(cfg.multi_node.collect_traces) + self.assertEqual(cfg.multi_node.train_script, "train.py") + + +if __name__ == "__main__": + unittest.main() diff --git a/cvs/tests/benchmark/test_aorta.py b/cvs/tests/benchmark/test_aorta.py index 9727a832..5361d465 100644 --- 
a/cvs/tests/benchmark/test_aorta.py +++ b/cvs/tests/benchmark/test_aorta.py @@ -23,6 +23,7 @@ RcclConfig, AortaEnvironment, AortaAnalysisConfig, + AortaMultiNodeConfig, ) from cvs.runners._base_runner import RunStatus from cvs.parsers.aorta_report import AortaReportParser @@ -159,6 +160,21 @@ def aorta_runner_config( skip_if_exists=analysis_cfg.skip_if_exists, ) + # Build multi-node config (used when cluster has >1 node, or when + # multi_node.master_launch_mode is forced to 'torchrun'). + mn_cfg = validated_aorta_config.multi_node + multi_node_config = AortaMultiNodeConfig( + master_launch_mode=mn_cfg.master_launch_mode, + nproc_per_node=mn_cfg.nproc_per_node, + master_port=mn_cfg.master_port, + master_addr=mn_cfg.master_addr, + train_script=mn_cfg.train_script, + extra_torchrun_args=list(mn_cfg.extra_torchrun_args), + extra_train_args=list(mn_cfg.extra_train_args), + extra_env=dict(mn_cfg.extra_env), + collect_traces=mn_cfg.collect_traces, + ) + # Build full runner config return AortaConfig( nodes=node_list, @@ -174,6 +190,7 @@ def aorta_runner_config( rccl=rccl_config, environment=env_config, analysis=analysis_config, + multi_node=multi_node_config, build_script=validated_aorta_config.build_script, experiment_script=validated_aorta_config.experiment_script, gpus_per_node=validated_aorta_config.gpus_per_node, diff --git a/docs/reference/configuration-files/aorta.rst b/docs/reference/configuration-files/aorta.rst index ede91ac3..0c68817f 100644 --- a/docs/reference/configuration-files/aorta.rst +++ b/docs/reference/configuration-files/aorta.rst @@ -62,6 +62,14 @@ Here's a code snippet of the ``aorta_benchmark.yaml`` file for reference: tracelens_script: scripts/tracelens_single_config/run_tracelens_single_config.sh skip_if_exists: false + multi_node: + master_launch_mode: auto + train_script: train.py + extra_torchrun_args: [] + extra_train_args: [] + extra_env: {} + collect_traces: true + expected_results: max_avg_iteration_ms: 7000 min_compute_ratio: 0.8 @@ 
-164,6 +172,33 @@ Here's an exhaustive list of the available parameters in the Aorta benchmark con * - ``analysis.skip_if_exists`` - false - Skip analysis if ``tracelens_analysis`` directory already exists + * - ``multi_node.master_launch_mode`` + - ``auto`` + - ``auto`` picks ``script`` for single-node clusters and ``torchrun`` for multi-node clusters. Set to ``script`` to force the single-node ``experiment_script`` path (errors out on >1 node), or ``torchrun`` to always build a disaggregated ``torchrun`` command. + * - ``multi_node.nproc_per_node`` + - ``null`` (defaults to ``gpus_per_node``) + - Processes/GPUs per node passed as ``torchrun --nproc_per_node``. + * - ``multi_node.master_port`` + - ``null`` (free ephemeral port) + - Port for the ``torchrun`` rendezvous (``--master_port``). Pin this when you need a deterministic port (e.g., firewalled environments). + * - ``multi_node.master_addr`` + - ``null`` (head node from cluster.json) + - Override the rendezvous address (``--master_addr``). + * - ``multi_node.train_script`` + - ``train.py`` + - Aorta training entry script relative to ``aorta_path``. Used in ``torchrun`` mode. + * - ``multi_node.extra_torchrun_args`` + - ``[]`` + - Additional ``torchrun`` flags appended before the training script. + * - ``multi_node.extra_train_args`` + - ``[]`` + - Additional ``train.py`` flags appended after ``--config``. + * - ``multi_node.extra_env`` + - ``{}`` + - Extra environment variables exported inside each container before ``torchrun``. Use for transport tuning (``NCCL_SOCKET_IFNAME``, ``NCCL_IB_HCA``, ``NCCL_IB_GID_INDEX``, ...). + * - ``multi_node.collect_traces`` + - ``true`` + - When true, the runner pulls each node's ``torch_profiler/`` trees back to ``<aorta_path>/combined_traces/node_<rank>/`` on the head node so host parsers see one unified trace tree. * - ``expected_results.max_avg_iteration_ms`` - e.g. 
7000 - Maximum acceptable average iteration time (ms); validation fails if exceeded @@ -191,6 +226,33 @@ From the CVS repo root (directory containing ``cvs`` and ``input``): Provide a valid ``cluster_file`` and ensure ``aorta_path`` in the config points to an existing Aorta checkout. The runner will build RCCL (unless ``skip_rccl_build`` is true), run the experiment script, collect ``torch_traces`` (PyTorch profiler output), and optionally run TraceLens in the container. Results are parsed on the host from raw traces or from TraceLens reports when present. +Multi-node disaggregated launch +=============================== + +By default, when the cluster file contains more than one node, ``test_aorta`` runs a disaggregated launch: a single Aorta container is started on every node, then the runner kicks off ``torchrun`` in parallel on each container with ``--nnodes``, ``--node_rank``, ``--master_addr``, and ``--master_port`` set so the ranks rendezvous on the head node. This mirrors Aorta's own ``scripts/multi_node/local_launch.sh`` pattern and brings the benchmark in line with the other multi-node CVS suites (sglang, pytorch-xdit), which only require **one** ``cluster.json`` for a multi-node run. + +The multi-node behavior is controlled by the ``multi_node`` block in ``aorta_benchmark.yaml``: + +.. code:: yaml + + multi_node: + master_launch_mode: auto # auto | script | torchrun + nproc_per_node: 8 # defaults to gpus_per_node + master_port: 29500 # default: free ephemeral port + master_addr: 10.0.0.1 # default: head node from cluster.json + train_script: train.py + extra_torchrun_args: [] + extra_train_args: [] + extra_env: + NCCL_SOCKET_IFNAME: bond0 + NCCL_IB_HCA: rdma0,rdma1,rdma2,rdma3,rdma4,rdma5,rdma6,rdma7 + NCCL_IB_GID_INDEX: "3" + collect_traces: true + +Single-node clusters keep using the configured ``experiment_script`` (``master_launch_mode: auto`` resolves to ``script``). 
Force the disaggregated path with ``master_launch_mode: torchrun`` if you want it for a single-node cluster too. + +When ``collect_traces`` is true, every node's ``torch_profiler/`` directories are rsynced back to ``<aorta_path>/combined_traces/node_<rank>/`` on the head node and exposed as the ``torch_traces`` artifact, so the existing host parsers and threshold checks see one unified tree without further configuration. + Expected results and artifacts ==============================