diff --git a/.github/workflows/build-ubuntu.yml b/.github/workflows/build-ubuntu.yml index 97c493046..b23d0846e 100644 --- a/.github/workflows/build-ubuntu.yml +++ b/.github/workflows/build-ubuntu.yml @@ -235,6 +235,7 @@ jobs: env: ACCEPT_CLOUDXR_EULA: Y CXR_BUILD_CONTEXT: ${{ github.workspace }} + CXR_RUNTIME_NETWORK_MODE: bridge PYTHON_VERSION: ${{ matrix.python_version }} ISAACTELEOP_PIP_DEBUG: "0" ISAACTELEOP_PIP_FIND_LINKS: /workspace/install/wheels diff --git a/deps/cloudxr/docker-compose.test.yaml b/deps/cloudxr/docker-compose.test.yaml index b631cc194..42d13623c 100644 --- a/deps/cloudxr/docker-compose.test.yaml +++ b/deps/cloudxr/docker-compose.test.yaml @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # Test docker-compose override for running Isaac Teleop tests with CloudXR. 
diff --git a/src/core/CMakeLists.txt b/src/core/CMakeLists.txt index d5e6dc1eb..1a2f730a3 100644 --- a/src/core/CMakeLists.txt +++ b/src/core/CMakeLists.txt @@ -65,6 +65,11 @@ if(BUILD_TESTING) # Teleop session manager tests (Python) add_subdirectory(teleop_session_manager_tests) + # CloudXR tests (Python — requires the built wheel from python/) + if(BUILD_PYTHON_BINDINGS) + add_subdirectory(cloudxr_tests) + endif() + # MCAP tests (C++) add_subdirectory(mcap_tests/cpp) endif() diff --git a/src/core/cloudxr/python/CMakeLists.txt b/src/core/cloudxr/python/CMakeLists.txt index f3f9f55d1..5e88cf747 100644 --- a/src/core/cloudxr/python/CMakeLists.txt +++ b/src/core/cloudxr/python/CMakeLists.txt @@ -108,6 +108,7 @@ add_custom_target(cloudxr_python ALL "${CMAKE_CURRENT_SOURCE_DIR}/__init__.py" "${CMAKE_CURRENT_SOURCE_DIR}/__main__.py" "${CMAKE_CURRENT_SOURCE_DIR}/env_config.py" + "${CMAKE_CURRENT_SOURCE_DIR}/launcher.py" "${CMAKE_CURRENT_SOURCE_DIR}/runtime.py" "${CMAKE_CURRENT_SOURCE_DIR}/wss.py" "${CLOUDXR_PYTHON_DIR}/" diff --git a/src/core/cloudxr/python/__init__.py b/src/core/cloudxr/python/__init__.py index a53076f62..5fc7cc4ec 100644 --- a/src/core/cloudxr/python/__init__.py +++ b/src/core/cloudxr/python/__init__.py @@ -2,3 +2,7 @@ # SPDX-License-Identifier: Apache-2.0 """CloudXR integration for isaacteleop.""" + +from .launcher import CloudXRLauncher + +__all__ = ["CloudXRLauncher"] diff --git a/src/core/cloudxr/python/__main__.py b/src/core/cloudxr/python/__main__.py index 662e66564..b8637b90a 100644 --- a/src/core/cloudxr/python/__main__.py +++ b/src/core/cloudxr/python/__main__.py @@ -4,27 +4,18 @@ """Entry point for python -m isaacteleop.cloudxr. 
Runs CloudXR runtime and WSS proxy; main process winds both down on exit.""" import argparse -import asyncio -import multiprocessing import os import signal -import sys -from datetime import datetime, timezone +import time from isaacteleop import __version__ as isaacteleop_version -from isaacteleop.cloudxr.env_config import EnvConfig -from isaacteleop.cloudxr.runtime import ( - check_eula, - latest_runtime_log, - run as runtime_run, - runtime_version, - terminate_or_kill_runtime, - wait_for_runtime_ready, -) -from isaacteleop.cloudxr.wss import run as wss_run +from isaacteleop.cloudxr.env_config import get_env_config +from isaacteleop.cloudxr.launcher import CloudXRLauncher +from isaacteleop.cloudxr.runtime import latest_runtime_log, runtime_version def _parse_args() -> argparse.Namespace: + """Parse command-line arguments for the CloudXR runtime entry point.""" parser = argparse.ArgumentParser(description="CloudXR runtime and WSS proxy") parser.add_argument( "--cloudxr-install-dir", @@ -48,60 +39,50 @@ def _parse_args() -> argparse.Namespace: return parser.parse_args() -async def _main_async() -> None: +def main() -> None: + """Launch the CloudXR runtime and WSS proxy, then block until interrupted.""" args = _parse_args() - env_cfg = EnvConfig.from_args(args.cloudxr_install_dir, args.cloudxr_env_config) - check_eula(accept_eula=args.accept_eula or None) - logs_dir_path = env_cfg.ensure_logs_dir() - wss_ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H%M%SZ") - wss_log_path = logs_dir_path / f"wss.{wss_ts}.log" - - runtime_proc = multiprocessing.Process(target=runtime_run) - runtime_proc.start() - - cxr_ver = runtime_version() - print( - f"Running Isaac Teleop \033[36m{isaacteleop_version}\033[0m, CloudXR Runtime \033[36m{cxr_ver}\033[0m" - ) - - try: - ready = await wait_for_runtime_ready(runtime_proc) - if not ready: - if not runtime_proc.is_alive() and runtime_proc.exitcode != 0: - sys.exit( - runtime_proc.exitcode if runtime_proc.exitcode is not None else 1 
- ) - print("CloudXR runtime failed to start, terminating...") - sys.exit(1) - - stop = asyncio.get_running_loop().create_future() - def on_signal() -> None: - if not stop.done(): - stop.set_result(None) - - loop = asyncio.get_running_loop() - for sig in (signal.SIGINT, signal.SIGTERM): - loop.add_signal_handler(sig, on_signal) + with CloudXRLauncher( + install_dir=args.cloudxr_install_dir, + env_config=args.cloudxr_env_config, + accept_eula=args.accept_eula, + ) as launcher: + cxr_ver = runtime_version() + print( + f"Running Isaac Teleop \033[36m{isaacteleop_version}\033[0m, CloudXR Runtime \033[36m{cxr_ver}\033[0m" + ) + env_cfg = get_env_config() + logs_dir_path = env_cfg.ensure_logs_dir() cxr_log = latest_runtime_log() or logs_dir_path print( f"CloudXR runtime: \033[36mrunning\033[0m, log file: \033[90m{cxr_log}\033[0m" ) + wss_log = launcher.wss_log_path print( - f"CloudXR WSS proxy: \033[36mrunning\033[0m, log file: \033[90m{wss_log_path}\033[0m" + f"CloudXR WSS proxy: \033[36mrunning\033[0m, log file: \033[90m{wss_log}\033[0m" ) print( f"Activate CloudXR environment in another terminal: \033[1;32msource {env_cfg.env_filepath()}\033[0m" ) print("\033[33mKeep this terminal open, Ctrl+C to terminate.\033[0m") - await wss_run(log_file_path=wss_log_path, stop_future=stop) - finally: - terminate_or_kill_runtime(runtime_proc) + stop = False + + def on_signal(sig, frame): + nonlocal stop + stop = True + + signal.signal(signal.SIGINT, on_signal) + signal.signal(signal.SIGTERM, on_signal) + + while not stop: + launcher.health_check() + time.sleep(0.1) print("Stopped.") if __name__ == "__main__": - asyncio.run(_main_async()) + main() diff --git a/src/core/cloudxr/python/launcher.py b/src/core/cloudxr/python/launcher.py new file mode 100644 index 000000000..c3cfdbed7 --- /dev/null +++ b/src/core/cloudxr/python/launcher.py @@ -0,0 +1,410 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 + +"""Programmatic launcher for the CloudXR runtime and WSS proxy. + +Wraps the logic from ``python -m isaacteleop.cloudxr`` into a reusable +start/stop API that can be called from embedding applications (e.g. +Isaac Lab Teleop) without requiring a separate terminal. +""" + +from __future__ import annotations + +import asyncio +import atexit +import logging +import os +import signal +import subprocess +import sys +import threading +import time +from datetime import datetime, timezone +from pathlib import Path + +from .env_config import EnvConfig +from .runtime import ( + RUNTIME_STARTUP_TIMEOUT_SEC, + RUNTIME_TERMINATE_TIMEOUT_SEC, + check_eula, + wait_for_runtime_ready_sync, +) + +logger = logging.getLogger(__name__) + +_RUNTIME_WORKER_CODE = """\ +import sys, os +sys.path = [p for p in sys.path if p] +from isaacteleop.cloudxr.runtime import run +run() +""" + + +class CloudXRLauncher: + """Programmatic launcher for the CloudXR runtime and WSS proxy. + + Manages the full lifecycle of a CloudXR runtime process and its + accompanying WSS TLS proxy. The runtime and WSS proxy are started + immediately on construction; use :meth:`stop` or the context + manager protocol to shut them down. + + The runtime is launched as a fully isolated subprocess (via + :class:`subprocess.Popen`) to avoid CUDA context conflicts with + host applications like Isaac Sim that have already initialized GPU + resources. + + Example:: + + with CloudXRLauncher() as launcher: + # runtime + WSS proxy are running + ... + + Or with explicit stop:: + + launcher = CloudXRLauncher(install_dir="~/.cloudxr") + try: + # ... use the running runtime ... + finally: + launcher.stop() + """ + + def __init__( + self, + install_dir: str = "~/.cloudxr", + env_config: str | Path | None = None, + accept_eula: bool = False, + ) -> None: + """Launch the CloudXR runtime and WSS proxy. + + Configures the environment, spawns the runtime subprocess, and + starts the WSS TLS proxy. 
Blocks until the runtime signals + readiness (up to + :data:`~isaacteleop.cloudxr.runtime.RUNTIME_STARTUP_TIMEOUT_SEC`) + or raises :class:`RuntimeError` on failure. + + Args: + install_dir: CloudXR install directory. + env_config: Optional path to a KEY=value env file for + CloudXR env-var overrides. + accept_eula: Accept the NVIDIA CloudXR EULA + non-interactively. When ``False`` and the EULA marker + does not exist, the user is prompted on stdin. + + Raises: + RuntimeError: If the EULA is not accepted or the runtime + fails to start within the timeout. + """ + self._install_dir = install_dir + self._env_config = str(env_config) if env_config is not None else None + self._accept_eula = accept_eula + + self._runtime_proc: subprocess.Popen | None = None + self._wss_thread: threading.Thread | None = None + self._wss_loop: asyncio.AbstractEventLoop | None = None + self._wss_stop_future: asyncio.Future | None = None + self._wss_log_path: Path | None = None + self._atexit_registered = False + + env_cfg = EnvConfig.from_args(self._install_dir, self._env_config) + try: + check_eula(accept_eula=self._accept_eula or None) + except SystemExit as exc: + raise RuntimeError( + "CloudXR EULA was not accepted; cannot start the runtime" + ) from exc + logs_dir_path = env_cfg.ensure_logs_dir() + + self._cleanup_stale_runtime(env_cfg) + + self._runtime_proc = subprocess.Popen( + [sys.executable, "-c", _RUNTIME_WORKER_CODE], + env=os.environ.copy(), + stderr=subprocess.PIPE, + start_new_session=True, + ) + logger.info("CloudXR runtime process started (pid=%s)", self._runtime_proc.pid) + + if not wait_for_runtime_ready_sync(is_process_alive=self._is_runtime_alive): + detail = self._collect_startup_failure_detail(logs_dir_path) + self.stop() + raise RuntimeError( + "CloudXR runtime failed to start within " + f"{RUNTIME_STARTUP_TIMEOUT_SEC}s. 
{detail}" + ) + logger.info("CloudXR runtime ready") + + if not self._atexit_registered: + atexit.register(self.stop) + self._atexit_registered = True + + wss_ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H%M%SZ") + wss_log_path = logs_dir_path / f"wss.{wss_ts}.log" + self._wss_log_path = wss_log_path + self._start_wss_proxy(wss_log_path) + logger.info("CloudXR WSS proxy started (log=%s)", wss_log_path) + + # ------------------------------------------------------------------ + # Context manager + # ------------------------------------------------------------------ + + def __enter__(self) -> CloudXRLauncher: + """Return the launcher for use in a ``with`` block.""" + return self + + def __exit__(self, exc_type, exc_val, exc_tb) -> None: + """Stop the launcher on exiting the ``with`` block.""" + self.stop() + + def stop(self) -> None: + """Shut down the WSS proxy and terminate the runtime process. + + Safe to call multiple times or when nothing is running. + + Raises: + RuntimeError: If the runtime process could not be terminated. + The process handle is retained so callers can retry or + inspect the still-running process. + """ + self._stop_wss_proxy() + + if self._runtime_proc is not None: + try: + self._terminate_runtime() + except RuntimeError: + logger.warning( + "Failed to cleanly terminate CloudXR runtime process (pid=%s); " + "handle retained for later cleanup", + self._runtime_proc.pid, + ) + raise + self._runtime_proc = None + logger.info("CloudXR runtime process stopped") + + def health_check(self) -> None: + """Verify that the runtime process and WSS proxy are healthy. + + Returns immediately when both components are running. Raises + :class:`RuntimeError` with diagnostic details when any component + has stopped unexpectedly, allowing embedding applications to + perform a controlled teardown. + + Raises: + RuntimeError: If the launcher has not been started, or if + the runtime process or WSS proxy has stopped. 
+ """ + if self._runtime_proc is None: + raise RuntimeError("CloudXR launcher is not running") + + exit_code = self._runtime_proc.poll() + if exit_code is not None: + raise RuntimeError( + f"CloudXR runtime process exited unexpectedly (exit code {exit_code})" + ) + + if self._wss_thread is not None and not self._wss_thread.is_alive(): + raise RuntimeError("CloudXR WSS proxy thread stopped unexpectedly") + + @property + def wss_log_path(self) -> Path | None: + """Path to the WSS proxy log file, or ``None`` if not yet started.""" + return self._wss_log_path + + # ------------------------------------------------------------------ + # Private helpers + # ------------------------------------------------------------------ + + @staticmethod + def _cleanup_stale_runtime(env_cfg: EnvConfig) -> None: + """Remove stale sentinel files from a previous runtime that wasn't cleaned up. + + If the ``ipc_cloudxr`` socket still exists in the run directory, a + previous Monado/CloudXR process is likely still alive. We send + SIGTERM to the process group that owns the socket, giving it a + chance to exit cleanly before we start a fresh runtime. 
+ """ + run_dir = env_cfg.openxr_run_dir() + ipc_socket = os.path.join(run_dir, "ipc_cloudxr") + + if os.path.exists(ipc_socket): + logger.warning( + "Stale CloudXR IPC socket found at %s; attempting cleanup of previous runtime", + ipc_socket, + ) + try: + result = subprocess.run( + ["fuser", "-k", "-TERM", ipc_socket], + capture_output=True, + timeout=5, + ) + if result.returncode == 0: + time.sleep(1) + logger.info("Sent SIGTERM to processes holding stale IPC socket") + except (FileNotFoundError, subprocess.TimeoutExpired): + pass + + try: + os.remove(ipc_socket) + except FileNotFoundError: + pass + + for name in ("runtime_started", "monado.pid", "cloudxr.pid"): + try: + os.remove(os.path.join(run_dir, name)) + except FileNotFoundError: + pass + + def _collect_startup_failure_detail(self, logs_dir: Path) -> str: + """Build a diagnostic string after a failed runtime startup. + + Captures the process exit code, subprocess stderr pipe, the + runtime stderr log file (written by :func:`~.runtime.run`), and + the most recent CloudXR native server log. 
+ """ + _MAX_LOG_BYTES = 4096 + parts: list[str] = [] + proc = self._runtime_proc + if proc is not None: + exit_code = proc.poll() + if exit_code is not None: + parts.append(f"Process exited with code {exit_code}.") + stderr_pipe = getattr(proc, "stderr", None) + if stderr_pipe is not None: + try: + stderr_tail = stderr_pipe.read(_MAX_LOG_BYTES) + if stderr_tail: + parts.append( + f"stderr: {stderr_tail.decode(errors='replace').strip()}" + ) + except Exception: + pass + else: + parts.append("Process is still running but did not signal readiness.") + + for log_path in self._gather_diagnostic_logs(logs_dir): + try: + content = log_path.read_text(errors="replace").strip() + if not content: + continue + if len(content) > _MAX_LOG_BYTES: + content = "...\n" + content[-_MAX_LOG_BYTES:] + parts.append(f"{log_path.name}:\n{content}") + except Exception: + pass + + parts.append(f"Check logs under {logs_dir} for details.") + return " ".join(parts) + + @staticmethod + def _gather_diagnostic_logs(logs_dir: Path) -> list[Path]: + """Return log files useful for diagnosing a startup failure.""" + result: list[Path] = [] + + stderr_log = logs_dir / "runtime_stderr.log" + if stderr_log.is_file(): + result.append(stderr_log) + + cxr_logs = sorted(logs_dir.glob("cxr_server.*.log")) + if cxr_logs: + result.append(cxr_logs[-1]) + + return result + + def _is_runtime_alive(self) -> bool: + """Return whether the runtime subprocess is still running.""" + return self._runtime_proc is not None and self._runtime_proc.poll() is None + + def _terminate_runtime(self) -> None: + """Terminate the runtime subprocess and all its children. + + Because the subprocess is launched with ``start_new_session=True`` + it is the leader of its own process group. Sending the signal to + the negative PID kills the entire group (including Monado and any + other children), preventing stale processes from lingering. 
+ """ + proc = self._runtime_proc + if proc is None or proc.poll() is not None: + return + + try: + pgid = os.getpgid(proc.pid) + except ProcessLookupError: + return + + try: + os.killpg(pgid, signal.SIGTERM) + except ProcessLookupError: + return + try: + proc.wait(timeout=RUNTIME_TERMINATE_TIMEOUT_SEC) + except subprocess.TimeoutExpired: + pass + + if proc.poll() is None: + try: + os.killpg(pgid, signal.SIGKILL) + except ProcessLookupError: + return + try: + proc.wait(timeout=RUNTIME_TERMINATE_TIMEOUT_SEC) + except subprocess.TimeoutExpired: + pass + + if proc.poll() is None: + raise RuntimeError("Failed to terminate or kill runtime process group") + + # ------------------------------------------------------------------ + # WSS proxy (background thread with its own event loop) + # ------------------------------------------------------------------ + + def _start_wss_proxy(self, log_path: Path) -> None: + """Launch the WSS proxy in a daemon thread.""" + from .wss import run as wss_run + + loop = asyncio.new_event_loop() + self._wss_loop = loop + stop_future = loop.create_future() + self._wss_stop_future = stop_future + + def _run_wss() -> None: + asyncio.set_event_loop(loop) + try: + loop.run_until_complete( + wss_run(log_file_path=log_path, stop_future=stop_future) + ) + except Exception: + logger.exception("WSS proxy thread exited with error") + finally: + loop.close() + + self._wss_thread = threading.Thread( + target=_run_wss, name="cloudxr-wss-proxy", daemon=True + ) + self._wss_thread.start() + + def _stop_wss_proxy(self) -> None: + """Signal the WSS proxy to shut down and wait for the thread.""" + if self._wss_loop is not None and self._wss_stop_future is not None: + loop = self._wss_loop + future = self._wss_stop_future + + def _set_result() -> None: + if not future.done(): + future.set_result(None) + + if not loop.is_closed(): + try: + loop.call_soon_threadsafe(_set_result) + except RuntimeError: + logger.debug( + "WSS event loop closed before stop signal; " 
+ "proxy already shut down" + ) + + if self._wss_thread is not None: + self._wss_thread.join(timeout=5) + if self._wss_thread.is_alive(): + logger.warning("WSS proxy thread did not exit cleanly") + + self._wss_thread = None + self._wss_loop = None + self._wss_stop_future = None diff --git a/src/core/cloudxr/python/runtime.py b/src/core/cloudxr/python/runtime.py index 4cd0de8f5..bb050ea29 100644 --- a/src/core/cloudxr/python/runtime.py +++ b/src/core/cloudxr/python/runtime.py @@ -9,6 +9,8 @@ import signal import sys import threading +import time +from collections.abc import Callable from .env_config import get_env_config @@ -16,8 +18,15 @@ _EULA_URL = ( "https://github.com/NVIDIA/IsaacTeleop/blob/main/deps/cloudxr/CLOUDXR_LICENSE" ) -_RUNTIME_JOIN_TIMEOUT = 10 -_RUNTIME_STARTUP_TIMEOUT_SEC = 10 + +RUNTIME_STARTUP_TIMEOUT_SEC: float = 30 +"""Maximum time [s] to wait for the runtime ``runtime_started`` sentinel.""" + +RUNTIME_TERMINATE_TIMEOUT_SEC: float = 10 +"""Timeout [s] for each escalation step (SIGTERM, then SIGKILL) when stopping the runtime.""" + +RUNTIME_POLL_INTERVAL_SEC: float = 0.5 +"""Polling interval [s] used by :func:`wait_for_runtime_ready_sync`.""" def _write_eula_marker(marker: str) -> None: @@ -68,28 +77,71 @@ def _get_sdk_path() -> str | None: async def wait_for_runtime_ready( - process: multiprocessing.Process, - timeout_sec: float = _RUNTIME_STARTUP_TIMEOUT_SEC, + is_process_alive: Callable[[], bool], + timeout_sec: float = RUNTIME_STARTUP_TIMEOUT_SEC, + poll_interval_sec: float = RUNTIME_POLL_INTERVAL_SEC, ) -> bool: + """Poll for the ``runtime_started`` sentinel file. + + This is the canonical implementation used by both the async callers + and :func:`wait_for_runtime_ready_sync`. + + Args: + is_process_alive: Callable returning ``True`` while the + runtime process is still running. + timeout_sec: Maximum time to wait [s]. + poll_interval_sec: Polling interval [s]. 
+ + Returns: + ``True`` when the runtime is ready, ``False`` on timeout or + if the process exits early. """ - Return True when runtime is ready (lock file runtime_started). Return False on timeout or if - the process exits early. - """ + lock_file = os.path.join(get_env_config().openxr_run_dir(), "runtime_started") loop = asyncio.get_running_loop() deadline = loop.time() + timeout_sec - lock_file = os.path.join(get_env_config().openxr_run_dir(), "runtime_started") - while loop.time() < deadline: - if not process.is_alive(): + if not is_process_alive(): return False - if os.path.isfile(lock_file): return True + await asyncio.sleep(poll_interval_sec) + + return False + + +def wait_for_runtime_ready_sync( + is_process_alive: Callable[[], bool], + timeout_sec: float = RUNTIME_STARTUP_TIMEOUT_SEC, + poll_interval_sec: float = RUNTIME_POLL_INTERVAL_SEC, +) -> bool: + """Synchronous poll for the ``runtime_started`` sentinel file. + + Unlike :func:`wait_for_runtime_ready`, this implementation uses + :func:`time.monotonic` and :func:`time.sleep` so it is safe to call + from threads or processes that already have a running asyncio event + loop (e.g. Omniverse Kit / Isaac Sim). + + Args: + is_process_alive: Callable returning ``True`` while the + runtime process is still running. + timeout_sec: Maximum time to wait [s]. + poll_interval_sec: Polling interval [s]. + + Returns: + ``True`` when the runtime is ready, ``False`` on timeout or + if the process exits early. 
+ """ + lock_file = os.path.join(get_env_config().openxr_run_dir(), "runtime_started") + deadline = time.monotonic() + timeout_sec - await asyncio.sleep(1) + while time.monotonic() < deadline: + if not is_process_alive(): + return False + if os.path.isfile(lock_file): + return True + time.sleep(poll_interval_sec) - # Runtime startup timeout reached, assume the runtime is not ready return False @@ -114,13 +166,13 @@ def latest_runtime_log() -> str | None: def terminate_or_kill_runtime(process: multiprocessing.Process) -> None: - """Terminate or kill the runtime process.""" + """Terminate or kill a :class:`multiprocessing.Process` runtime.""" if process.is_alive(): process.terminate() - process.join(timeout=_RUNTIME_JOIN_TIMEOUT) + process.join(timeout=RUNTIME_TERMINATE_TIMEOUT_SEC) if process.is_alive(): process.kill() - process.join(timeout=_RUNTIME_JOIN_TIMEOUT) + process.join(timeout=RUNTIME_TERMINATE_TIMEOUT_SEC) if process.is_alive(): raise RuntimeError("Failed to terminate or kill runtime process") @@ -140,7 +192,7 @@ def _setup_openxr_dir(sdk_path: str, run_dir: str) -> str: raise RuntimeError(f"CloudXR SDK missing {name} at {src}. ") shutil.copy2(src, os.path.join(openxr_dir, name)) - for stale in ("ipc_cloudxr", "runtime_started"): + for stale in ("ipc_cloudxr", "runtime_started", "monado.pid", "cloudxr.pid"): p = os.path.join(run_dir, stale) if os.path.exists(p): os.remove(p) @@ -173,8 +225,10 @@ def run() -> None: prev_ld = os.environ.get("LD_LIBRARY_PATH", "") os.environ["LD_LIBRARY_PATH"] = sdk_path + (f":{prev_ld}" if prev_ld else "") - # By default suppress native library console output (banner, StreamSDK redirect notice), so - # that we can have complete control over the console output. + # When file-logging is active the native library writes detailed logs to + # NV_CXR_OUTPUT_DIR. 
Suppress the console banner on stdout but redirect + # stderr to a file so that Vulkan-loader diagnostics, GPU-init errors, + # and Python tracebacks are preserved for post-mortem analysis. _file_logging = os.environ.get("NV_CXR_FILE_LOGGING", "yes") if _file_logging and _file_logging.lower() not in ( "false", @@ -184,10 +238,14 @@ def run() -> None: "f", "0", ): + logs_dir = cfg.ensure_logs_dir() + stderr_log = os.path.join(str(logs_dir), "runtime_stderr.log") devnull_fd = os.open(os.devnull, os.O_WRONLY) + stderr_fd = os.open(stderr_log, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o644) os.dup2(devnull_fd, sys.stdout.fileno()) - os.dup2(devnull_fd, sys.stderr.fileno()) + os.dup2(stderr_fd, sys.stderr.fileno()) os.close(devnull_fd) + os.close(stderr_fd) lib_path = os.path.join(sdk_path, "libcloudxr.so") deepbind = getattr(os, "RTLD_DEEPBIND", 0) diff --git a/src/core/cloudxr_tests/CMakeLists.txt b/src/core/cloudxr_tests/CMakeLists.txt new file mode 100644 index 000000000..004c5dd9a --- /dev/null +++ b/src/core/cloudxr_tests/CMakeLists.txt @@ -0,0 +1,9 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +# ============================================================================== +# CloudXR Tests +# ============================================================================== + +# Python tests +add_subdirectory(python) diff --git a/src/core/cloudxr_tests/python/CMakeLists.txt b/src/core/cloudxr_tests/python/CMakeLists.txt new file mode 100644 index 000000000..185cf2102 --- /dev/null +++ b/src/core/cloudxr_tests/python/CMakeLists.txt @@ -0,0 +1,33 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 + +# ============================================================================== +# CloudXR Python Tests +# ============================================================================== + +# Discover and add individual Python tests +# This makes each test show up separately in CTest output + +# First, find all test files (only in this directory, not recursively) +file(GLOB TEST_FILES + RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" + "${CMAKE_CURRENT_SOURCE_DIR}/test_*.py" +) + +# Add each test file as a separate CTest test +foreach(test_file ${TEST_FILES}) + # Get test name from filename (remove .py and path) + get_filename_component(test_name "${test_file}" NAME_WE) + + # Add the test + add_test( + NAME "cloudxr_${test_name}" + COMMAND uv run --python ${ISAAC_TELEOP_PYTHON_VERSION} --extra dev pytest -v --tb=short "${test_file}" + WORKING_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}" + ) + + # Set the environment to ensure the built package is available + set_tests_properties("cloudxr_${test_name}" PROPERTIES + ENVIRONMENT "PYTHONPATH=${CMAKE_BINARY_DIR}/python_package/$" + ) +endforeach() diff --git a/src/core/cloudxr_tests/python/pyproject.toml b/src/core/cloudxr_tests/python/pyproject.toml new file mode 100644 index 000000000..3850a5360 --- /dev/null +++ b/src/core/cloudxr_tests/python/pyproject.toml @@ -0,0 +1,26 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 + +[project] +name = "teleopcore-cloudxr-tests" +version = "0.0.0" # Internal tests - not versioned +description = "CloudXR launcher and runtime unit tests for TeleopCore" +requires-python = ">=3.10,<3.14" + +[project.optional-dependencies] +dev = [ + "pytest", + "numpy", +] +test = [ + "pytest", + "numpy", +] + +[tool.pytest.ini_options] +pythonpath = ["."] +python_files = ["test_*.py"] +python_classes = ["Test*"] +python_functions = ["test_*"] +# Prevent pytest from recursing into parent directories +norecursedirs = [".git", ".venv", "build", "dist", "*.egg", "__pycache__"] diff --git a/src/core/cloudxr_tests/python/test_launcher.py b/src/core/cloudxr_tests/python/test_launcher.py new file mode 100644 index 000000000..c7ffbcbee --- /dev/null +++ b/src/core/cloudxr_tests/python/test_launcher.py @@ -0,0 +1,318 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Tests for isaacteleop.cloudxr.launcher — CloudXRLauncher lifecycle.""" + +import os +import signal +import subprocess +import sys +from contextlib import contextmanager +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from isaacteleop.cloudxr.launcher import CloudXRLauncher + +_posix_only = pytest.mark.skipif( + sys.platform == "win32", + reason="Process-group APIs (os.getpgid/os.killpg) are POSIX-only", +) + + +# ============================================================================ +# Helpers +# ============================================================================ + + +class _FakeEnvConfig: + """Minimal stand-in for EnvConfig.""" + + def __init__(self, run_dir: str, logs_dir: Path) -> None: + self._run_dir = run_dir + self._logs_dir = logs_dir + + @classmethod + def from_args(cls, install_dir, env_file=None): + raise NotImplementedError("Should be patched") + + def openxr_run_dir(self) -> str: + return self._run_dir + + 
def ensure_logs_dir(self) -> Path: + self._logs_dir.mkdir(parents=True, exist_ok=True) + return self._logs_dir + + +def _make_mock_popen(pid: int = 12345, poll_returns: list | None = None) -> MagicMock: + """Create a mock subprocess.Popen with configurable poll() behaviour.""" + proc = MagicMock(spec=subprocess.Popen) + proc.pid = pid + + if poll_returns is not None: + seq = list(poll_returns) + + def _poll(): + if seq: + return seq.pop(0) + return 0 + + proc.poll = MagicMock(side_effect=_poll) + else: + proc.poll = MagicMock(return_value=None) + + return proc + + +@contextmanager +def mock_launcher_deps(tmp_path, ready=True): + """Patch all heavy external dependencies so CloudXRLauncher construction runs without I/O. + + Yields a dict of the mock objects for assertion. + """ + run_dir = str(tmp_path / "run") + logs_dir = tmp_path / "logs" + fake_cfg = _FakeEnvConfig(run_dir, logs_dir) + + mock_proc = _make_mock_popen() + + mocks = {} + with ( + patch( + "isaacteleop.cloudxr.launcher.EnvConfig.from_args", + return_value=fake_cfg, + ) as m_from_args, + patch( + "isaacteleop.cloudxr.launcher.check_eula", + ) as m_eula, + patch( + "isaacteleop.cloudxr.launcher.wait_for_runtime_ready_sync", + return_value=ready, + ) as m_wait, + patch( + "isaacteleop.cloudxr.launcher.subprocess.Popen", + return_value=mock_proc, + ) as m_popen, + patch.object( + CloudXRLauncher, + "_start_wss_proxy", + ) as m_wss, + patch.object( + CloudXRLauncher, + "_cleanup_stale_runtime", + ) as m_cleanup, + patch( + "isaacteleop.cloudxr.launcher.atexit", + ) as m_atexit, + ): + mocks["from_args"] = m_from_args + mocks["check_eula"] = m_eula + mocks["wait"] = m_wait + mocks["popen"] = m_popen + mocks["proc"] = mock_proc + mocks["wss"] = m_wss + mocks["cleanup"] = m_cleanup + mocks["atexit"] = m_atexit + mocks["env_cfg"] = fake_cfg + yield mocks + + +# ============================================================================ +# TestLauncherConstruction +# 
# ============================================================================
# TestLauncherConstruction
# ============================================================================


