Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions python/docs/source/reference/package-apis/drivers/dds.md
3 changes: 3 additions & 0 deletions python/docs/source/reference/package-apis/drivers/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ Drivers that provide various communication interfaces:
* **[BLE](ble.md)** (`jumpstarter-driver-ble`) - Bluetooth Low Energy communication
* **[CAN](can.md)** (`jumpstarter-driver-can`) - Controller Area Network
communication
* **[DDS](dds.md)** (`jumpstarter-driver-dds`) - DDS (Data Distribution Service)
pub/sub communication using Eclipse CycloneDDS
* **[HTTP](http.md)** (`jumpstarter-driver-http`) - HTTP communication
* **[Mitmproxy](mitmproxy.md)** (`jumpstarter-driver-mitmproxy`) - HTTP(S) interception, mocking, and traffic recording
* **[Network](network.md)** (`jumpstarter-driver-network`) - Network interfaces
Expand Down Expand Up @@ -117,6 +119,7 @@ androidemulator.md
ble.md
can.md
corellium.md
dds.md
doip.md
dutlink.md
energenie.md
Expand Down
3 changes: 3 additions & 0 deletions python/packages/jumpstarter-driver-dds/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
__pycache__/
.coverage
coverage.xml
108 changes: 108 additions & 0 deletions python/packages/jumpstarter-driver-dds/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
# DDS Driver

