-
Notifications
You must be signed in to change notification settings - Fork 19
feat: add DDS driver using Eclipse CycloneDDS #393
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
a1cf185
55c0e63
60c25ae
f573806
873b3d2
4280d73
b675791
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| ../../../../../packages/jumpstarter-driver-dds/README.md |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,3 @@ | ||
| __pycache__/ | ||
| .coverage | ||
| coverage.xml |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,108 @@ | ||
| # DDS Driver | ||
|
|
||
| `jumpstarter-driver-dds` provides DDS (Data Distribution Service) publish/subscribe | ||
| communication for Jumpstarter using [Eclipse CycloneDDS](https://cyclonedds.io/). | ||
|
|
||
| DDS is a middleware protocol standard (OMG DDS) for data-centric connectivity, widely used | ||
| in automotive (AUTOSAR Adaptive), robotics (ROS 2), and IoT applications. This driver | ||
| enables remote DDS domain participation, topic management, and pub/sub messaging. | ||
|
|
||
| ## Installation | ||
|
|
||
| ```shell | ||
| pip3 install --extra-index-url https://pkg.jumpstarter.dev/simple/ jumpstarter-driver-dds | ||
| ``` | ||
|
|
||
| ## Configuration | ||
|
|
||
| | Parameter | Type | Default | Description | | ||
| |------------------------|--------|--------------|------------------------------------------| | ||
| | `domain_id` | int | 0 | DDS domain ID | | ||
| | `default_reliability` | str | `RELIABLE` | Default QoS reliability (`RELIABLE` or `BEST_EFFORT`) | | ||
| | `default_durability` | str | `VOLATILE` | Default QoS durability (`VOLATILE`, `TRANSIENT_LOCAL`, `TRANSIENT`, `PERSISTENT`) | | ||
| | `default_history_depth`| int | 10 | Default history depth for topics | | ||
|
|
||
| ### Example exporter configuration | ||
|
|
||
| ```yaml | ||
| export: | ||
| dds: | ||
| type: jumpstarter_driver_dds.driver.Dds | ||
| config: | ||
| domain_id: 0 | ||
| default_reliability: RELIABLE | ||
| default_durability: VOLATILE | ||
| default_history_depth: 10 | ||
| ``` | ||
|
|
||
| ## Client API | ||
|
|
||
| ### Domain Lifecycle | ||
|
|
||
| | Method | Description | | ||
| |--------------------------|----------------------------------------------| | ||
| | `connect()` | Connect to the DDS domain, create participant | | ||
| | `disconnect()` | Disconnect and release all resources | | ||
| | `get_participant_info()` | Get domain participant information | | ||
|
|
||
| ### Topic Management | ||
|
|
||
| | Method | Description | | ||
| |--------------------------------------------------------|------------------------------------| | ||
| | `create_topic(name, fields, reliability, durability, history_depth)` | Create a topic with schema and QoS | | ||
| | `list_topics()` | List all registered topics | | ||
|
|
||
| ### Publish / Subscribe | ||
|
|
||
| | Method | Description | | ||
| |-------------------------------------|------------------------------------------| | ||
| | `publish(topic_name, data)` | Publish a data sample to a topic | | ||
| | `read(topic_name, max_samples=10)` | Read (take) samples from a topic | | ||
| | `monitor(topic_name)` | Stream samples from a topic as they arrive | | ||
|
|
||
| ### QoS Options | ||
|
|
||
| **Reliability:** | ||
| - `RELIABLE` -- Guaranteed delivery with acknowledgment | ||
| - `BEST_EFFORT` -- Fire-and-forget, lowest latency | ||
|
|
||
| **Durability:** | ||
| - `VOLATILE` -- Samples only delivered to currently connected readers | ||
| - `TRANSIENT_LOCAL` -- Late-joining readers receive cached samples | ||
| - `TRANSIENT` -- Samples survive writer restarts | ||
| - `PERSISTENT` -- Samples survive system restarts | ||
|
|
||
| ## Example Usage | ||
|
|
||
| ```python | ||
| from jumpstarter.common.utils import env | ||
|
|
||
| with env() as client: | ||
| dds = client.dds | ||
|
|
||
| # Connect to DDS domain | ||
| dds.connect() | ||
|
|
||
| # Create a topic with schema | ||
| dds.create_topic("sensor/temperature", ["value", "unit", "location"]) | ||
|
|
||
| # Publish data | ||
| dds.publish("sensor/temperature", { | ||
| "value": "22.5", | ||
| "unit": "C", | ||
| "location": "lab1", | ||
| }) | ||
|
|
||
| # Read samples | ||
| result = dds.read("sensor/temperature") | ||
| for sample in result.samples: | ||
| print(f"Temp: {sample.data['value']} {sample.data['unit']}") | ||
|
|
||
| # Stream samples (bounded read -- monitor is infinite by default) | ||
| for i, sample in enumerate(dds.monitor("sensor/temperature")): | ||
| print(f"Live: {sample.data}") | ||
| if i + 1 >= 10: | ||
| break | ||
|
|
||
| dds.disconnect() | ||
| ``` |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,15 @@ | ||
| apiVersion: jumpstarter.dev/v1alpha1 | ||
| kind: ExporterConfig | ||
| metadata: | ||
| namespace: default | ||
| name: dds-exporter | ||
| endpoint: "" | ||
| token: "" | ||
| export: | ||
| dds: | ||
| type: jumpstarter_driver_dds.driver.Dds | ||
| config: | ||
| domain_id: 0 | ||
| default_reliability: RELIABLE | ||
| default_durability: VOLATILE | ||
| default_history_depth: 10 |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,163 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import json | ||
| from collections.abc import Generator | ||
| from dataclasses import dataclass | ||
| from typing import Any | ||
|
|
||
| import click | ||
|
|
||
| from .common import ( | ||
| DdsParticipantInfo, | ||
| DdsPublishResult, | ||
| DdsReadResult, | ||
| DdsSample, | ||
| DdsTopicInfo, | ||
| ) | ||
| from jumpstarter.client import DriverClient | ||
| from jumpstarter.client.decorators import driver_click_group | ||
|
|
||
|
|
||
| @dataclass(kw_only=True) | ||
| class DdsClient(DriverClient): | ||
| """Client interface for DDS (Data Distribution Service). | ||
|
|
||
| Provides methods to manage DDS domain participation, create topics, | ||
| publish and subscribe to data, and monitor topic streams via the | ||
| Jumpstarter remoting layer. | ||
| """ | ||
|
|
||
| def connect(self) -> DdsParticipantInfo: | ||
| """Connect to the DDS domain and create a domain participant.""" | ||
| return DdsParticipantInfo.model_validate(self.call("connect")) | ||
|
|
||
| def disconnect(self) -> None: | ||
| """Disconnect from the DDS domain.""" | ||
| self.call("disconnect") | ||
|
|
||
| def create_topic( | ||
| self, | ||
| name: str, | ||
| fields: list[str], | ||
| reliability: str | None = None, | ||
| durability: str | None = None, | ||
| history_depth: int | None = None, | ||
| ) -> DdsTopicInfo: | ||
| """Create a DDS topic with the given schema and QoS.""" | ||
| return DdsTopicInfo.model_validate( | ||
| self.call("create_topic", name, fields, reliability, durability, history_depth) | ||
| ) | ||
|
|
||
| def list_topics(self) -> list[DdsTopicInfo]: | ||
| """List all registered topics.""" | ||
| raw = self.call("list_topics") | ||
| if not isinstance(raw, list): | ||
| raise TypeError(f"Expected list from list_topics(), got {type(raw).__name__}") | ||
| return [DdsTopicInfo.model_validate(t) for t in raw] | ||
|
|
||
| def publish(self, topic_name: str, data: dict[str, Any]) -> DdsPublishResult: | ||
| """Publish a data sample to a DDS topic.""" | ||
| return DdsPublishResult.model_validate(self.call("publish", topic_name, data)) | ||
|
|
||
| def read(self, topic_name: str, max_samples: int = 10) -> DdsReadResult: | ||
| """Read (take) samples from a DDS topic.""" | ||
| return DdsReadResult.model_validate(self.call("read", topic_name, max_samples)) | ||
|
|
||
| def get_participant_info(self) -> DdsParticipantInfo: | ||
| """Get information about the DDS domain participant.""" | ||
| return DdsParticipantInfo.model_validate(self.call("get_participant_info")) | ||
|
|
||
| def monitor(self, topic_name: str) -> Generator[DdsSample, None, None]: | ||
| """Stream data samples from a topic as they arrive.""" | ||
| for v in self.streamingcall("monitor", topic_name): | ||
| yield DdsSample.model_validate(v) | ||
|
Comment on lines
+70
to
+73
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [MEDIUM] Suggested fix: add def monitor(self, topic_name: str, max_iterations: int = 0) -> Generator[DdsSample, None, None]:
for v in self.streamingcall("monitor", topic_name, max_iterations):
yield DdsSample.model_validate(v)AI-generated, human reviewed |
||
|
|
||
| def _register_lifecycle_commands(self, base): | ||
| """Register connect, disconnect, topics, and info CLI commands.""" | ||
|
|
||
| @base.command(name="connect") | ||
| def connect_cmd(): | ||
| """Connect to DDS domain""" | ||
| info = self.connect() | ||
| click.echo(f"Connected to DDS domain {info.domain_id}") | ||
|
|
||
| @base.command(name="disconnect") | ||
| def disconnect_cmd(): | ||
| """Disconnect from DDS domain""" | ||
| self.disconnect() | ||
| click.echo("Disconnected from DDS domain") | ||
coderabbitai[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| @base.command() | ||
| def topics(): | ||
| """List registered topics""" | ||
| topic_list = self.list_topics() | ||
| if not topic_list: | ||
| click.echo("No topics registered") | ||
| return | ||
| for t in topic_list: | ||
| click.echo( | ||
| f" {t.name}: fields={t.fields} reliability={t.qos.reliability.value} samples={t.sample_count}" | ||
| ) | ||
|
|
||
| @base.command() | ||
| def info(): | ||
| """Show DDS participant info""" | ||
| pinfo = self.get_participant_info() | ||
| click.echo(f"Domain ID: {pinfo.domain_id}") | ||
| click.echo(f"Connected: {pinfo.is_connected}") | ||
| click.echo(f"Topic count: {pinfo.topic_count}") | ||
|
|
||
| def _register_data_commands(self, base): | ||
raballew marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| """Register create-topic, publish, read, and monitor CLI commands.""" | ||
|
|
||
| @base.command(name="create-topic") | ||
| @click.argument("name") | ||
| @click.argument("fields", nargs=-1, required=True) | ||
| @click.option("--reliability", "-r", default=None, help="QoS reliability") | ||
| @click.option("--durability", "-d", default=None, help="QoS durability") | ||
| @click.option("--history-depth", "-H", default=None, type=int, help="History depth") | ||
| def create_topic_cmd(name, fields, reliability, durability, history_depth): | ||
| """Create a topic: create-topic NAME FIELD1 FIELD2 ...""" | ||
| topic = self.create_topic(name, list(fields), reliability, durability, history_depth) | ||
| click.echo(f"Created topic '{topic.name}' with fields {topic.fields}") | ||
|
|
||
| @base.command(name="publish") | ||
| @click.argument("topic_name") | ||
| @click.argument("data_json") | ||
| def publish_cmd(topic_name, data_json): | ||
| """Publish JSON data to a topic: publish TOPIC '{"key": "val"}'""" | ||
| data = json.loads(data_json) | ||
| result = self.publish(topic_name, data) | ||
| click.echo(f"Published {result.samples_written} sample(s) to {topic_name}") | ||
|
|
||
| @base.command(name="read") | ||
| @click.argument("topic_name") | ||
| @click.option("--max-samples", "-n", default=10, help="Max samples to read") | ||
| def read_cmd(topic_name, max_samples): | ||
| """Read samples from a topic""" | ||
| result = self.read(topic_name, max_samples) | ||
| click.echo(f"Read {result.sample_count} samples from {topic_name}:") | ||
| for s in result.samples: | ||
| click.echo(f" {s.data}") | ||
|
|
||
| @base.command(name="monitor") | ||
| @click.argument("topic_name") | ||
| @click.option("--count", "-n", default=10, help="Number of events") | ||
raballew marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| def monitor_cmd(topic_name, count): | ||
| """Monitor samples from a topic""" | ||
| for i, sample in enumerate(self.monitor(topic_name)): | ||
| click.echo(f"[{sample.topic_name}] {sample.data}") | ||
| if i + 1 >= count: | ||
| break | ||
|
Comment on lines
+148
to
+151
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [MEDIUM] CLI Suggested fix: add if count <= 0:
returnAI-generated, human reviewed
Comment on lines
+78
to
+151
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [MEDIUM] CLI command execution is untested; only registration is verified. Neither Suggested fix: add tests using Click's AI-generated, human reviewed |
||
|
|
||
| def cli(self): | ||
| """Build and return the Click command group for this driver.""" | ||
|
|
||
| @driver_click_group(self) | ||
| def base(): | ||
| """DDS pub/sub communication""" | ||
| pass | ||
|
|
||
| self._register_lifecycle_commands(base) | ||
| self._register_data_commands(base) | ||
| return base | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,81 @@ | ||
| from __future__ import annotations | ||
|
|
||
| from enum import Enum | ||
| from typing import Any | ||
|
|
||
| from pydantic import BaseModel, Field, model_validator | ||
|
|
||
|
|
||
| class DdsReliability(str, Enum): | ||
| """DDS reliability QoS.""" | ||
|
|
||
| BEST_EFFORT = "BEST_EFFORT" | ||
| RELIABLE = "RELIABLE" | ||
|
|
||
|
|
||
| class DdsDurability(str, Enum): | ||
| """DDS durability QoS.""" | ||
|
|
||
| VOLATILE = "VOLATILE" | ||
| TRANSIENT_LOCAL = "TRANSIENT_LOCAL" | ||
| TRANSIENT = "TRANSIENT" | ||
| PERSISTENT = "PERSISTENT" | ||
|
|
||
|
|
||
| class DdsTopicQos(BaseModel): | ||
| """Quality of Service settings for a DDS topic.""" | ||
|
|
||
| reliability: DdsReliability = DdsReliability.RELIABLE | ||
| durability: DdsDurability = DdsDurability.VOLATILE | ||
| history_depth: int = Field(10, ge=1) | ||
|
|
||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [LOW] Suggested fix: add a reasonable upper bound, e.g. AI-generated, human reviewed |
||
|
|
||
| class DdsParticipantInfo(BaseModel): | ||
| """Information about the DDS domain participant.""" | ||
|
|
||
| domain_id: int | ||
| topic_count: int = 0 | ||
| is_connected: bool = False | ||
|
|
||
|
|
||
| class DdsTopicInfo(BaseModel): | ||
| """Information about a registered DDS topic.""" | ||
|
|
||
| name: str | ||
| fields: list[str] = [] | ||
| qos: DdsTopicQos = DdsTopicQos() | ||
| sample_count: int = 0 | ||
|
|
||
|
|
||
| class DdsSample(BaseModel): | ||
| """A single DDS data sample.""" | ||
|
|
||
| topic_name: str | ||
| data: dict[str, Any] | ||
| timestamp: float = 0.0 | ||
|
|
||
|
|
||
| class DdsPublishResult(BaseModel): | ||
| """Result of a publish operation. | ||
|
|
||
| Publish failures always raise exceptions; this model is only | ||
| returned on success. | ||
| """ | ||
|
|
||
| topic_name: str | ||
| samples_written: int = 0 | ||
|
|
||
|
|
||
| class DdsReadResult(BaseModel): | ||
| """Result of a read/take operation.""" | ||
|
|
||
| topic_name: str | ||
| samples: list[DdsSample] = [] | ||
| sample_count: int = 0 | ||
raballew marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| @model_validator(mode="after") | ||
| def _validate_sample_count(self) -> DdsReadResult: | ||
| """Ensure sample_count matches the actual number of samples.""" | ||
| if self.sample_count != len(self.samples): | ||
| raise ValueError(f"sample_count ({self.sample_count}) does not match len(samples) ({len(self.samples)})") | ||
| return self | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Passing
name, fields, reliability, durability, history_depthas positional args toself.call()is fragile. If the server-side method signature changes parameter order, this silently breaks. Using keyword arguments would be more resilient across the gRPC remoting layer.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Valid point, though a change here would mean a framework change as DriverClient.call() has signature call(method, *args) — positional-only is the project-wide convention used by all drivers. This would need to support kwargs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lets document this as a future enhancement issue and move one