class TestLauncherConstruction:
    """Tests for CloudXRLauncher construction (which starts the runtime)."""

    def test_construction_stores_parameters(self, tmp_path):
        """Constructor stores install_dir, env_config, and accept_eula."""
        with mock_launcher_deps(tmp_path, ready=True):
            launcher = CloudXRLauncher(
                install_dir="/opt/cloudxr",
                env_config="/etc/cloudxr.env",
                accept_eula=True,
            )
            assert launcher._install_dir == "/opt/cloudxr"
            assert launcher._env_config == "/etc/cloudxr.env"
            assert launcher._accept_eula is True

    def test_construction_launches_runtime_and_wss(self, tmp_path):
        """Successful construction calls Popen and WSS proxy."""
        with mock_launcher_deps(tmp_path, ready=True) as mocks:
            CloudXRLauncher()

            mocks["popen"].assert_called_once()
            mocks["wss"].assert_called_once()
            mocks["check_eula"].assert_called_once()
            mocks["cleanup"].assert_called_once()

    def test_construction_raises_on_runtime_failure(self, tmp_path):
        """RuntimeError when the runtime fails to become ready."""
        with mock_launcher_deps(tmp_path, ready=False) as mocks:
            # Runtime process reports an exit code, i.e. it died on startup.
            mocks["proc"].poll.return_value = 1

            with pytest.raises(RuntimeError, match="failed to start"):
                CloudXRLauncher()

    def test_wss_log_path_set_after_construction(self, tmp_path):
        """wss_log_path is a Path after successful construction."""
        with mock_launcher_deps(tmp_path, ready=True):
            launcher = CloudXRLauncher()

            assert launcher.wss_log_path is not None
            assert isinstance(launcher.wss_log_path, Path)
            assert "wss." in str(launcher.wss_log_path)


