diff --git a/python/Dockerfile b/python/Dockerfile index e7ea16864..eef74e679 100644 --- a/python/Dockerfile +++ b/python/Dockerfile @@ -7,11 +7,19 @@ RUN dnf install -y make git && \ COPY --from=uv /uv /uvx /bin/ FROM fedora:43 AS product -RUN dnf install -y python3 ustreamer libusb1 android-tools python3-libgpiod && \ +RUN dnf install -y python3 ustreamer libusb1 android-tools python3-libgpiod curl && \ dnf clean all && \ rm -rf /var/cache/dnf COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/ +ARG FLS_VERSION=0.1.9 +RUN ARCH=$(uname -m) && \ + URL="https://github.com/jumpstarter-dev/fls/releases/download/${FLS_VERSION}/fls-${ARCH}-linux" && \ + TEMP_FILE="/tmp/fls-${ARCH}-linux.tmp" && \ + curl -fsSL "${URL}" -o "${TEMP_FILE}" && \ + mv "${TEMP_FILE}" /usr/local/bin/fls && \ + chmod +x /usr/local/bin/fls + FROM builder AS wheels ADD . /src RUN make -C /src/python build diff --git a/python/packages/jumpstarter-driver-flashers/jumpstarter_driver_flashers/client.py b/python/packages/jumpstarter-driver-flashers/jumpstarter_driver_flashers/client.py index 4a19b9891..e1271c1d6 100644 --- a/python/packages/jumpstarter-driver-flashers/jumpstarter_driver_flashers/client.py +++ b/python/packages/jumpstarter-driver-flashers/jumpstarter_driver_flashers/client.py @@ -25,6 +25,7 @@ from jumpstarter.client.decorators import driver_click_group from jumpstarter.common.exceptions import ArgumentError, JumpstarterException +from jumpstarter.common.fls import get_fls_github_url class FlashError(JumpstarterException): @@ -533,8 +534,8 @@ def _flash_with_fls( self._download_fls_binary(console, prompt, fls_binary_url, f"Failed to download FLS from {fls_binary_url}") elif fls_version != "": self.logger.info(f"Downloading FLS version {fls_version} from GitHub releases") - # Download fls binary to the target device (until it is available on the target device) - fls_url = f"https://github.com/jumpstarter-dev/fls/releases/download/{fls_version}/fls-aarch64-linux" + # Download fls binary to the target device (always aarch64 for target devices) + fls_url = get_fls_github_url(fls_version, arch="aarch64") self._download_fls_binary(console, prompt, fls_url, f"Failed to download FLS from {fls_url}") # Flash the image diff --git a/python/packages/jumpstarter-driver-ridesx/jumpstarter_driver_ridesx/client.py b/python/packages/jumpstarter-driver-ridesx/jumpstarter_driver_ridesx/client.py index 2a557eeca..39e43b4fa 100644 --- a/python/packages/jumpstarter-driver-ridesx/jumpstarter_driver_ridesx/client.py +++ b/python/packages/jumpstarter-driver-ridesx/jumpstarter_driver_ridesx/client.py @@ -1,7 +1,9 @@ +import os from dataclasses import dataclass from pathlib import Path from typing import Dict, Optional +import click from jumpstarter_driver_composite.client import CompositeClient from jumpstarter_driver_opendal.client import FlasherClient, operator_for_path from jumpstarter_driver_power.client import PowerClient @@ -68,7 +70,7 @@ def flash_images(self, partitions: Dict[str, str], operators: Optional[Dict[str, detection_result = self.call("detect_fastboot_device", 5, 2.0) if detection_result["status"] != "device_found": - raise RuntimeError("No fastboot devices found. Make sure device is in fastboot mode.") + raise click.ClickException("No fastboot devices found. Make sure device is in fastboot mode.") device_id = detection_result["device_id"] self.logger.info(f"found fastboot device: {device_id}") @@ -77,6 +79,43 @@ def flash_images(self, partitions: Dict[str, str], operators: Optional[Dict[str, return flash_result + def _is_oci_path(self, path: str) -> bool: + """Return True if path looks like an OCI image reference.""" + return path.startswith("oci://") or ( + ":" in path and "/" in path and not path.startswith("/") and not path.startswith(("http://", "https://")) + ) + + def _validate_partition_mappings(self, partitions: Dict[str, str] | None) -> None: + """Validate partition mappings; raise ValueError if any path is empty.""" + if partitions is None: + return + for partition_name, file_path in partitions.items(): + if not file_path or not file_path.strip(): + raise ValueError( + f"Partition '{partition_name}' has an empty file path. " + f"Please provide a valid file path (e.g., -t {partition_name}:/path/to/image)" + ) + + def _power_off_if_available(self) -> None: + """Power off device if power child is present.""" + if "power" in self.children: + self.power.off() + self.logger.info("device powered off") + else: + self.logger.info("device left running") + + def _execute_flash_operation(self, operation_func, *args, **kwargs): + """Common wrapper for flash operations with logging and power management.""" + self.logger.info("Starting RideSX flash operation") + self.boot_to_fastboot() + + try: + result = operation_func(*args, **kwargs) + self.logger.info("flash operation completed successfully") + return result + finally: + self._power_off_if_available() + def flash( self, path: str | Dict[str, str], @@ -85,41 +124,233 @@ def flash( operator: Operator | Dict[str, Operator] | None = None, compression=None, ): + """Flash image to DUT - supports both OCI and traditional paths. + + Args: + path: File path, URL, or OCI image reference (or dict of partition->path mappings) + target: Target partition (for single file mode) + operator: Optional operator for file access (usually auto-detected) + compression: Compression type + """ + # Auto-detect flash mode based on path type if isinstance(path, dict): - partitions = path - operators = operator if isinstance(operator, dict) else None + # Dictionary mode: {partition: file_path, ...} + operators_dict = operator if isinstance(operator, dict) else None + return self.flash_local(path, operators_dict) + + elif isinstance(path, str) and (path.startswith("oci://") or self._is_oci_path(path)): + # OCI mode: auto-detect partitions or use target as partition->filename mapping + if target and ":" in target: + # Target is "partition:filename" format for OCI explicit mapping + partition_name, filename = target.split(":", 1) + partitions = {partition_name: filename} + return self.flash_with_targets(path, partitions) + else: + # OCI auto-detection mode + return self.flash_oci_auto(path, None) + else: + # Traditional single file mode if target is None: raise ValueError( - "This driver requires a target partition.\n" - "Usage: j storage flash --target :\n" - "Example: j storage flash -t boot_a:aboot.img -t system_a:rootfs.simg -t system_b:qm_var.simg" + "This driver requires a target partition for non-OCI paths.\n" + "Usage: client.flash('/path/to/file.img', target='boot_a')\n" + "For OCI: client.flash('oci://registry.com/image:tag')\n" + "For dict: client.flash({'boot_a': '/path/to/file.img'})" ) + + # Use operator if provided, otherwise auto-detect + if operator is not None: + operators = {target: operator} if isinstance(operator, Operator) else operator + else: + operators = None + partitions = {target: path} - operators = {target: operator} if isinstance(operator, Operator) else None + return self.flash_local(partitions, operators) - for partition_name, file_path in partitions.items(): - if not file_path or not file_path.strip(): - raise ValueError( - f"Partition '{partition_name}' has an empty file path. " - f"Please provide a valid file path (e.g., -t {partition_name}:/path/to/image)" - ) + def flash_with_targets( + self, + oci_url: str, + partitions: Dict[str, str], + ): + """Flash OCI image with explicit partition mappings. - self.logger.info("Starting RideSX flash operation") + Args: + oci_url: OCI image URL (must start with oci://) + partitions: Mapping of partition name -> filename in OCI image - self.boot_to_fastboot() + Raises: + ValueError: If partitions is empty or None + """ + if not partitions: + raise ValueError( + "flash_with_targets requires a non-empty mapping of partition name -> filename. " + "Use flash() for auto-detection mode." + ) + self._validate_partition_mappings(partitions) - result = self.flash_images(partitions, operators) + self.logger.info(f"Using FLS OCI flash with explicit mapping for image: {oci_url}") - self.logger.info("flash operation completed successfully") + def _flash_operation(): + return self._flash_oci_auto_impl(oci_url, partitions) - if "power" in self.children: - self.power.off() - self.logger.info("device powered off") - else: - self.logger.info("device left running") + return self._execute_flash_operation(_flash_operation) + + def flash_local( + self, + partitions: Dict[str, str], + operators: Dict[str, Operator] | None = None, + ): + """Flash local files or URLs to partitions. + + Args: + partitions: Mapping of partition name -> file path or URL + operators: Optional mapping of partition name -> operator + """ + self._validate_partition_mappings(partitions) + + self.logger.info(f"Flashing local files: {list(partitions.keys())}") + + def _flash_operation(): + return self.flash_images(partitions, operators) + + return self._execute_flash_operation(_flash_operation) + + def _read_oci_credentials(self): + """Read OCI registry credentials from environment variables. - return result + Returns: + Tuple of (username, password), both None if not set. + + Raises: + click.ClickException: If only one of username/password is set. + """ + username = os.environ.get("OCI_USERNAME") + password = os.environ.get("OCI_PASSWORD") + + if bool(username) != bool(password): + raise click.ClickException( + "OCI authentication requires both OCI_USERNAME and OCI_PASSWORD environment variables" + ) + + return username, password + + def _flash_oci_auto_impl( + self, + oci_url: str, + partitions: Dict[str, str] | None = None, + ): + """Core implementation of OCI flash without wrapper logic.""" + oci_username, oci_password = self._read_oci_credentials() + + self.logger.info("Checking for fastboot devices on Exporter...") + detection_result = self.call("detect_fastboot_device", 5, 2.0) + + if detection_result["status"] != "device_found": + raise click.ClickException("No fastboot devices found. Make sure device is in fastboot mode.") + + device_id = detection_result["device_id"] + self.logger.info(f"Found fastboot device: {device_id}") + + flash_result = self.call( + "flash_oci_image", oci_url, partitions, + oci_username, oci_password, + ) + + # Display FLS output to user + if flash_result.get("status") == "success" and flash_result.get("output"): + self.logger.info("FLS fastboot completed successfully") + # Log the detailed output for user visibility + for line in flash_result["output"].strip().split("\n"): + if line.strip(): + self.logger.info(f"FLS: {line.strip()}") + + return flash_result + + def flash_oci_auto( + self, + oci_url: str, + partitions: Dict[str, str] | None = None, + ): + """Flash OCI image using auto-detection or explicit partition mapping + + Args: + oci_url: OCI image reference (e.g., "oci://registry.com/image:latest") + partitions: Optional mapping of partition -> filename inside OCI image + """ + # Normalize OCI URL + if not oci_url.startswith("oci://"): + if "://" in oci_url: + raise ValueError(f"Only oci:// URLs are supported, got: {oci_url}") + if ":" in oci_url and "/" in oci_url: + oci_url = f"oci://{oci_url}" + else: + raise ValueError(f"Invalid OCI URL format: {oci_url}") + + if partitions: + self.logger.info(f"Flashing OCI image with explicit mapping: {list(partitions.keys())}") + else: + self.logger.info(f"Auto-detecting partitions for OCI image: {oci_url}") + + def _flash_operation(): + return self._flash_oci_auto_impl(oci_url, partitions) + + return self._execute_flash_operation(_flash_operation) + + def _parse_target_specs(self, target_specs: tuple[str, ...]) -> dict[str, str]: + """Parse -t target specs into a partition->path mapping.""" + mapping: dict[str, str] = {} + for spec in target_specs: + if ":" not in spec: + raise click.ClickException(f"Invalid target spec '{spec}'. Expected format: partition:path") + name, path = spec.split(":", 1) + mapping[name] = path + return mapping + + def _parse_and_validate_targets(self, target_specs: tuple[str, ...]): + """Parse and validate target specifications, returning (mapping, single_target).""" + mapping = {} + single_target = None + + for spec in target_specs: + if ":" in spec: + # Multi-partition format: partition:path + partition, file_path = spec.split(":", 1) + mapping[partition] = file_path + else: + # Single partition format: just partition name + if single_target is not None: + raise click.ClickException("Cannot mix single-partition and multi-partition target specs") + single_target = spec + + if mapping and single_target: + raise click.ClickException("Cannot mix single-partition and multi-partition target specs") + + return mapping, single_target + + def _execute_flash_command(self, path, target_specs): + """Execute flash command logic with proper argument handling.""" + # Parse target specifications + if target_specs: + mapping, single_target = self._parse_and_validate_targets(target_specs) + + if mapping: + if path: + # Multi-partition mode with path: extract specific files from OCI image + self.flash_with_targets(path, mapping) + else: + # Multi-partition mode: use mapping as dict for local files + self.flash(mapping) + else: + # Single partition mode: use path with target + if not path: + raise click.ClickException("Path argument required when using single-partition target") + self.flash(path, target=single_target) + elif path: + # Path only - should be OCI for auto-detection + self.flash(path) + else: + raise click.ClickException("Provide a path or use -t to specify partition mappings") def cli(self): generic_cli = FlasherClient.cli(self) @@ -129,8 +360,49 @@ def base(): """RideSX storage operations""" pass + # Add all generic commands except 'flash' (we override it) for name, cmd in generic_cli.commands.items(): - base.add_command(cmd, name=name) + if name != "flash": + base.add_command(cmd, name=name) + + @base.command() + @click.argument("path", required=False) + @click.option( + "-t", + "--target", + "target_specs", + multiple=True, + help="Target spec as partition:path for multi-partition or just partition for single file", + ) + def flash(path, target_specs): + """Flash image to device. + + \b + Examples: + # OCI auto-detection + j storage flash oci://registry.com/image:tag + + # OCI with explicit partition->filename mapping + j storage flash -t boot_a:boot.img oci://registry.com/image:tag + + # OCI with registry credentials (via env vars) + OCI_USERNAME=user OCI_PASSWORD=pass j storage flash oci://registry.com/image:tag + + # Single file to partition + j storage flash /local/boot.img --target boot_a + + # Multiple files + j storage flash -t boot_a:/local/boot.img -t system_a:/local/system.img + + # HTTP URLs + j storage flash -t boot_a:https://example.com/boot.img + + \b + Environment variables: + OCI_USERNAME Registry username for private OCI images + OCI_PASSWORD Registry password for private OCI images + """ + self._execute_flash_command(path, target_specs) @base.command() def boot_to_fastboot(): diff --git a/python/packages/jumpstarter-driver-ridesx/jumpstarter_driver_ridesx/client_test.py b/python/packages/jumpstarter-driver-ridesx/jumpstarter_driver_ridesx/client_test.py new file mode 100644 index 000000000..f4d5d9e02 --- /dev/null +++ b/python/packages/jumpstarter-driver-ridesx/jumpstarter_driver_ridesx/client_test.py @@ -0,0 +1,91 @@ +import tempfile +from unittest.mock import patch + +import click +import pytest +from jumpstarter_driver_pyserial.driver import PySerial + +from .driver import RideSXDriver +from jumpstarter.common.utils import serve + + +@pytest.fixture(scope="session") +def temp_storage_dir(): + with tempfile.TemporaryDirectory() as temp_dir: + yield temp_dir + + +@pytest.fixture(scope="session") +def ridesx_driver(temp_storage_dir): + yield RideSXDriver( + storage_dir=temp_storage_dir, + children={ + "serial": PySerial(url="loop://"), + }, + ) + + +@pytest.fixture +def ridesx_client(ridesx_driver): + """Create a client instance for testing client-side methods""" + with serve(ridesx_driver) as client: + yield client + + +# Validate Partition Mappings Tests + + +def test_validate_partition_mappings(ridesx_client): + """Test partition mapping validation""" + # None is valid (auto-detect mode) + ridesx_client._validate_partition_mappings(None) + + # Valid mapping + ridesx_client._validate_partition_mappings({"boot": "/path/to/boot.img"}) + + # Empty path raises + with pytest.raises(ValueError, match="has an empty file path"): + ridesx_client._validate_partition_mappings({"boot": ""}) + + # Whitespace-only path raises + with pytest.raises(ValueError, match="has an empty file path"): + ridesx_client._validate_partition_mappings({"boot": " "}) + + +# Flash OCI Auto Tests + + +def test_flash_oci_auto_success(ridesx_client): + """Test successful flash_oci_auto call""" + with patch.object(ridesx_client, "call") as mock_call: + mock_call.side_effect = [ + None, # boot_to_fastboot call + {"status": "device_found", "device_id": "ABC123"}, + {"status": "success"}, + ] + + result = ridesx_client.flash_oci_auto("oci://quay.io/org/image:tag") + + assert result == {"status": "success"} + # Verify flash_oci_image was called with the OCI URL + flash_call = mock_call.call_args_list[2] + assert flash_call[0][0] == "flash_oci_image" + assert flash_call[0][1] == "oci://quay.io/org/image:tag" + + +def test_flash_oci_auto_error_cases(ridesx_client): + """Test flash_oci_auto error handling""" + # URL without oci:// scheme + with pytest.raises(ValueError, match="Only oci:// URLs are supported"): + ridesx_client.flash_oci_auto("docker://image:tag") + + # Invalid URL format + with pytest.raises(ValueError, match="Invalid OCI URL format"): + ridesx_client.flash_oci_auto("invalid-url") + + # No device found + with patch.object(ridesx_client, "call") as mock_call: + mock_call.return_value = {"status": "no_device_found", "device_id": None} + + with pytest.raises(click.ClickException, match="No fastboot devices found"): + ridesx_client.flash_oci_auto("oci://image:tag") diff --git a/python/packages/jumpstarter-driver-ridesx/jumpstarter_driver_ridesx/driver.py b/python/packages/jumpstarter-driver-ridesx/jumpstarter_driver_ridesx/driver.py index 7898f4aae..b240b6888 100644 --- a/python/packages/jumpstarter-driver-ridesx/jumpstarter_driver_ridesx/driver.py +++ b/python/packages/jumpstarter-driver-ridesx/jumpstarter_driver_ridesx/driver.py @@ -1,4 +1,5 @@ import asyncio +import os import subprocess import time from dataclasses import dataclass, field @@ -8,17 +9,32 @@ from jumpstarter_driver_opendal.driver import Opendal from jumpstarter.common.exceptions import ConfigurationError +from jumpstarter.common.fls import get_fls_binary from jumpstarter.driver import Driver, export @dataclass(kw_only=True) class RideSXDriver(Driver): """RideSX Driver""" - decompression_timeout: int = field(default=15 * 60) # 15 minutes - flash_timeout: int = field(default=30 * 60) # 30 minutes - continue_timeout: int = field(default=20 * 60) # 20 minutes + + decompression_timeout: int = field(default=15 * 60) # 15 minutes + flash_timeout: int = field(default=30 * 60) # 30 minutes + continue_timeout: int = field(default=20 * 60) # 20 minutes storage_dir: str = field(default="/var/lib/jumpstarter/ridesx") + # FLS configuration + fls_version: str | None = field(default=None) + fls_allow_custom_binaries: bool = field( + default=False, + metadata={ + "help": "⚠️ SECURITY WARNING: Enables downloading custom FLS binaries. Only use in trusted environments." + } + ) + fls_custom_binary_url: str | None = field( + default=None, + metadata={"help": "Custom URL for FLS binary download. Requires fls_allow_custom_binaries=True."} + ) + def __post_init__(self): if hasattr(super(), "__post_init__"): super().__post_init__() @@ -26,6 +42,14 @@ def __post_init__(self): if "serial" not in self.children: raise ConfigurationError("'serial' instance is required") + # Security warning for custom binary downloads + if self.fls_allow_custom_binaries: + self.logger.warning( + "⚠️ SECURITY WARNING: Custom FLS binary downloads are enabled. " + "This allows arbitrary code execution on the exporter host. " + "Only use this in trusted environments with verified binary sources." + ) + Path(self.storage_dir).mkdir(parents=True, exist_ok=True) self.children["storage"] = Opendal( scheme="fs", @@ -197,6 +221,88 @@ def flash_with_fastboot(self, device_id: str, partitions: Dict[str, str]): self.logger.warning(f"stdout: {e.stdout}") self.logger.warning(f"stderr: {e.stderr}") + def _build_fls_command(self, oci_url, partitions): + """Build FLS fastboot command and environment.""" + fls_binary = get_fls_binary( + fls_version=self.fls_version, + fls_binary_url=self.fls_custom_binary_url, + allow_custom_binaries=self.fls_allow_custom_binaries, + ) + fls_cmd = [fls_binary, "fastboot", oci_url] + + if partitions: + for partition_name, filename in sorted(partitions.items()): + if not filename or not filename.strip(): + raise ValueError( + f"Partition '{partition_name}' has an empty filename. " + "Each partition must have a non-empty filename." + ) + fls_cmd.extend(["-t", f"{partition_name}:{filename}"]) + + fls_cmd.extend(["--timeout", str(self.flash_timeout)]) + return fls_cmd + + @export + def flash_oci_image( + self, + oci_url: str, + partitions: Dict[str, str] | None = None, + oci_username: str | None = None, + oci_password: str | None = None, + ): + """Flash OCI image using FLS fastboot CLI + + Args: + oci_url: OCI image reference (e.g., "quay.io/bzlotnik/ridesx-image:latest") + partitions: Optional mapping of partition -> filename inside OCI image + oci_username: Registry username for OCI authentication + oci_password: Registry password for OCI authentication + """ + if not oci_url.startswith("oci://"): + raise ValueError(f"OCI URL must start with oci://, got: {oci_url}") + + if bool(oci_username) != bool(oci_password): + raise ValueError("OCI authentication requires both --username and --password") + + fls_cmd = self._build_fls_command(oci_url, partitions) + + fls_env = os.environ.copy() + if oci_username and oci_password: + fls_env["FLS_REGISTRY_USERNAME"] = oci_username + fls_env["FLS_REGISTRY_PASSWORD"] = oci_password + + self.logger.info(f"Running FLS fastboot: {' '.join(fls_cmd)}") + if oci_username: + self.logger.info("Using OCI registry credentials from environment") + + try: + result = subprocess.run( + fls_cmd, capture_output=True, text=True, + check=True, timeout=self.flash_timeout + 30, env=fls_env, + ) + + self.logger.info("FLS fastboot auto-detection completed successfully") + self.logger.debug(f"FLS stdout: {result.stdout}") + if result.stderr: + self.logger.debug(f"FLS stderr: {result.stderr}") + + return {"status": "success", "output": result.stdout} + + except subprocess.CalledProcessError as e: + self.logger.error(f"FLS fastboot failed - return code: {e.returncode}") + self.logger.error(f"stdout: {e.stdout}") + self.logger.error(f"stderr: {e.stderr}") + output = (e.stderr or e.stdout or "").strip() + raise RuntimeError(f"FLS fastboot failed: {output}") from e + + except subprocess.TimeoutExpired: + self.logger.error("FLS fastboot auto-detection timed out") + raise RuntimeError("FLS fastboot auto-detection timeout") from None + + except FileNotFoundError: + self.logger.error("FLS command not found - ensure FLS is installed and in PATH") + raise RuntimeError("FLS command not found") from None + @export async def boot_to_fastboot(self): """Boot device to fastboot mode""" diff --git a/python/packages/jumpstarter-driver-ridesx/jumpstarter_driver_ridesx/driver_test.py b/python/packages/jumpstarter-driver-ridesx/jumpstarter_driver_ridesx/driver_test.py index acfeb8f8b..34d056832 100644 --- a/python/packages/jumpstarter-driver-ridesx/jumpstarter_driver_ridesx/driver_test.py +++ b/python/packages/jumpstarter-driver-ridesx/jumpstarter_driver_ridesx/driver_test.py @@ -452,3 +452,151 @@ def test_power_rescue(ridesx_power_driver): with pytest.raises(NotImplementedError, match="Rescue mode not available"): client.call("rescue") + +# Flash OCI Image Tests +# Note: FLS download utilities are tested in jumpstarter.common.fls_test + + +def test_flash_oci_image_success(temp_storage_dir, ridesx_driver): + with serve(ridesx_driver) as client: + with patch("jumpstarter_driver_ridesx.driver.get_fls_binary", return_value="/usr/local/bin/fls"): + with patch("subprocess.run") as mock_subprocess: + mock_result = MagicMock() + mock_result.stdout = "Flashing complete" + mock_result.stderr = "" + mock_result.returncode = 0 + mock_subprocess.return_value = mock_result + + result = client.call("flash_oci_image", "oci://quay.io/image:tag", None) + + assert result["status"] == "success" + mock_subprocess.assert_called_once() + call_args = mock_subprocess.call_args[0][0] + assert call_args[0] == "/usr/local/bin/fls" + assert call_args[1] == "fastboot" + assert call_args[2] == "oci://quay.io/image:tag" + + +def test_flash_oci_image_with_partitions(temp_storage_dir, ridesx_driver): + with serve(ridesx_driver) as client: + with patch("jumpstarter_driver_ridesx.driver.get_fls_binary", return_value="fls"): + with patch("subprocess.run") as mock_subprocess: + mock_result = MagicMock() + mock_result.stdout = "Flashing complete" + mock_result.stderr = "" + mock_result.returncode = 0 + mock_subprocess.return_value = mock_result + + partitions = {"boot_a": "boot.img", "system_a": "rootfs.simg"} + result = client.call("flash_oci_image", "oci://image:tag", partitions) + + assert result["status"] == "success" + call_args = mock_subprocess.call_args[0][0] + # Check that -t flags are present for partitions + assert "-t" in call_args + assert "boot_a:boot.img" in call_args + assert "system_a:rootfs.simg" in call_args + + +def test_flash_oci_image_error_cases(temp_storage_dir, ridesx_driver): + """Test flash_oci_image error handling for various failure modes""" + from jumpstarter.client.core import DriverError + + with serve(ridesx_driver) as client: + # Reject non-oci:// schemes + with pytest.raises(DriverError, match="OCI URL must start with oci://"): + client.call("flash_oci_image", "docker://image:tag", None) + + with patch("jumpstarter_driver_ridesx.driver.get_fls_binary", return_value="fls"): + with patch("subprocess.run") as mock_subprocess: + # CalledProcessError + error = subprocess.CalledProcessError(1, "fls") + error.stdout = "" + error.stderr = "Flash failed" + mock_subprocess.side_effect = error + + with pytest.raises(DriverError, match="FLS fastboot failed: Flash failed"): + client.call("flash_oci_image", "oci://image:tag", None) + + # TimeoutExpired + mock_subprocess.side_effect = subprocess.TimeoutExpired("fls", 1800) + + with pytest.raises(DriverError, match="FLS fastboot auto-detection timeout"): + client.call("flash_oci_image", "oci://image:tag", None) + + # FileNotFoundError + mock_subprocess.side_effect = FileNotFoundError("fls not found") + + with pytest.raises(DriverError, match="FLS command not found"): + client.call("flash_oci_image", "oci://image:tag", None) + + +def test_flash_oci_image_with_credentials(temp_storage_dir, ridesx_driver): + """Test that OCI credentials are passed via env vars to FLS""" + with serve(ridesx_driver) as client: + with patch("jumpstarter_driver_ridesx.driver.get_fls_binary", return_value="fls"): + with patch("subprocess.run") as mock_subprocess: + mock_result = MagicMock() + mock_result.stdout = "Flashing complete" + mock_result.stderr = "" + mock_result.returncode = 0 + mock_subprocess.return_value = mock_result + + result = client.call( + "flash_oci_image", "oci://quay.io/private/image:tag", None, "myuser", "mypass" + ) + + assert result["status"] == "success" + # Credentials should NOT appear in the command args + call_args = mock_subprocess.call_args[0][0] + assert "-u" not in call_args + assert "-p" not in call_args + assert "myuser" not in call_args + assert "mypass" not in call_args + # Credentials should be passed via env vars + call_kwargs = mock_subprocess.call_args[1] + env = call_kwargs["env"] + assert env["FLS_REGISTRY_USERNAME"] == "myuser" + assert env["FLS_REGISTRY_PASSWORD"] == "mypass" + + +def test_flash_oci_image_partial_credentials_rejected(temp_storage_dir, ridesx_driver): + """Test that providing only username or only password is rejected""" + from jumpstarter.client.core import DriverError + + with serve(ridesx_driver) as client: + with pytest.raises(DriverError, match="OCI authentication requires both"): + client.call("flash_oci_image", "oci://image:tag", None, "myuser", None) + + with pytest.raises(DriverError, match="OCI authentication requires both"): + client.call("flash_oci_image", "oci://image:tag", None, None, "mypass") + + +def test_flash_oci_image_no_credentials(temp_storage_dir, ridesx_driver): + """Test that omitting credentials works (anonymous access)""" + with serve(ridesx_driver) as client: + with patch("jumpstarter_driver_ridesx.driver.get_fls_binary", return_value="fls"): + with patch("subprocess.run") as mock_subprocess: + mock_result = MagicMock() + mock_result.stdout = "Flashing complete" + mock_result.stderr = "" + mock_result.returncode = 0 + mock_subprocess.return_value = mock_result + + result = client.call("flash_oci_image", "oci://image:tag", None, None, None) + + assert result["status"] == "success" + call_kwargs = mock_subprocess.call_args[1] + env = call_kwargs["env"] + assert "FLS_REGISTRY_USERNAME" not in env + assert "FLS_REGISTRY_PASSWORD" not in env + + +def test_flash_oci_image_requires_oci_scheme(temp_storage_dir, ridesx_driver): + """Test that only oci:// URLs are accepted""" + from jumpstarter.client.core import DriverError + + with serve(ridesx_driver) as client: + # Bare registry URL should be rejected + with pytest.raises(DriverError, match="OCI URL must start with oci://"): + client.call("flash_oci_image", "quay.io/org/image:v1", None) diff --git a/python/packages/jumpstarter/jumpstarter/common/__init__.py b/python/packages/jumpstarter/jumpstarter/common/__init__.py index 13058cb09..5d76d7ce5 100644 --- a/python/packages/jumpstarter/jumpstarter/common/__init__.py +++ b/python/packages/jumpstarter/jumpstarter/common/__init__.py @@ -1,4 +1,13 @@ +from .fls import download_fls, get_fls_binary, get_fls_github_url from .metadata import Metadata from .tempfile import TemporarySocket, TemporaryTcpListener, TemporaryUnixListener -__all__ = ["Metadata", "TemporarySocket", "TemporaryUnixListener", "TemporaryTcpListener"] +__all__ = [ + "Metadata", + "TemporarySocket", + "TemporaryUnixListener", + "TemporaryTcpListener", + "download_fls", + "get_fls_binary", + "get_fls_github_url", +] diff --git a/python/packages/jumpstarter/jumpstarter/common/fls.py b/python/packages/jumpstarter/jumpstarter/common/fls.py new file mode 100644 index 000000000..0f61f7885 --- /dev/null +++ b/python/packages/jumpstarter/jumpstarter/common/fls.py @@ -0,0 +1,135 @@ +"""FLS (Flasher) binary utilities. + +This module provides functions for locating the FLS binary tool +used for flashing devices via fastboot with OCI image support. + +FLS is pre-installed in the container image for security and reliability, +with optional configuration-based overrides for testing and flexibility. +""" + +import logging +import os +import platform +import tempfile +import urllib.request +from pathlib import Path + +logger = logging.getLogger(__name__) + +FLS_GITHUB_REPO = "jumpstarter-dev/fls" + + +def get_fls_github_url(version: str, arch: str | None = None) -> str: + """Get GitHub release URL for FLS version. + + Args: + version: FLS version (e.g., "0.1.9") + arch: Target architecture (e.g., "aarch64", "x86_64"). If None, + auto-detects from current platform. + + Returns: + Download URL for the architecture-appropriate binary + """ + if arch is None: + arch = platform.machine().lower() + else: + arch = arch.lower() + if arch in ("aarch64", "arm64"): + binary_name = "fls-aarch64-linux" + elif arch in ("x86_64", "amd64"): + binary_name = "fls-x86_64-linux" + else: + binary_name = "fls-aarch64-linux" # Default to aarch64 + + return f"https://github.com/{FLS_GITHUB_REPO}/releases/download/{version}/{binary_name}" + + +def download_fls(url: str, timeout: float = 30.0) -> str: + """Download FLS binary from URL to a temp file with atomic operations. + + Args: + url: URL to download FLS binary from + timeout: Download timeout in seconds + + Returns: + Path to the downloaded binary + + Raises: + RuntimeError: If download fails + """ + fd, binary_path = tempfile.mkstemp(prefix="fls-") + os.close(fd) + temp_path = f"{binary_path}.part" + + try: + logger.info(f"Downloading FLS binary from: {url}") + with urllib.request.urlopen(url, timeout=timeout) as response: + with open(temp_path, 'wb') as f: + while True: + chunk = response.read(8192) + if not chunk: + break + f.write(chunk) + f.flush() + os.fsync(f.fileno()) + + # Set permissions on temp file before rename + Path(temp_path).chmod(0o755) + + # Atomic rename to final location + os.replace(temp_path, binary_path) + + logger.info(f"FLS binary downloaded to: {binary_path}") + return binary_path + + except Exception as e: + # Clean up temp files + Path(temp_path).unlink(missing_ok=True) + Path(binary_path).unlink(missing_ok=True) + logger.error(f"Failed to download FLS from {url}: {e}") + raise RuntimeError(f"Failed to download FLS from {url}: {e}") from e + + +def get_fls_binary( + fls_version: str | None = None, + fls_binary_url: str | None = None, + allow_custom_binaries: bool = False, +) -> str: + """Get path to FLS binary with configuration-based overrides. + + Args: + fls_version: Optional FLS version to download from GitHub releases + fls_binary_url: Custom URL to download FLS binary from + allow_custom_binaries: Whether custom binary URLs are allowed + + Returns: + Path to the FLS binary + + Raises: + RuntimeError: If custom binary URL provided but not allowed + + Note: + Priority order: + 1. fls_binary_url (if allow_custom_binaries=True) + 2. fls_version from GitHub releases + 3. Pre-installed system binary + """ + if fls_binary_url: + if not allow_custom_binaries: + raise RuntimeError( + "Custom FLS binary URLs are disabled for security. " + "Set allow_custom_binaries=True in driver configuration to enable." + ) + logger.warning( + f"⚠️ SECURITY: Downloading custom FLS binary from {fls_binary_url}. " + "Ensure this URL is trusted and secure." + ) + return download_fls(fls_binary_url) + + if fls_version: + github_url = get_fls_github_url(fls_version) + logger.warning(f"Downloading FLS version {fls_version} from GitHub: {github_url}") + return download_fls(github_url) + + logger.debug("Using pre-installed FLS from system PATH") + return "fls" diff --git a/python/packages/jumpstarter/jumpstarter/common/fls_test.py b/python/packages/jumpstarter/jumpstarter/common/fls_test.py new file mode 100644 index 000000000..5e5dd9022 --- /dev/null +++ b/python/packages/jumpstarter/jumpstarter/common/fls_test.py @@ -0,0 +1,98 @@ +from unittest.mock import patch + +import pytest + +from .fls import download_fls, get_fls_binary, get_fls_github_url + + +@pytest.mark.parametrize( + "arch,version,expected_binary", + [ + ("aarch64", "0.1.9", "fls-aarch64-linux"), + ("arm64", "0.1.9", "fls-aarch64-linux"), + ("x86_64", "0.2.0", "fls-x86_64-linux"), + ("amd64", "0.2.0", "fls-x86_64-linux"), + ("unknown", "0.1.9", "fls-aarch64-linux"), # defaults to aarch64 + ], +) +def test_get_fls_github_url_auto_detect(arch, version, expected_binary): + """Test architecture auto-detection from platform.machine()""" + with patch("platform.machine", return_value=arch): + url = get_fls_github_url(version) + assert url == f"https://github.com/jumpstarter-dev/fls/releases/download/{version}/{expected_binary}" + + +@pytest.mark.parametrize( + "arch,version,expected_binary", + [ + ("aarch64", "0.1.9", "fls-aarch64-linux"), + ("AARCH64", "0.1.9", "fls-aarch64-linux"), # case insensitive + ("x86_64", "0.2.0", "fls-x86_64-linux"), + ], +) +def test_get_fls_github_url_explicit_arch(arch, version, expected_binary): + """Test explicit architecture parameter (used by flashers for target device)""" + url = get_fls_github_url(version, arch=arch) + assert url == f"https://github.com/jumpstarter-dev/fls/releases/download/{version}/{expected_binary}" + + +def test_get_fls_binary_with_custom_url(): + with patch("jumpstarter.common.fls.download_fls", return_value="/tmp/custom-fls") as mock_download: + result = get_fls_binary(fls_binary_url="https://example.com/fls", allow_custom_binaries=True) + + mock_download.assert_called_once_with("https://example.com/fls") + assert result == "/tmp/custom-fls" + + +def test_get_fls_binary_custom_url_security_check(): + """Test that custom URLs are blocked when allow_custom_binaries=False.""" + with pytest.raises(RuntimeError, match="Custom FLS binary URLs are disabled for security"): + get_fls_binary(fls_binary_url="https://example.com/fls", allow_custom_binaries=False) + + +def test_get_fls_binary_with_version(): + with patch("jumpstarter.common.fls.download_fls", return_value="/tmp/fls-0.1.9") as mock_download: + with patch("jumpstarter.common.fls.get_fls_github_url", return_value="https://github.com/...") as mock_url: + result = get_fls_binary(fls_version="0.1.9") + + mock_url.assert_called_once_with("0.1.9") + mock_download.assert_called_once() + assert result == "/tmp/fls-0.1.9" + + +def test_get_fls_binary_falls_back_to_path(): + result = get_fls_binary() + assert result == "fls" + + +def test_download_fls_success(): + from unittest.mock import MagicMock, mock_open + + mock_response = MagicMock() + mock_response.read.side_effect = [b"binary data", b""] # Simulate chunked read + mock_response.__enter__ = MagicMock(return_value=mock_response) + mock_response.__exit__ = MagicMock(return_value=None) + + with patch("urllib.request.urlopen", return_value=mock_response) as mock_urlopen: + with patch("tempfile.mkstemp", return_value=(99, "/tmp/fls-test")): + with patch("os.close") as mock_close: + with patch("pathlib.Path.chmod") as mock_chmod: + with patch("os.replace") as mock_replace: + with patch("builtins.open", mock_open()): + with patch("os.fsync"): + result = download_fls("https://example.com/fls") + + mock_close.assert_called_once_with(99) + mock_urlopen.assert_called_once_with("https://example.com/fls", timeout=30.0) + mock_chmod.assert_called_once_with(0o755) + mock_replace.assert_called_once_with("/tmp/fls-test.part", "/tmp/fls-test") + assert result == "/tmp/fls-test" + + +def test_download_fls_failure(): + with patch("urllib.request.urlopen", side_effect=Exception("Network error")): + with patch("tempfile.mkstemp", return_value=(99, "/tmp/fls-test")): + with patch("os.close"): + with patch("pathlib.Path.unlink"): + with pytest.raises(RuntimeError, match="Failed to download FLS"): + download_fls("https://example.com/fls")