`jumpstarter-driver-dds` provides DDS (Data Distribution Service) publish/subscribe
communication for Jumpstarter using [Eclipse CycloneDDS](https://cyclonedds.io/).

DDS is a middleware protocol standard (OMG DDS) for data-centric connectivity, widely used
in automotive (AUTOSAR Adaptive), robotics (ROS 2), and IoT applications. This driver
enables remote DDS domain participation, topic management, and pub/sub messaging.

## Installation

```shell
pip3 install --extra-index-url https://pkg.jumpstarter.dev/simple/ jumpstarter-driver-dds
```

## Configuration

| Parameter | Type | Default | Description |
|------------------------|--------|--------------|------------------------------------------|
| `domain_id` | int | 0 | DDS domain ID |
| `default_reliability` | str | `RELIABLE` | Default QoS reliability (`RELIABLE` or `BEST_EFFORT`) |
| `default_durability` | str | `VOLATILE` | Default QoS durability (`VOLATILE`, `TRANSIENT_LOCAL`, `TRANSIENT`, `PERSISTENT`) |
| `default_history_depth`| int | 10 | Default history depth for topics |

### Example exporter configuration

```yaml
export:
dds:
type: jumpstarter_driver_dds.driver.Dds
config:
domain_id: 0
default_reliability: RELIABLE
default_durability: VOLATILE
default_history_depth: 10
```

## Client API

### Domain Lifecycle

| Method | Description |
|--------------------------|----------------------------------------------|
| `connect()` | Connect to the DDS domain, create participant |
| `disconnect()` | Disconnect and release all resources |
| `get_participant_info()` | Get domain participant information |

### Topic Management

| Method | Description |
|--------------------------------------------------------|------------------------------------|
| `create_topic(name, fields, reliability, durability, history_depth)` | Create a topic with schema and QoS |
| `list_topics()` | List all registered topics |

### Publish / Subscribe

| Method | Description |
|-------------------------------------|------------------------------------------|
| `publish(topic_name, data)` | Publish a data sample to a topic |
| `read(topic_name, max_samples=10)` | Read (take) samples from a topic |
| `monitor(topic_name)` | Stream samples from a topic as they arrive |

### QoS Options

**Reliability:**
- `RELIABLE` -- Guaranteed delivery with acknowledgment
- `BEST_EFFORT` -- Fire-and-forget, lowest latency

**Durability:**
- `VOLATILE` -- Samples only delivered to currently connected readers
- `TRANSIENT_LOCAL` -- Late-joining readers receive cached samples
- `TRANSIENT` -- Samples survive writer restarts
- `PERSISTENT` -- Samples survive system restarts

## Example Usage

```python
from jumpstarter.common.utils import env

with env() as client:
dds = client.dds

# Connect to DDS domain
dds.connect()

# Create a topic with schema
dds.create_topic("sensor/temperature", ["value", "unit", "location"])

# Publish data
dds.publish("sensor/temperature", {
"value": "22.5",
"unit": "C",
"location": "lab1",
})

# Read samples
result = dds.read("sensor/temperature")
for sample in result.samples:
print(f"Temp: {sample.data['value']} {sample.data['unit']}")

# Stream samples (bounded read -- monitor is infinite by default)
for i, sample in enumerate(dds.monitor("sensor/temperature")):
print(f"Live: {sample.data}")
if i + 1 >= 10:
break

dds.disconnect()
```
15 changes: 15 additions & 0 deletions python/packages/jumpstarter-driver-dds/examples/exporter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
apiVersion: jumpstarter.dev/v1alpha1
kind: ExporterConfig
metadata:
namespace: default
name: dds-exporter
endpoint: ""
token: ""
export:
dds:
type: jumpstarter_driver_dds.driver.Dds
config:
domain_id: 0
default_reliability: RELIABLE
default_durability: VOLATILE
default_history_depth: 10
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
from __future__ import annotations

import json
from collections.abc import Generator
from dataclasses import dataclass
from typing import Any

import click

from .common import (
DdsParticipantInfo,
DdsPublishResult,
DdsReadResult,
DdsSample,
DdsTopicInfo,
)
from jumpstarter.client import DriverClient
from jumpstarter.client.decorators import driver_click_group


@dataclass(kw_only=True)
class DdsClient(DriverClient):
"""Client interface for DDS (Data Distribution Service).

Provides methods to manage DDS domain participation, create topics,
publish and subscribe to data, and monitor topic streams via the
Jumpstarter remoting layer.
"""

def connect(self) -> DdsParticipantInfo:
"""Connect to the DDS domain and create a domain participant."""
return DdsParticipantInfo.model_validate(self.call("connect"))

def disconnect(self) -> None:
"""Disconnect from the DDS domain."""
self.call("disconnect")

def create_topic(
self,
name: str,
fields: list[str],
reliability: str | None = None,
durability: str | None = None,
history_depth: int | None = None,
) -> DdsTopicInfo:
"""Create a DDS topic with the given schema and QoS."""
return DdsTopicInfo.model_validate(
self.call("create_topic", name, fields, reliability, durability, history_depth)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Passing name, fields, reliability, durability, history_depth as positional args to self.call() is fragile. If the server-side method signature changes parameter order, this silently breaks. Using keyword arguments would be more resilient across the gRPC remoting layer.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Valid point, though a change here would mean a framework change as DriverClient.call() has signature call(method, *args) — positional-only is the project-wide convention used by all drivers. This would need to support kwargs.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets document this as a future enhancement issue and move one

)

def list_topics(self) -> list[DdsTopicInfo]:
"""List all registered topics."""
raw = self.call("list_topics")
if not isinstance(raw, list):
raise TypeError(f"Expected list from list_topics(), got {type(raw).__name__}")
return [DdsTopicInfo.model_validate(t) for t in raw]

def publish(self, topic_name: str, data: dict[str, Any]) -> DdsPublishResult:
"""Publish a data sample to a DDS topic."""
return DdsPublishResult.model_validate(self.call("publish", topic_name, data))

def read(self, topic_name: str, max_samples: int = 10) -> DdsReadResult:
"""Read (take) samples from a DDS topic."""
return DdsReadResult.model_validate(self.call("read", topic_name, max_samples))

def get_participant_info(self) -> DdsParticipantInfo:
"""Get information about the DDS domain participant."""
return DdsParticipantInfo.model_validate(self.call("get_participant_info"))

def monitor(self, topic_name: str) -> Generator[DdsSample, None, None]:
"""Stream data samples from a topic as they arrive."""
for v in self.streamingcall("monitor", topic_name):
yield DdsSample.model_validate(v)
Comment on lines +70 to +73
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[MEDIUM] DdsClient.monitor() does not expose the max_iterations parameter, even though the server-side monitor() accepts it. This means the bounded-iteration feature is implemented but unreachable from the client API.

Suggested fix: add max_iterations to the client method and forward it:

def monitor(self, topic_name: str, max_iterations: int = 0) -> Generator[DdsSample, None, None]:
    for v in self.streamingcall("monitor", topic_name, max_iterations):
        yield DdsSample.model_validate(v)

AI-generated, human reviewed


def _register_lifecycle_commands(self, base):
"""Register connect, disconnect, topics, and info CLI commands."""

@base.command(name="connect")
def connect_cmd():
"""Connect to DDS domain"""
info = self.connect()
click.echo(f"Connected to DDS domain {info.domain_id}")

@base.command(name="disconnect")
def disconnect_cmd():
"""Disconnect from DDS domain"""
self.disconnect()
click.echo("Disconnected from DDS domain")

@base.command()
def topics():
"""List registered topics"""
topic_list = self.list_topics()
if not topic_list:
click.echo("No topics registered")
return
for t in topic_list:
click.echo(
f" {t.name}: fields={t.fields} reliability={t.qos.reliability.value} samples={t.sample_count}"
)

@base.command()
def info():
"""Show DDS participant info"""
pinfo = self.get_participant_info()
click.echo(f"Domain ID: {pinfo.domain_id}")
click.echo(f"Connected: {pinfo.is_connected}")
click.echo(f"Topic count: {pinfo.topic_count}")

def _register_data_commands(self, base):
"""Register create-topic, publish, read, and monitor CLI commands."""

@base.command(name="create-topic")
@click.argument("name")
@click.argument("fields", nargs=-1, required=True)
@click.option("--reliability", "-r", default=None, help="QoS reliability")
@click.option("--durability", "-d", default=None, help="QoS durability")
@click.option("--history-depth", "-H", default=None, type=int, help="History depth")
def create_topic_cmd(name, fields, reliability, durability, history_depth):
"""Create a topic: create-topic NAME FIELD1 FIELD2 ..."""
topic = self.create_topic(name, list(fields), reliability, durability, history_depth)
click.echo(f"Created topic '{topic.name}' with fields {topic.fields}")

@base.command(name="publish")
@click.argument("topic_name")
@click.argument("data_json")
def publish_cmd(topic_name, data_json):
"""Publish JSON data to a topic: publish TOPIC '{"key": "val"}'"""
data = json.loads(data_json)
result = self.publish(topic_name, data)
click.echo(f"Published {result.samples_written} sample(s) to {topic_name}")

@base.command(name="read")
@click.argument("topic_name")
@click.option("--max-samples", "-n", default=10, help="Max samples to read")
def read_cmd(topic_name, max_samples):
"""Read samples from a topic"""
result = self.read(topic_name, max_samples)
click.echo(f"Read {result.sample_count} samples from {topic_name}:")
for s in result.samples:
click.echo(f" {s.data}")

@base.command(name="monitor")
@click.argument("topic_name")
@click.option("--count", "-n", default=10, help="Number of events")
def monitor_cmd(topic_name, count):
"""Monitor samples from a topic"""
for i, sample in enumerate(self.monitor(topic_name)):
click.echo(f"[{sample.topic_name}] {sample.data}")
if i + 1 >= count:
break
Comment on lines +148 to +151
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[MEDIUM] CLI monitor --count 0 prints one sample instead of zero. When count=0, the first iteration prints the sample, then 0 + 1 >= 0 is True and the loop breaks. A user passing --count 0 would expect no output.

Suggested fix: add type=click.IntRange(min=1) to the --count option, or check before entering the loop:

if count <= 0:
    return

AI-generated, human reviewed

Comment on lines +78 to +151
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[MEDIUM] CLI command execution is untested; only registration is verified. Neither test_cli_interface nor test_all_expected_commands_registered invokes any CLI command. The Click command functions contain logic (JSON parsing in publish_cmd, -H option handling in create_topic_cmd, output formatting) that is never exercised.

Suggested fix: add tests using Click's CliRunner to invoke at least create-topic, publish, and read commands and assert their output.

AI-generated, human reviewed


def cli(self):
"""Build and return the Click command group for this driver."""

@driver_click_group(self)
def base():
"""DDS pub/sub communication"""
pass

self._register_lifecycle_commands(base)
self._register_data_commands(base)
return base
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
from __future__ import annotations

from enum import Enum
from typing import Any

from pydantic import BaseModel, Field, model_validator


class DdsReliability(str, Enum):
"""DDS reliability QoS."""

BEST_EFFORT = "BEST_EFFORT"
RELIABLE = "RELIABLE"


class DdsDurability(str, Enum):
"""DDS durability QoS."""

VOLATILE = "VOLATILE"
TRANSIENT_LOCAL = "TRANSIENT_LOCAL"
TRANSIENT = "TRANSIENT"
PERSISTENT = "PERSISTENT"


class DdsTopicQos(BaseModel):
"""Quality of Service settings for a DDS topic."""

reliability: DdsReliability = DdsReliability.RELIABLE
durability: DdsDurability = DdsDurability.VOLATILE
history_depth: int = Field(10, ge=1)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[LOW] history_depth has a ge=1 constraint but no upper bound. An authenticated client could call create_topic("t", ["f"], history_depth=2_000_000_000), causing a very large internal buffer allocation.

Suggested fix: add a reasonable upper bound, e.g. history_depth: int = Field(10, ge=1, le=10000).

AI-generated, human reviewed


class DdsParticipantInfo(BaseModel):
"""Information about the DDS domain participant."""

domain_id: int
topic_count: int = 0
is_connected: bool = False


class DdsTopicInfo(BaseModel):
"""Information about a registered DDS topic."""

name: str
fields: list[str] = []
qos: DdsTopicQos = DdsTopicQos()
sample_count: int = 0


class DdsSample(BaseModel):
"""A single DDS data sample."""

topic_name: str
data: dict[str, Any]
timestamp: float = 0.0


class DdsPublishResult(BaseModel):
"""Result of a publish operation.

Publish failures always raise exceptions; this model is only
returned on success.
"""

topic_name: str
samples_written: int = 0


class DdsReadResult(BaseModel):
"""Result of a read/take operation."""

topic_name: str
samples: list[DdsSample] = []
sample_count: int = 0

@model_validator(mode="after")
def _validate_sample_count(self) -> DdsReadResult:
"""Ensure sample_count matches the actual number of samples."""
if self.sample_count != len(self.samples):
raise ValueError(f"sample_count ({self.sample_count}) does not match len(samples) ({len(self.samples)})")
return self
Loading