# ============================================================================
# TestLauncherStop
# ============================================================================


class TestLauncherStop:
    """Tests for CloudXRLauncher.stop()."""

    @_posix_only
    def test_stop_terminates_runtime(self, tmp_path):
        """stop() sends SIGTERM to the runtime process group."""
        with mock_launcher_deps(tmp_path, ready=True) as mocks:
            launcher = CloudXRLauncher()

            proc = mocks["proc"]
            # First poll: still alive; second poll: exited cleanly.
            poll_seq = [None, 0]
            proc.poll = MagicMock(
                side_effect=lambda: poll_seq.pop(0) if poll_seq else 0
            )
            proc.wait = MagicMock()

            with (
                patch(
                    "isaacteleop.cloudxr.launcher.os.getpgid", return_value=99
                ) as m_getpgid,
                patch("isaacteleop.cloudxr.launcher.os.killpg") as m_killpg,
            ):
                launcher.stop()

            m_getpgid.assert_called_once_with(proc.pid)
            m_killpg.assert_called_once_with(99, signal.SIGTERM)

    def test_stop_idempotent(self, tmp_path):
        """Calling stop() twice does not raise."""
        with mock_launcher_deps(tmp_path, ready=True) as mocks:
            launcher = CloudXRLauncher()

            mocks["proc"].poll.return_value = 0

            launcher.stop()
            launcher.stop()

    @_posix_only
    def test_stop_escalates_to_sigkill(self, tmp_path):
        """stop() sends SIGKILL when SIGTERM doesn't work."""
        with mock_launcher_deps(tmp_path, ready=True) as mocks:
            launcher = CloudXRLauncher()

            proc = mocks["proc"]
            # Survives the SIGTERM poll, then exits after SIGKILL.
            poll_seq = [None, None, 0]
            proc.poll = MagicMock(
                side_effect=lambda: poll_seq.pop(0) if poll_seq else 0
            )
            proc.wait = MagicMock(side_effect=subprocess.TimeoutExpired("cmd", 10))

            with (
                patch("isaacteleop.cloudxr.launcher.os.getpgid", return_value=99),
                patch("isaacteleop.cloudxr.launcher.os.killpg") as m_killpg,
            ):
                launcher.stop()

            calls = m_killpg.call_args_list
            assert len(calls) == 2
            assert calls[0].args == (99, signal.SIGTERM)
            assert calls[1].args == (99, signal.SIGKILL)
============================================================================ +# TestLauncherContextManager +# ============================================================================ + + +class TestLauncherContextManager: + """Tests for CloudXRLauncher used as a context manager.""" + + def test_context_manager_stops_on_exit(self, tmp_path): + """__exit__ calls stop(), cleaning up the runtime.""" + with mock_launcher_deps(tmp_path, ready=True) as mocks: + with CloudXRLauncher() as launcher: + mocks["popen"].assert_called_once() + mocks["proc"].poll.return_value = 0 + + assert launcher._runtime_proc is None + + +# ============================================================================ +# TestCleanupStaleRuntime +# ============================================================================ + + +class TestCleanupStaleRuntime: + """Tests for CloudXRLauncher._cleanup_stale_runtime.""" + + def test_removes_stale_sentinel_files(self, tmp_path): + """Stale ipc_cloudxr, runtime_started, and pidfiles are removed.""" + run_dir = str(tmp_path / "run") + os.makedirs(run_dir) + ipc_socket = os.path.join(run_dir, "ipc_cloudxr") + sentinel = os.path.join(run_dir, "runtime_started") + cloudxr_pid = os.path.join(run_dir, "cloudxr.pid") + Path(ipc_socket).touch() + Path(sentinel).touch() + Path(cloudxr_pid).touch() + + fake_cfg = _FakeEnvConfig(run_dir, tmp_path / "logs") + + with patch( + "isaacteleop.cloudxr.launcher.subprocess.run", + return_value=MagicMock(returncode=1), + ): + CloudXRLauncher._cleanup_stale_runtime(fake_cfg) + + assert not os.path.exists(ipc_socket) + assert not os.path.exists(sentinel) + assert not os.path.exists(cloudxr_pid) + + def test_noop_when_no_stale_files(self, tmp_path): + """No errors when the run directory has no stale files.""" + run_dir = str(tmp_path / "run") + os.makedirs(run_dir) + + fake_cfg = _FakeEnvConfig(run_dir, tmp_path / "logs") + CloudXRLauncher._cleanup_stale_runtime(fake_cfg) + + def test_handles_missing_fuser(self, 
tmp_path): + """Sentinel files are still cleaned up when fuser is not found.""" + run_dir = str(tmp_path / "run") + os.makedirs(run_dir) + ipc_socket = os.path.join(run_dir, "ipc_cloudxr") + sentinel = os.path.join(run_dir, "runtime_started") + cloudxr_pid = os.path.join(run_dir, "cloudxr.pid") + Path(ipc_socket).touch() + Path(sentinel).touch() + Path(cloudxr_pid).touch() + + fake_cfg = _FakeEnvConfig(run_dir, tmp_path / "logs") + + with patch( + "isaacteleop.cloudxr.launcher.subprocess.run", + side_effect=FileNotFoundError("fuser not found"), + ): + CloudXRLauncher._cleanup_stale_runtime(fake_cfg) + + assert not os.path.exists(ipc_socket) + assert not os.path.exists(sentinel) + assert not os.path.exists(cloudxr_pid) + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/src/core/cloudxr_tests/python/test_runtime.py b/src/core/cloudxr_tests/python/test_runtime.py new file mode 100644 index 000000000..a3dda8558 --- /dev/null +++ b/src/core/cloudxr_tests/python/test_runtime.py @@ -0,0 +1,194 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
# SPDX-License-Identifier: Apache-2.0

"""Tests for isaacteleop.cloudxr.runtime — wait_for_runtime_ready_sync and
terminate_or_kill_runtime."""

# Imports grouped per PEP 8: stdlib first (unittest.mock is stdlib),
# then third-party (pytest), then project-local.
import os
import threading
import time
from unittest.mock import MagicMock, patch

import pytest

from isaacteleop.cloudxr.runtime import (
    terminate_or_kill_runtime,
    wait_for_runtime_ready_sync,
)


# ============================================================================
# Helpers
# ============================================================================


class _FakeEnvConfig:
    """Minimal stand-in for EnvConfig that redirects openxr_run_dir to a tmp path."""

    def __init__(self, run_dir: str) -> None:
        self._run_dir = run_dir

    def openxr_run_dir(self) -> str:
        return self._run_dir


# ============================================================================
# TestWaitForRuntimeReadySync
# ============================================================================


class TestWaitForRuntimeReadySync:
    """Tests for the synchronous sentinel-file polling helper."""

    def test_returns_true_when_sentinel_exists(self, tmp_path):
        """Immediately returns True when runtime_started already exists."""
        run_dir = str(tmp_path / "run")
        os.makedirs(run_dir)
        (tmp_path / "run" / "runtime_started").touch()

        fake_cfg = _FakeEnvConfig(run_dir)
        with patch("isaacteleop.cloudxr.runtime.get_env_config", return_value=fake_cfg):
            result = wait_for_runtime_ready_sync(
                is_process_alive=lambda: True,
                timeout_sec=1.0,
                poll_interval_sec=0.05,
            )

        assert result is True

    def test_returns_false_on_timeout(self, tmp_path):
        """Returns False when sentinel never appears within the timeout."""
        run_dir = str(tmp_path / "run")
        os.makedirs(run_dir)

        fake_cfg = _FakeEnvConfig(run_dir)
        with patch("isaacteleop.cloudxr.runtime.get_env_config", return_value=fake_cfg):
            start = time.monotonic()
            result = wait_for_runtime_ready_sync(
                is_process_alive=lambda: True,
                timeout_sec=0.2,
                poll_interval_sec=0.05,
            )
            elapsed = time.monotonic() - start

        assert result is False
        assert elapsed >= 0.2

    def test_returns_false_when_process_dies(self, tmp_path):
        """Returns False immediately when is_process_alive reports dead."""
        run_dir = str(tmp_path / "run")
        os.makedirs(run_dir)

        fake_cfg = _FakeEnvConfig(run_dir)
        with patch("isaacteleop.cloudxr.runtime.get_env_config", return_value=fake_cfg):
            start = time.monotonic()
            result = wait_for_runtime_ready_sync(
                is_process_alive=lambda: False,
                timeout_sec=5.0,
                poll_interval_sec=0.05,
            )
            elapsed = time.monotonic() - start

        assert result is False
        # Must bail out well before the 5 s timeout.
        assert elapsed < 1.0

    def test_detects_sentinel_created_mid_wait(self, tmp_path):
        """Returns True when sentinel appears partway through the wait."""
        run_dir = str(tmp_path / "run")
        os.makedirs(run_dir)
        sentinel = tmp_path / "run" / "runtime_started"

        def _create_sentinel_later():
            time.sleep(0.15)
            sentinel.touch()

        threading.Thread(target=_create_sentinel_later, daemon=True).start()

        fake_cfg = _FakeEnvConfig(run_dir)
        with patch("isaacteleop.cloudxr.runtime.get_env_config", return_value=fake_cfg):
            result = wait_for_runtime_ready_sync(
                is_process_alive=lambda: True,
                timeout_sec=2.0,
                poll_interval_sec=0.05,
            )

        assert result is True

    def test_respects_custom_timeout_and_poll_interval(self, tmp_path):
        """Completes quickly with a tiny timeout, honouring custom values."""
        run_dir = str(tmp_path / "run")
        os.makedirs(run_dir)

        fake_cfg = _FakeEnvConfig(run_dir)
        with patch("isaacteleop.cloudxr.runtime.get_env_config", return_value=fake_cfg):
            start = time.monotonic()
            result = wait_for_runtime_ready_sync(
                is_process_alive=lambda: True,
                timeout_sec=0.1,
                poll_interval_sec=0.02,
            )
            elapsed = time.monotonic() - start

        assert result is False
        assert elapsed < 0.5
TestTerminateOrKillRuntime +# ============================================================================ + + +def _make_mock_process(alive_sequence: list[bool]) -> MagicMock: + """Create a mock multiprocessing.Process whose is_alive() returns values from a sequence. + + Each call to is_alive() pops the next value; once exhausted it always returns False. + """ + proc = MagicMock() + seq = list(alive_sequence) + + def _is_alive(): + if seq: + return seq.pop(0) + return False + + proc.is_alive = MagicMock(side_effect=_is_alive) + return proc + + +class TestTerminateOrKillRuntime: + """Tests for the multiprocessing.Process termination helper.""" + + def test_terminates_cleanly(self): + """Process exits after terminate() — no kill needed.""" + proc = _make_mock_process([True, False]) + terminate_or_kill_runtime(proc) + + proc.terminate.assert_called_once() + proc.kill.assert_not_called() + + def test_escalates_to_kill(self): + """Process survives terminate(), exits after kill().""" + # is_alive() is called 3 times: before terminate, before kill, final check + proc = _make_mock_process([True, True, False]) + terminate_or_kill_runtime(proc) + + proc.terminate.assert_called_once() + proc.kill.assert_called_once() + + def test_raises_if_unkillable(self): + """RuntimeError when process stays alive after both terminate and kill.""" + proc = _make_mock_process([True, True, True, True, True]) + with pytest.raises(RuntimeError, match="Failed to terminate or kill"): + terminate_or_kill_runtime(proc) + + def test_noop_if_already_dead(self): + """No terminate/kill calls when the process is already dead.""" + proc = _make_mock_process([False]) + terminate_or_kill_runtime(proc) + + proc.terminate.assert_not_called() + proc.kill.assert_not_called() + + +if __name__ == "__main__": + pytest.main([__file__, "-v"])