diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index 757b8f4..3fa0410 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -35,7 +35,28 @@ jobs:
- name: Run unit tests
run: |
- python -m unittest discover -s tests/unittest -t . -p "test*.py" -v
+ python -m unittest discover -s tests/unittest -t . -p "test*.py" --ignore-patterns "test_default_tools.py" -v
+
+ ros2_unittests:
+ name: ROS 2 unit tests (default tools)
+ runs-on: ubuntu-24.04
+ container:
+ image: eprosima/vulcanexus:kilted-desktop
+
+ steps:
+ - name: Sync repository
+ uses: eProsima/eProsima-CI/external/checkout@v0
+
+ - name: Install VulcanAI library
+ run: |
+ python3 -m pip install -U pip --break-system-packages
+ python3 -m pip install -e .[test] --break-system-packages
+
+ - name: Run ROS 2 default tools tests
+ shell: bash
+ run: |
+ source /opt/ros/jazzy/setup.bash
+ python3 -m unittest discover -s tests/unittest -t . -p "test_default_tools.py" -v
integration:
name: Integration tests (pytest)
diff --git a/pyproject.toml b/pyproject.toml
index a7ae25c..ee51a38 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -56,3 +56,6 @@ select = ["E", "F", "I"]
[tool.ruff.lint.isort]
known-first-party = ["vulcanai"]
+
+[project.entry-points."ros2_default_tools"]
+default_tools = "vulcanai.tools.default_tools"
diff --git a/src/vulcanai/console/console.py b/src/vulcanai/console/console.py
index b333a5a..e1d76f3 100644
--- a/src/vulcanai/console/console.py
+++ b/src/vulcanai/console/console.py
@@ -50,6 +50,7 @@ class VulcanConsole(App):
# Two panels: left (log + input) and right (history + variables)
# Right panel: 48 characters length
# Left panel: fills remaining space
+
CSS = """
Screen {
layout: horizontal;
@@ -74,6 +75,13 @@ class VulcanConsole(App):
border: tall #333333;
}
+ #streamcontent {
+ height: 0;
+ min-height: 0;
+ border: solid #56AA08;
+ display: none;
+ }
+
#llm_spinner {
height: 0;
display: none;
@@ -120,6 +128,8 @@ def __init__(
tools_from_entrypoints: str = "",
user_context: str = "",
main_node=None,
+ default_tools: bool = True,
+ debug: bool = False,
):
super().__init__() # Textual lib
@@ -137,10 +147,16 @@ def __init__(
self.model = model
# 'k' value for top_k tools selection
self.k = k
+ # Flag to indicate if default tools should be enabled
+ self.default_tools = default_tools
# Iterative mode
self.iterative = iterative
+ # Enable debug-only logs
+ self.debug_flag = debug
# CustomLogTextArea instance
- self.left_pannel = None
+ self.main_pannel = None
+ # Subprocess output panel
+ self.stream_pannel = None
# Logger instance
self.logger = VulcanAILogger.default()
self.logger.set_textualizer_console(TextualLogSink(self))
@@ -166,6 +182,8 @@ def __init__(
# Streaming task control
self.stream_task = None
+ # Route logger output to subprocess panel when needed.
+ self._route_logs_to_stream_panel = False
# Suggestion index for RadioListModal
self.suggestion_index = -1
self.suggestion_index_changed = threading.Event()
@@ -188,7 +206,8 @@ async def on_mount(self) -> None:
Function called when the console is mounted.
"""
- self.left_pannel = self.query_one("#logcontent", CustomLogTextArea)
+ self.main_pannel = self.query_one("#logcontent", CustomLogTextArea)
+ self.stream_pannel = self.query_one("#streamcontent", CustomLogTextArea)
self.spinner_status = self.query_one("#llm_spinner", SpinnerStatus)
self.hooks = SpinnerHook(self.spinner_status)
@@ -197,7 +216,7 @@ async def on_mount(self) -> None:
sys.stdout = StreamToTextual(self, "stdout")
sys.stderr = StreamToTextual(self, "stderr")
- if self.main_node is not None:
+ if self.main_node is not None or self.default_tools:
attach_ros_logger_to_console(self)
self.loop = asyncio.get_running_loop()
@@ -223,6 +242,9 @@ def compose(self) -> ComposeResult:
with Horizontal():
# Left
with Vertical(id="left"):
+ # Subprocess stream area (hidden by default, shown on-demand)
+ streamcontent = CustomLogTextArea(id="streamcontent")
+ yield streamcontent
# Log Area
logcontent = CustomLogTextArea(id="logcontent")
yield logcontent
@@ -248,7 +270,6 @@ async def bootstrap(self) -> None:
Blocking operations (file I/O) run in executor, non-blocking in event loop.
"""
- # Initialize manager (potentially blocking, run in executor)
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, self.init_manager)
@@ -259,6 +280,7 @@ async def bootstrap(self) -> None:
"/edit_tools": self.cmd_edit_tools,
"/change_k": self.cmd_change_k,
"/history": self.cmd_history_index,
+ "/debug": self.cmd_debug,
"/show_history": self.cmd_show_history,
"/clear_history": self.cmd_clear_history,
"/plan": self.cmd_plan,
@@ -278,6 +300,15 @@ async def bootstrap(self) -> None:
except Exception:
pass
+ # Load a default ROS 2 node if default tools are enabled but no node is provided
+ if self.default_tools and self.main_node is None:
+ try:
+ from vulcanai.tools.default_tools import ROS2DefaultToolNode
+
+ self.main_node = ROS2DefaultToolNode()
+ except ImportError:
+ self.logger.log_console("Unable to load ROS 2 default node for default tools.")
+
# -- Register tools (file I/O - run in executor) --
# File paths tools
for tool_file_path in self.register_from_file:
@@ -298,7 +329,8 @@ async def bootstrap(self) -> None:
self.is_ready = True
self.logger.log_console("VulcanAI Interactive Console")
- self.logger.log_console("Use 'Ctrl+Q' to quit.")
+ self.logger.log_console("Clipboard: select text and press F4 to copy. Use Ctrl+V or middle-click to paste.")
+ self.logger.log_console("Use '/exit' or press 'Ctrl+Q' to quit.")
# Activate the terminal input
self.set_input_enabled(True)
@@ -320,6 +352,7 @@ def worker(user_input: str = "") -> None:
self.set_input_enabled(False)
try:
+ bb_before_ids = {k: id(v) for k, v in self.manager.bb.items()}
images = []
# Add the images
@@ -340,13 +373,20 @@ def worker(user_input: str = "") -> None:
# Print the backboard state
bb_ret = result.get("blackboard", None)
- if bb_ret:
- bb_ret = bb_ret.text_snapshot().replace("<", "'").replace(">", "'")
- self.logger.log_console(f"Output of plan: {bb_ret}")
+ if bb_ret and self.debug_flag:
+ plan_ret = result.get("plan", None)
+ updated_keys = self._updated_blackboard_keys(bb_before_ids, bb_ret)
+ if not updated_keys:
+ updated_keys = self._plan_tool_keys(plan_ret)
+ bb_ret_str = self._format_blackboard_subset(bb_ret, updated_keys)
+ self.logger.log_console(f"Output of plan: {bb_ret_str}")
except KeyboardInterrupt:
- self.logger.log_msg("Exiting...")
- return
+ if self.stream_task is None:
+ self.logger.log_msg("Exiting...")
+ else:
+ self.stream_task.cancel() # triggers CancelledError in the task
+ self.stream_task = None
except EOFError:
self.logger.log_msg("Exiting...")
return
@@ -359,6 +399,47 @@ def worker(user_input: str = "") -> None:
# region Utilities
+ def _updated_blackboard_keys(self, before_ids: dict[str, int], bb_after) -> list[str]:
+ """
+ Return blackboard keys that were created or replaced during the latest execution.
+ """
+ updated_keys = []
+ for key, value in bb_after.items():
+ if key not in before_ids or before_ids[key] != id(value):
+ updated_keys.append(key)
+ return updated_keys
+
+ def _plan_tool_keys(self, plan) -> list[str]:
+ """
+ Return ordered unique tool names declared in the main plan path.
+ """
+ if plan is None or not hasattr(plan, "plan"):
+ return []
+
+ seen = set()
+ keys = []
+ for node in getattr(plan, "plan", []):
+ for step in getattr(node, "steps", []):
+ tool_name = getattr(step, "tool", None)
+ if tool_name and tool_name not in seen:
+ seen.add(tool_name)
+ keys.append(tool_name)
+ return keys
+
+ def _format_blackboard_subset(self, bb, keys: list[str]) -> str:
+ """
+ Format a subset of blackboard keys for debug logging in Textual console.
+ """
+ if not keys:
+ return "{}"
+
+ subset = {k: bb.get(k) for k in keys if k in bb}
+ if not subset:
+ return "{}"
+
+ # Keep output on a single line while avoiding Textual tag parsing issues.
+ return repr(subset).replace("<", "'").replace(">", "'")
+
def _apply_history_to_input(self) -> None:
"""
Function used to apply the current history index to the input box.
@@ -447,17 +528,20 @@ async def open_checklist(self, tools_list: list[str], active_tools_num: int) ->
self.logger.log_console(f"Deactivated tool '{tool}'")
@work
- async def open_radiolist(self, option_list: list[str], tool: str = "") -> str:
+ async def open_radiolist(
+ self, option_list: list[str], tool: str = "", category: str = "", input_string: str = ""
+ ) -> str:
"""
Function used to open a RadioList ModalScreen in the console.
Used in the tool suggestion selection, for default tools.
"""
# Create the checklist dialog
- selected = await self.push_screen_wait(RadioListModal(option_list))
+ selected = await self.push_screen_wait(RadioListModal(option_list, category, input_string))
if selected is None:
self.logger.log_tool("Suggestion cancelled", tool_name=tool)
self.suggestion_index = -2
+ self.suggestion_index_changed.set()
return
self.logger.log_tool(f'Selected suggestion: "{option_list[selected]}"', tool_name=tool)
@@ -481,10 +565,12 @@ def cmd_help(self, _) -> None:
" or show the current value if no 'int' is provided\n"
"/history 'int' - Change the history depth or show the current value if no"
" 'int' is provided\n"
+ "/debug 'bool' - Set debug mode to true/false, or show current value if no"
+ " bool is provided\n"
"/show_history - Show the current history\n"
"/clear_history - Clear the history\n"
"/plan - Show the last generated plan\n"
- "/rerun - Rerun the last plan\n"
+ "/rerun 'int' - Rerun the last plan or the specified plan by index\n"
"/bb - Show the last blackboard state\n"
"/clear - Clears the console screen\n"
"/exit - Exit the console\n"
@@ -496,7 +582,7 @@ def cmd_help(self, _) -> None:
"Available keybinds:\n"
"‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾\n"
"F2 - Show this help message\n"
- "F3 - Copy selection area\n"
+ "F4 - Copy selection area\n"
"Ctrl+Q - Exit the console\n"
"Ctrl+L - Clears the console screen\n"
"Ctrl+U - Clears the entire command line input\n"
@@ -558,6 +644,23 @@ def cmd_history_index(self, args) -> None:
# Update right panel info
self._update_variables_panel()
+ def cmd_debug(self, args) -> None:
+ if len(args) == 0:
+ self.logger.log_console(f"Current 'debug' is {self.debug_flag}")
+ return
+
+ if len(args) != 1:
+ self.logger.log_console(f"Usage: /debug 'bool' - Actual 'debug' is {self.debug_flag}")
+ return
+
+ value = args[0].strip().lower()
+ if value not in ("true", "false"):
+ self.logger.log_console(f"Usage: /debug 'bool' - Actual 'debug' is {self.debug_flag}")
+ return
+
+ self.debug_flag = value == "true"
+ self.logger.log_console(f"Set 'debug' to {self.debug_flag}")
+
def cmd_show_history(self, _) -> None:
if not self.manager.history:
self.logger.log_console("No history available.")
@@ -624,12 +727,15 @@ async def _rerun_worker(self, args) -> None:
else:
selected_plan = int(args[0])
if selected_plan < -1:
- self.logger.log_console("Usage: /rerun 'int' [int >= -1].")
+ self.logger.log_console("Usage: /rerun 'int' [int > -1].")
return
if not self.plans_list:
self.logger.log_console("No plan to rerun.")
return
+ elif selected_plan >= len(self.plans_list):
+ self.logger.log_console("Selected Plan index do not exists. selected_plan >= len(executed_plans).")
+ return
self.logger.log_console(f"Rerunning {selected_plan}-th plan...")
@@ -642,7 +748,8 @@ async def _rerun_worker(self, args) -> None:
# UI updates must happen on the app thread:
def apply_result():
self.last_bb = last_bb
- self.logger.log_console(f"Output of rerun: {last_bb_parsed}")
+ if self.debug_flag:
+ self.logger.log_console(f"Output of rerun: {last_bb_parsed}")
self.call_from_thread(apply_result)
@@ -657,7 +764,9 @@ def cmd_blackboard_state(self, _) -> None:
self.logger.log_console("No blackboard available.")
def cmd_clear(self, _) -> None:
- self.left_pannel.clear_console()
+ if self.stream_pannel is not None:
+ self.stream_pannel.clear_console()
+ self.main_pannel.clear_console()
def cmd_quit(self, _) -> None:
self.exit()
@@ -666,6 +775,56 @@ def cmd_quit(self, _) -> None:
# region Logging
+ def show_subprocess_panel(self) -> None:
+ """
+ Show the dedicated subprocess output panel at the top of the main panel.
+ """
+ if self.stream_pannel is None:
+ return
+
+ self.logger.log_tool("Streaming terminal opened. Press Ctrl+C to stop the running process.", color="tool")
+
+ self.stream_pannel.clear_console()
+ self.stream_pannel.display = True
+ self.stream_pannel.styles.height = 12
+ self.stream_pannel.refresh(layout=True)
+ self.refresh(layout=True)
+
+ def change_route_logs(self, value: bool = False) -> None:
+ """
+ Route logger sink output to stream panel.
+
+ value = True -> Stream panel
+ value = False -> Main panel
+ """
+
+ self._route_logs_to_stream_panel = value
+
+ def hide_subprocess_panel(self) -> None:
+ """
+ Hide the subprocess output panel and return space to the main log panel.
+ """
+ if self.stream_pannel is None:
+ return
+
+ self.stream_pannel.display = False
+ self.stream_pannel.styles.height = 0
+ self.stream_pannel.refresh(layout=True)
+ self.refresh(layout=True)
+
+ def add_subprocess_line(self, input: str) -> None:
+ """
+ Write output into the dedicated subprocess panel.
+ """
+ if self.stream_pannel is None:
+ self.add_line(input)
+ return
+
+ lines = input.splitlines()
+ for line in lines:
+ if not self.stream_pannel.append_line(line):
+ self.logger.log_console("Warning: Trying to add an empty subprocess line.")
+
def add_line(self, input: str, color: str = "", subprocess_flag: bool = False) -> None:
"""
Function used to write an input in the VulcanAI terminal.
@@ -679,20 +838,24 @@ def add_line(self, input: str, color: str = "", subprocess_flag: bool = False) -
color_begin = f"<{color}>"
color_end = f"{color}>"
+ target_panel = self.main_pannel
+ if self._route_logs_to_stream_panel and self.stream_pannel is not None and self.stream_pannel.display:
+ target_panel = self.stream_pannel
+
# Append each line; deque automatically truncates old ones
for line in lines:
line_processed = line
if subprocess_flag:
line_processed = escape(line)
text = f"{color_begin}{line_processed}{color_end}"
- if not self.left_pannel.append_line(text):
+ if not target_panel.append_line(text):
self.logger.log_console("Warning: Trying to add an empty line.")
def delete_last_line(self):
"""
Function used to remove the last line in the VulcanAI terminal.
"""
- self.left_pannel.delete_last_row()
+ self.main_pannel.delete_last_row()
# endregion
@@ -961,7 +1124,7 @@ async def on_key(self, event: events.Key) -> None:
def set_stream_task(self, input_stream):
"""
Function used in the tools to set the current streaming task.
- with this variable the user can finish the execution of the
+ With this variable the user can finish the execution of the
task by using the signal "Ctrl + C"
"""
self.stream_task = input_stream
@@ -1023,6 +1186,15 @@ def action_stop_streaming_task(self) -> None:
# Cancel the streaming task
self.stream_task.cancel() # Triggers CancelledError in the task
self.stream_task = None
+ # Close popup terminal explicitly on user Ctrl+C.
+ self.change_route_logs(False)
+ self.hide_subprocess_panel()
+
+ elif self.stream_pannel is not None and self.stream_pannel.display:
+ # No active stream, but popup is still visible (e.g. stopped by limits).
+ # Let Ctrl+C close it explicitly.
+ self.change_route_logs(False)
+ self.hide_subprocess_panel()
else:
# No streaming task running, just notify the user
@@ -1088,7 +1260,7 @@ def init_manager(self) -> None:
self.logger.log_console(f"Initializing Manager '{ConsoleManager.__name__}'...")
- self.manager = ConsoleManager(model=self.model, k=self.k, logger=self.logger)
+ self.manager = ConsoleManager(model=self.model, k=self.k, logger=self.logger, default_tools=self.default_tools)
self.logger.log_console(f"Manager initialized with model '{self.model.replace('ollama-', '')}'")
# Update right panel info
@@ -1133,6 +1305,7 @@ def main() -> None:
parser.add_argument(
"-i", "--iterative", action="store_true", default=False, help="Enable Iterative Manager (default: off)"
)
+ parser.add_argument("--debug", action="store_true", default=False, help="Enable debug logs (default: off)")
args = parser.parse_args()
@@ -1142,6 +1315,7 @@ def main() -> None:
model=args.model,
k=args.k,
iterative=args.iterative,
+ debug=args.debug,
)
console.run_console()
diff --git a/src/vulcanai/console/logger.py b/src/vulcanai/console/logger.py
index a6b6f88..d3ce661 100644
--- a/src/vulcanai/console/logger.py
+++ b/src/vulcanai/console/logger.py
@@ -45,8 +45,8 @@ class VulcanAILogger:
"executor": "#15B606",
"vulcanai": "#56AA08",
"user": "#91DD16",
- "validator": "#C49C00",
- "tool": "#EB921E",
+ "validator": "#9600C4",
+ "tool": "#C49C00",
"error": "#FF0000",
"console": "#8F6296",
"warning": "#D8C412",
@@ -107,6 +107,20 @@ def parse_rich_markup(self, msg: str) -> str:
msg = re.sub(r"<(/?)(#[0-9a-fA-F]{6})>", r"[\1\2]", msg)
return msg
+ def _apply_color_to_each_line(self, msg: str, color_begin: str, color_end: str) -> str:
+ """Wrap each logical line with color tags, preserving original line breaks."""
+ if msg == "":
+ return f"{color_begin}{msg}{color_end}"
+
+ colored_lines = []
+ for line in msg.splitlines(keepends=True):
+ # Keep newlines outside style tags so each line has a complete ... pair.
+ line_content = line.rstrip("\r\n")
+ line_break = line[len(line_content) :]
+ colored_lines.append(f"{color_begin}{line_content}{color_end}{line_break}")
+
+ return "".join(colored_lines)
+
def process_msg(self, msg: str, prefix: str = "", color: str = "") -> str:
"""Process the message by adding prefix, applying color formatting and rich markup if enabled."""
color_begin = color_end = color
@@ -116,7 +130,9 @@ def process_msg(self, msg: str, prefix: str = "", color: str = "") -> str:
color_begin = f"<{color}>"
color_end = f"{color}>"
- msg = f"{prefix}{color_begin}{msg}{color_end}"
+ msg = self._apply_color_to_each_line(msg, color_begin, color_end)
+
+ msg = f"{prefix}{msg}"
return self.parse_rich_markup((self.parse_color(msg)))
@@ -158,6 +174,8 @@ def log_tool(self, msg: str, tool_name: str = "", error: bool = False, color: st
processed_msg = self.process_msg(msg, prefix=prefix, color=color)
self.sink.write(processed_msg)
+ return processed_msg
+
def log_registry(self, msg: str, error: bool = False, color: str = ""):
if error:
prefix = "[error][REGISTRY] [ERROR][/error] "
diff --git a/src/vulcanai/console/modal_screens.py b/src/vulcanai/console/modal_screens.py
index 829ad13..81bcb3b 100644
--- a/src/vulcanai/console/modal_screens.py
+++ b/src/vulcanai/console/modal_screens.py
@@ -15,6 +15,7 @@
from textual import events
from textual.app import ComposeResult
from textual.containers import Container, Horizontal, Vertical, VerticalScroll
+from textual.content import Content
from textual.screen import ModalScreen
from textual.widgets import Button, Checkbox, Input, Label, RadioButton, RadioSet
@@ -170,9 +171,16 @@ class CheckListModal(ModalScreen[list[str] | None]):
}
.btns {
- height: 3; /* give buttons row a fixed height */
- padding-top: 1;
- content-align: right middle;
+ height: auto;
+ width: 100%;
+ margin-top: 1;
+ padding: 0;
+ content-align: center middle;
+ align-horizontal: center;
+ }
+
+ .btns Button {
+ padding: 0 3;
}
"""
@@ -209,6 +217,18 @@ def on_mount(self) -> None:
class RadioListModal(ModalScreen[str | None]):
+ class SquareRadioButton(RadioButton):
+ # BUTTON_INNER = '●'
+ @property
+ def _button(self) -> Content:
+ button_style = self.get_visual_style("toggle--button")
+ symbol = "☒" if self.value else "☐"
+ return Content.assemble(
+ (" ", button_style),
+ (symbol, button_style),
+ (" ", button_style),
+ )
+
CSS = """
RadioListModal {
align: center middle;
@@ -218,7 +238,6 @@ class RadioListModal(ModalScreen[str | None]):
width: 60%;
max-width: 90%;
height: 40%;
- border: round $accent;
padding: 1 2;
background: $panel;
}
@@ -233,26 +252,36 @@ class RadioListModal(ModalScreen[str | None]):
}
.btns {
- height: 3;
- padding-top: 1;
- content-align: right middle;
+ height: auto;
+ width: 100%;
+ margin-top: 1;
+ padding: 0;
+ content-align: center middle;
+ align-horizontal: center;
+ }
+
+ .btns Button {
+ padding: 0 1;
}
"""
- def __init__(self, lines: list[str], default_index: int = 0) -> None:
+ def __init__(self, lines: list[str], category: str = "", input_string: str = "", default_index: int = 0) -> None:
super().__init__()
self.lines = lines
+ self.category = category
+ self.input_string = input_string
self.default_index = default_index
def compose(self) -> ComposeResult:
+ dialog_msg = f"{self.category} '{self.input_string}' does not exist. Choose a suggestion:"
with Vertical(classes="dialog"):
- yield Label("Pick one option", classes="title")
+ yield Label(dialog_msg, classes="title")
# One-select radio list
with VerticalScroll(classes="radio-list"):
with RadioSet(id="radio-set"):
for i, line in enumerate(self.lines):
- yield RadioButton(line, id=f"rb{i}", value=(i == self.default_index))
+ yield self.SquareRadioButton(line, id=f"rb{i}", value=(i == self.default_index))
# Buttons
with Horizontal(classes="btns"):
@@ -260,7 +289,7 @@ def compose(self) -> ComposeResult:
yield Button("Submit", variant="primary", id="submit")
def on_mount(self) -> None:
- first_rb = self.query_one(RadioButton)
+ first_rb = self.query_one(self.SquareRadioButton)
self.set_focus(first_rb)
def on_button_pressed(self, event: Button.Pressed) -> None:
diff --git a/src/vulcanai/console/utils.py b/src/vulcanai/console/utils.py
index a6e72b7..2e1da8b 100644
--- a/src/vulcanai/console/utils.py
+++ b/src/vulcanai/console/utils.py
@@ -1,4 +1,4 @@
-# Copyright 2025 Proyectos y Sistemas de Mantenimiento SL (eProsima).
+# Copyright 2026 Proyectos y Sistemas de Mantenimiento SL (eProsima).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -12,16 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-
-import asyncio
-import difflib
-import heapq
-import subprocess
import sys
import threading
-import time
-
-from textual.markup import escape # To remove potential errors in textual terminal
class StreamToTextual:
@@ -57,10 +49,18 @@ def __init__(self, spinner_status):
self.spinner_status = spinner_status
def on_request_start(self, text="Querying LLM..."):
- self.spinner_status.start(text)
+ app = getattr(self.spinner_status, "app", None)
+ if app is not None and threading.current_thread() is not threading.main_thread():
+ app.call_from_thread(self.spinner_status.start, text)
+ else:
+ self.spinner_status.start(text)
def on_request_end(self):
- self.spinner_status.stop()
+ app = getattr(self.spinner_status, "app", None)
+ if app is not None and threading.current_thread() is not threading.main_thread():
+ app.call_from_thread(self.spinner_status.stop)
+ else:
+ self.spinner_status.stop()
def attach_ros_logger_to_console(console):
@@ -148,204 +148,3 @@ def common_prefix(strings: str) -> str:
common_prefix = tmp
return common_prefix, commands
-
-
-async def run_streaming_cmd_async(
- console, args: list[str], max_duration: float = 60, max_lines: int = 1000, echo: bool = True, tool_name=""
-) -> str:
- # Unpack the command
- cmd, *cmd_args = args
-
- # Create the subprocess
- process = await asyncio.create_subprocess_exec(
- cmd,
- *cmd_args,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.STDOUT,
- )
-
- assert process.stdout is not None
-
- start_time = time.monotonic()
- line_count = 0
-
- try:
- # Subprocess main loop. Read line by line
- async for raw_line in process.stdout:
- line = raw_line.decode(errors="ignore").rstrip("\n")
-
- # Print the line
- if echo:
- line_processed = escape(line)
- console.add_line(line_processed)
-
- # Count the line
- line_count += 1
- if max_lines is not None and line_count >= max_lines:
- console.logger.log_tool(f"[tool]Stopping:[/tool] Reached max_lines = {max_lines}", tool_name=tool_name)
- console.set_stream_task(None)
- process.terminate()
- break
-
- # Check duration
- if max_duration and (time.monotonic() - start_time) >= max_duration:
- console.logger.log_tool(
- f"[tool]Stopping:[/tool] Exceeded max_duration = {max_duration}s", tool_name=tool_name
- )
- console.set_stream_task(None)
- process.terminate()
- break
-
- except asyncio.CancelledError:
- # Task was cancelled → stop the subprocess
- console.logger.log_tool("[tool]Cancellation received:[/tool] terminating subprocess...", tool_name=tool_name)
- process.terminate()
- raise
- # Not necessary, textual terminal get the keyboard input
- except KeyboardInterrupt:
- # Ctrl+C pressed → stop subprocess
- console.logger.log_tool("[tool]Ctrl+C received:[/tool] terminating subprocess...", tool_name=tool_name)
- process.terminate()
-
- finally:
- try:
- await asyncio.wait_for(process.wait(), timeout=3.0)
- except asyncio.TimeoutError:
- console.logger.log_tool("Subprocess didn't exit in time → killing it.", tool_name=tool_name, error=True)
- process.kill()
- await process.wait()
-
- return "Process stopped due to Ctrl+C"
-
-
-def execute_subprocess(console, tool_name, base_args, max_duration, max_lines):
- stream_task = None
-
- def _launcher() -> None:
- nonlocal stream_task
- # This always runs in the Textual event-loop thread
- loop = asyncio.get_running_loop()
- stream_task = loop.create_task(
- run_streaming_cmd_async(
- console,
- base_args,
- max_duration=max_duration,
- max_lines=max_lines,
- tool_name=tool_name, # tool_header_str
- )
- )
-
- def _on_done(task: asyncio.Task) -> None:
- if task.cancelled():
- # Normal path → don't log as an error
- # If you want a message, call UI methods directly here,
- # not via console.write (see Fix 2)
- return
-
- try:
- task.result()
- except Exception as e:
- console.logger.log_msg(f"Echo task error: {e!r}\n", error=True)
- # result["output"] = False
- return
-
- stream_task.add_done_callback(_on_done)
-
- try:
- # Are we already in the Textual event loop thread?
- asyncio.get_running_loop()
- except RuntimeError:
- # No loop here → probably ROS thread. Bounce into Textual thread.
- console.app.call_from_thread(_launcher)
- else:
- # We *are* in the loop → just launch directly.
- _launcher()
-
- # Store the task in the console to be able to cancel it later
- console.set_stream_task(stream_task)
- console.logger.log_tool("[tool]Subprocess created![tool]", tool_name=tool_name)
-
-
-def run_oneshot_cmd(args: list[str]) -> str:
- try:
- return subprocess.check_output(args, stderr=subprocess.STDOUT, text=True)
-
- except subprocess.CalledProcessError as e:
- raise Exception(f"Failed to run '{' '.join(args)}': {e.output}")
-
-
-def suggest_string(console, tool_name, string_name, input_string, real_string_list):
- ret = None
-
- def _similarity(a: str, b: str) -> float:
- """Return a similarity score between 0 and 1."""
- return difflib.SequenceMatcher(None, a, b).ratio()
-
- def _get_suggestions(real_string_list_comp: list[str], string_comp: str) -> tuple[str, list[str]]:
- """
- Function used to search for the most "similar" string in a list.
-
- Used in ROS2 cli commands to used the "simmilar" in case
- the queried topic does not exists.
-
- Example:
- real_string_list_comp = [
- "/parameter_events",
- "/rosout",
- "/turtle1/cmd_vel",
- "/turtle1/color_sensor",
- "/turtle1/pose",
- ]
- string_comp = "trtle1"
-
- @return
- str: the most "similar" string
- list[str] a sorted list by a similitud value
- """
-
- topic_list_pq = []
- n = len(string_comp)
-
- for string in real_string_list_comp:
- m = len(string)
- # Calculate the similitud
- score = _similarity(string_comp, string)
- # Give more value for the nearest size comparations.
- score -= abs(n - m) * 0.005
- # Max-heap (via negative score)
- heapq.heappush(topic_list_pq, (-score, string))
-
- # Pop ordered list
- ret_list: list[str] = []
- _, most_topic_similar = heapq.heappop(topic_list_pq)
-
- ret_list.append(most_topic_similar)
-
- while topic_list_pq:
- _, topic = heapq.heappop(topic_list_pq)
- ret_list.append(topic)
-
- return most_topic_similar, ret_list
-
- if input_string not in real_string_list:
- console.logger.log_tool(f'{string_name}: "{input_string}" does not exists', tool_name=tool_name)
-
- # Get the suggestions list sorted by similitud value
- _, topic_sim_list = _get_suggestions(real_string_list, input_string)
-
- # Open the ModalScreen
- console.open_radiolist(topic_sim_list, f"{tool_name}")
-
- # Wait for the user to select and item in the
- # RadioList ModalScreen
- console.suggestion_index_changed.wait()
-
- # Check if the user cancelled the suggestion
- if console.suggestion_index >= 0:
- ret = topic_sim_list[console.suggestion_index]
-
- # Reset suggestion index
- console.suggestion_index = -1
- console.suggestion_index_changed.clear()
-
- return ret
diff --git a/src/vulcanai/console/widget_custom_log_text_area.py b/src/vulcanai/console/widget_custom_log_text_area.py
index dd60f4c..4168a12 100644
--- a/src/vulcanai/console/widget_custom_log_text_area.py
+++ b/src/vulcanai/console/widget_custom_log_text_area.py
@@ -21,6 +21,8 @@
from rich.style import Style
from textual.widgets import TextArea
+from vulcanai.console.logger import VulcanAILogger
+
class CustomLogTextArea(TextArea):
"""
@@ -33,7 +35,7 @@ class CustomLogTextArea(TextArea):
"""
BINDINGS = [
- ("f3", "copy_selection", "Copy selection"),
+ ("f4", "copy_selection", "Copy selection"),
]
# Maximum number of lines to keep in the log
@@ -45,7 +47,10 @@ class CustomLogTextArea(TextArea):
TAG_TOKEN_RE = re.compile(r"?[^>]+>")
def __init__(self, **kwargs):
- super().__init__(read_only=True, **kwargs)
+ # Disable the TextArea cursor in this read-only log panel.
+ # This prevents Textual from auto-scrolling to keep cursor/selection visible
+ # every time new text is inserted.
+ super().__init__(read_only=True, show_cursor=False, **kwargs)
# Lock used to avoid data races in 'self._lines_styles'
# when VulcanAI and ROS threads writes at the same time
@@ -68,6 +73,17 @@ def __init__(self, **kwargs):
# region UTILS
+ def is_near_vertical_scroll_end(self, tolerance: int = 1) -> bool:
+ """
+ Return True if the viewport is at, or very close to, the vertical end.
+
+ A small tolerance avoids false negatives after layout changes where
+ scroll position can be off by one line.
+ """
+ if not self.size:
+ return True
+ return (self.max_scroll_y - self.scroll_offset.y) <= max(0, tolerance)
+
def _trim_highlights(self) -> None:
"""
Function used to trim the CustomLogTextArea to the maximum number of lines.
@@ -198,7 +214,7 @@ def _ensure_style_token(self, style: str) -> str:
# Gray color is not supported
if st == "gray":
- color = "#8D8D8D"
+ color = "#A5A2A2"
token_parts.append("hex_" + color[1:])
else:
# Assume named color like "red", "yellow"
@@ -279,6 +295,10 @@ def append_line(self, text: str) -> bool:
# [EXECUTOR] Invoking 'move_turtle' with args: ...
# [ROS] [INFO] Publishing message 1 to ...
with self._lock:
+ # Terminal-like behavior:
+ # keep following output only if the user was already at the bottom.
+ should_follow_output = self.is_near_vertical_scroll_end()
+
# Append via document API to keep row tracking consistent
# Only add a newline before the new line if there is already content
insert_text = ("\n" if self.document.text else "") + plain
@@ -301,8 +321,11 @@ def append_line(self, text: str) -> bool:
# Trim now
self._trim_highlights()
- # Scroll to end
- self.scroll_end(animate=False)
+ # Scroll to end only when the user was already at the bottom.
+ if should_follow_output:
+ self.scroll_end(animate=False, immediate=True, x_axis=False)
+ # Ensure we stay anchored after any pending layout updates.
+ self.call_after_refresh(self.scroll_end, animate=False, immediate=True, x_axis=False)
# Rebuild highlights and refresh
self._rebuild_highlights()
@@ -374,6 +397,12 @@ def action_copy_selection(self) -> None:
self.notify("No text selected to copy!")
return
- # Copy to clipboard, using pyperclip library
- pyperclip.copy(self.selected_text)
- self.notify("Selected area copied to clipboard!")
+ try:
+ # Copy to clipboard, using pyperclip library
+ pyperclip.copy(self.selected_text)
+ self.notify("Selected area copied to clipboard!")
+ except Exception as e:
+ error_color = VulcanAILogger.vulcanai_theme["error"]
+ self.append_line(f"<{error_color}>Clipboard error: {e}{error_color}>")
+ self.notify(f"Clipboard error: {e}")
+ return
diff --git a/src/vulcanai/console/widget_spinner.py b/src/vulcanai/console/widget_spinner.py
index a2019ed..b8c7a40 100644
--- a/src/vulcanai/console/widget_spinner.py
+++ b/src/vulcanai/console/widget_spinner.py
@@ -46,9 +46,14 @@ def _log_is_filling_space(self) -> bool:
"""
visible = max(1, self.logcontent.size.height)
lines = getattr(self.logcontent.document, "line_count", 0)
- return lines > visible
+ return lines >= visible
def start(self, text: str = "Querying LLM...") -> None:
+ # Keep the log anchored at bottom if the user was already following output.
+ if hasattr(self.logcontent, "is_near_vertical_scroll_end"):
+ was_at_bottom = self.logcontent.is_near_vertical_scroll_end()
+ else:
+ was_at_bottom = self.logcontent.is_vertical_scroll_end
self._spinner.text = Text(text, style="#0d87c0")
self.display = True
self.styles.height = 1
@@ -59,10 +64,19 @@ def start(self, text: str = "Querying LLM...") -> None:
else:
self._forced_compact = False
self.refresh(layout=True)
+ if was_at_bottom:
+ self.logcontent.scroll_end(animate=False, immediate=True, x_axis=False)
+ # Re-anchor after layout has settled.
+ self.call_after_refresh(self.logcontent.scroll_end, animate=False, immediate=True, x_axis=False)
self._timer.resume()
def stop(self) -> None:
+ # Keep the log anchored at bottom if the user was already following output.
+ if hasattr(self.logcontent, "is_near_vertical_scroll_end"):
+ was_at_bottom = self.logcontent.is_near_vertical_scroll_end()
+ else:
+ was_at_bottom = self.logcontent.is_vertical_scroll_end
self._timer.pause()
self.display = False
self.styles.height = 0
@@ -71,5 +85,9 @@ def stop(self) -> None:
self._forced_compact = False
self.refresh(layout=True)
self.logcontent.refresh(layout=True)
+ if was_at_bottom:
+ self.logcontent.scroll_end(animate=False, immediate=True, x_axis=False)
+ # Re-anchor after layout has settled.
+ self.call_after_refresh(self.logcontent.scroll_end, animate=False, immediate=True, x_axis=False)
self.update("")
diff --git a/src/vulcanai/core/executor.py b/src/vulcanai/core/executor.py
index 9b3010b..61d13f7 100644
--- a/src/vulcanai/core/executor.py
+++ b/src/vulcanai/core/executor.py
@@ -277,9 +277,9 @@ def _call_tool(
msg += "'{"
for key, value in arg_dict.items():
if first:
- msg += f"[validator]'{key}'[/validator]: " + f"[registry]'{value}'[/registry]"
+ msg += f"[tool]'{key}'[/tool]: " + f"[registry]'{value}'[/registry]"
else:
- msg += f", [validator]'{key}'[/validator]: " + f"[registry]'{value}'[/registry]"
+ msg += f", [tool]'{key}'[/tool]: " + f"[registry]'{value}'[/registry]"
first = False
msg += "}'"
self.logger.log_executor(msg)
@@ -315,15 +315,6 @@ def _call_tool(
+ "with result:"
)
- if isinstance(result, dict):
- for key, value in result.items():
- if key == "ros2":
- continue
- self.logger.log_msg(f"{key}")
- self.logger.log_msg(value)
- else:
- self.logger.log_msg(result)
-
return True, result
except concurrent.futures.TimeoutError:
self.logger.log_executor(
diff --git a/src/vulcanai/core/manager.py b/src/vulcanai/core/manager.py
index 66936d3..43988e3 100644
--- a/src/vulcanai/core/manager.py
+++ b/src/vulcanai/core/manager.py
@@ -33,12 +33,13 @@ def __init__(
k: int = 10,
hist_depth: int = 3,
logger: Optional[VulcanAILogger] = None,
+ default_tools: bool = True,
):
# Logger default to a stdout logger if none is provided (StdoutLogSink)
self.logger = logger or VulcanAILogger.default()
self.llm = Agent(model, logger=self.logger)
self.k = k
- self.registry = registry or ToolRegistry(logger=self.logger)
+ self.registry = registry or ToolRegistry(logger=self.logger, default_tools=default_tools)
self.validator = validator
self.executor = PlanExecutor(self.registry, logger=self.logger)
self.bb = Blackboard()
diff --git a/src/vulcanai/core/manager_iterator.py b/src/vulcanai/core/manager_iterator.py
index 8414792..e940b6d 100644
--- a/src/vulcanai/core/manager_iterator.py
+++ b/src/vulcanai/core/manager_iterator.py
@@ -43,8 +43,9 @@ def __init__(
logger=None,
max_iters: int = 5,
step_timeout_ms: Optional[int] = None,
+ default_tools: bool = True,
):
- super().__init__(model, registry, validator, k, max(3, hist_depth), logger)
+ super().__init__(model, registry, validator, k, max(3, hist_depth), logger, default_tools)
self.iter: int = 0
self.max_iters: int = int(max_iters)
diff --git a/src/vulcanai/core/manager_plan.py b/src/vulcanai/core/manager_plan.py
index 78b3cbb..32bfbdc 100644
--- a/src/vulcanai/core/manager_plan.py
+++ b/src/vulcanai/core/manager_plan.py
@@ -30,8 +30,17 @@ def __init__(
k: int = 5,
hist_depth: int = 3,
logger=None,
+ default_tools=True,
):
- super().__init__(model, registry=registry, validator=validator, k=k, hist_depth=hist_depth, logger=logger)
+ super().__init__(
+ model,
+ registry=registry,
+ validator=validator,
+ k=k,
+ hist_depth=hist_depth,
+ logger=logger,
+ default_tools=default_tools,
+ )
def _get_prompt_template(self) -> str:
"""
diff --git a/src/vulcanai/core/plan_types.py b/src/vulcanai/core/plan_types.py
index bdf915f..72d4fd7 100644
--- a/src/vulcanai/core/plan_types.py
+++ b/src/vulcanai/core/plan_types.py
@@ -89,7 +89,7 @@ def __str__(self) -> str:
lines.append(f"- Plan Summary: {self.summary}\n")
color_tool = VulcanAILogger.vulcanai_theme["executor"]
- color_variable = VulcanAILogger.vulcanai_theme["validator"]
+ color_variable = VulcanAILogger.vulcanai_theme["tool"]
color_value = VulcanAILogger.vulcanai_theme["registry"]
color_error = VulcanAILogger.vulcanai_theme["error"]
diff --git a/src/vulcanai/tools/default_tools.py b/src/vulcanai/tools/default_tools.py
new file mode 100644
index 0000000..5274a69
--- /dev/null
+++ b/src/vulcanai/tools/default_tools.py
@@ -0,0 +1,1151 @@
+# Copyright 2026 Proyectos y Sistemas de Mantenimiento SL (eProsima).
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+"""
+This file contains the default tools given by VulcanAI.
+It contains atomic tools used to call ROS2 CLI.
+"""
+
+import importlib
+import json
+import threading
+import time
+from concurrent.futures import Future
+
+from vulcanai import AtomicTool, vulcanai_tool
+from vulcanai.tools.utils import execute_subprocess, run_oneshot_cmd, suggest_string, print_tool_output, log_tool_in_stream_and_main
+
+# ROS2 imports
+try:
+ import rclpy
+ from rclpy.node import Node
+ from rclpy.task import Future
+except ImportError:
+ raise ImportError("Unable to load default tools because no ROS 2 installation was found.")
+
+
+# This class contains a ROS 2 node that will be loaded if none is provided to launch ROS 2 default tools
+class ROS2DefaultToolNode(Node):
+ def __init__(self, name: str = "vulcanai_ros2_default_tools_node"):
+ if not rclpy.ok():
+ rclpy.init()
+ super().__init__(name)
+ # Dictionary to store created clients
+ self._vulcan_clients = {}
+ # Dictionary to store created publishers
+ self._vulcan_publishers = {}
+
+ # Ensure entities creation is thread-safe.
+ self.node_lock = threading.Lock()
+
+ def get_client(self, srv_type, srv_name):
+ """
+ Get a cached client for the specified service type and name or
+ create a new one if it doesn't exist.
+ """
+ key = (srv_type, srv_name)
+ with self.node_lock:
+ if key not in self._vulcan_clients:
+ client = self.create_client(srv_type, srv_name)
+ self._vulcan_clients[key] = client
+ self.get_logger().info(f"Created new client for {srv_name}")
+ return self._vulcan_clients[key]
+
+ def get_publisher(self, msg_type, topic_name):
+ """
+ Get a cached publisher for the specified message type and topic name or
+ create a new one if it doesn't exist.
+ """
+ key = (msg_type, topic_name)
+ with self.node_lock:
+ if key not in self._vulcan_publishers:
+ publisher = self.create_publisher(msg_type, topic_name, 10)
+ self._vulcan_publishers[key] = publisher
+ self.get_logger().info(f"Created new publisher for {topic_name}")
+ return self._vulcan_publishers[key]
+
+ def wait_for_message(self, msg_type, topic: str, timeout_sec: float = None):
+ """
+ Block until a message is received or timeout expires.
+ Subscriptions are created on demand and destroyed after use to avoid
+ handling spins and callbacks in a separate thread.
+ """
+ future = Future()
+
+ def callback(msg):
+ if not future.done():
+ future.set_result(msg)
+
+ sub = self.create_subscription(msg_type, topic, callback, 10)
+
+ rclpy.spin_until_future_complete(self, future, timeout_sec=timeout_sec)
+ self.destroy_subscription(sub)
+
+ if future.done():
+ return future.result()
+ return None
+
+
+"""
+Available ROS 2 CLI commands that can be run with the tools in this file:
+
+- ros2 node
+ Commands:
+ info Output information about a node
+ list Output a list of available nodes
+
+- ros2 topic
+ Commands:
+ bw Display bandwidth used by topic
+ delay Display delay of topic from timestamp in header
+ echo Output messages from a topic
+ find Output a list of available topics of a given type
+ hz Print the average receiving rate to screen
+ info Print information about a topic
+ list Output a list of available topics
+ pub Publish a message to a topic
+ type Print a topic's type
+
+- ros2 service
+ Commands:
+ call Call a service
+ echo Echo a service
+ find Output a list of available services of a given type
+ info Print information about a service
+ list Output a list of available services
+ type Output a service's type
+
+- ros2 action
+ Commands:
+ info Print information about an action
+ list Output a list of action names
+ send_goal Send an action goal
+ type Print a action's type
+ echo Echo a action
+ find Find actions from type
+
+
+- ros2 param
+ Commands:
+ delete Delete parameter
+ describe Show descriptive information about declared parameters
+ dump Show all of the parameters of a node in a YAML file format
+ get Get parameter
+ list Output a list of available parameters
+ load Load parameter file for a node
+ set Set parameter
+
+
+- ros2 pkg
+ Commands:
+ executables Output a list of package specific executables
+ list Output a list of available packages
+ prefix Output the prefix path of a package
+ xml Output the XML of the package manifest or a specific ta
+
+
+- ros2 interfaces
+ Commands:
+ list List all interface types available
+ package Output a list of available interface types within one package
+ packages Output a list of packages that provide interfaces
+"""
+
+
+@vulcanai_tool
+class Ros2NodeTool(AtomicTool):
+ name = "ros2_node"
+ description = (
+ "Wrapper for `ros2 node` CLI."
+ "Run any subcommand: 'list', 'info'"
+ "With an optional argument 'node_name' for 'info' subcommand."
+ )
+ tags = ["ros2", "nodes", "cli", "info", "diagnostics"]
+
+ # - `command` lets you pick a single subcommand (list/info).
+ input_schema = [
+ ("command", "string"), # Command
+ ("topic_name", "string?"), # (optional) Topic name. (info/bw/delay/hz/type/pub)
+ ]
+
+ output_schema = {
+ "output": "string", # list of ros2 nodes or info of a node.
+ }
+
+ def run(self, **kwargs):
+ # Used in the suggestion string
+ console = self.bb.get("console", None)
+ if console is None:
+ raise Exception("Could not find console, aborting...")
+
+ command = kwargs.get("command", None) # optional explicit subcommand
+ node_name = kwargs.get("node_name", "")
+
+ result = {
+ "output": "",
+ }
+
+ command = command.lower()
+
+ # -- Node name suggestions --
+ node_name_list_str = run_oneshot_cmd(["ros2", "node", "list"])
+ node_name_list = node_name_list_str.splitlines()
+
+ # -- Run `ros2 node list` ---------------------------------------------
+ if command == "list":
+ result["output"] = node_name_list_str
+
+ # -- Run `ros2 node info ` --------------------------------------
+ else:
+ # Check if the topic is not available ros2 topic list
+ # if it is not create a window for the user to choose a correct topic name
+ suggested_topic = suggest_string(console, self.name, "Node", node_name, node_name_list)
+ if suggested_topic is not None:
+ node_name = suggested_topic
+
+ if not node_name:
+ raise ValueError("`command='{}'` requires `node_name`.".format("info"))
+
+ info_output = run_oneshot_cmd(["ros2", "node", "info", node_name])
+ result["output"] = info_output
+
+ print_tool_output(console, result["output"], self.name)
+ return result
+
+
+@vulcanai_tool
+class Ros2TopicTool(AtomicTool):
+ name = "ros2_topic"
+ description = (
+ "Wrapper for `ros2 topic` CLI."
+ "Run a subcommand from: 'list', 'info', 'find', 'type', 'bw', 'delay', 'hz'."
+ "With optional arguments like 'topic_name', 'message_type', 'max_duration' or 'max_lines'"
+ )
+ tags = ["ros2", "topics", "cli", "info"]
+
+ # - `command` lets you pick a single subcommand (bw/hz/delay/find/pub/type).
+ input_schema = [
+ ("command", "string"), # Command
+ ("topic_name", "string?"), # (optional) Topic name. (info/bw/delay/hz/type/pub)
+ ("msg_type", "string?"), # (optional) Message type (`find` , `pub` )
+ ("max_duration", "number?"), # (optional) Seconds for streaming commands (bw/hz/delay)
+ ("max_lines", "int?"), # (optional) Cap number of lines for streaming commands
+ ]
+
+ output_schema = {
+ "output": "string", # output
+ }
+
+ def run(self, **kwargs):
+ # Used in the suggestion string
+ console = self.bb.get("console", None)
+ if console is None:
+ raise Exception("Could not find console, aborting...")
+
+ command = kwargs.get("command", None) # optional explicit subcommand
+ topic_name = kwargs.get("topic_name", None)
+ msg_type = kwargs.get("msg_type", None)
+ # Streaming commands variables
+ max_duration = kwargs.get("max_duration", None)
+ max_lines = kwargs.get("max_lines", None)
+
+ result = {
+ "output": "",
+ }
+
+ command = command.lower()
+
+ topic_name_list_str = run_oneshot_cmd(["ros2", "topic", "list"])
+ topic_name_list = topic_name_list_str.splitlines()
+
+ # -- Topic name suggestions --
+ if command == "find":
+ # TODO?
+ """suggested_type = suggest_string(console, self.name, "Topic", msg_type, topic_name_list)
+ if suggested_type is not None:
+ msg_type = suggested_type"""
+ elif command != "list":
+ # Check if the topic is not available ros2 topic list
+ # if it is not create a window for the user to choose a correct topic name
+ suggested_topic_name = suggest_string(console, self.name, "Topic", topic_name, topic_name_list)
+ if suggested_topic_name is not None:
+ topic_name = suggested_topic_name
+
+ # Check if the topic_name is null (suggest_string() failed)
+ if not topic_name:
+ raise ValueError("`command='{}'` requires `topic_name`.".format(command))
+
+ # -- Commands --
+ # -- ros2 topic list --------------------------------------------------
+ if command == "list":
+ result["output"] = topic_name_list_str
+
+ # -- ros2 topic info -------------------------------------
+ elif command == "info":
+ info_output = run_oneshot_cmd(["ros2", "topic", "info", topic_name])
+ result["output"] = info_output
+
+ # -- ros2 topic find -------------------------------------------
+ elif command == "find":
+ find_output = run_oneshot_cmd(["ros2", "topic", "find", msg_type])
+ find_topics = [line.strip() for line in find_output.splitlines() if line.strip()]
+ result["output"] = ", ".join(find_topics)
+
+ # -- ros2 topic type -------------------------------------
+ elif command == "type":
+ type_output = run_oneshot_cmd(["ros2", "topic", "type", topic_name])
+ result["output"] = type_output
+
+ # -- ros2 topic bw ---------------------------------------
+ elif command == "bw":
+ base_args = ["ros2", "topic", "bw", topic_name]
+ ret = execute_subprocess(console, self.name, base_args, max_duration, max_lines)
+ result["output"] = ret#last_output_lines(console, self.name, ret, n_lines=10)
+
+ # -- ros2 topic delay ------------------------------------
+ elif command == "delay":
+ base_args = ["ros2", "topic", "delay", topic_name]
+ ret = execute_subprocess(console, self.name, base_args, max_duration, max_lines)
+ result["output"] = ret#last_output_lines(console, self.name, ret, n_lines=10)
+
+ # -- ros2 topic hz ---------------------------------------
+ elif command == "hz":
+ base_args = ["ros2", "topic", "hz", topic_name]
+ ret = execute_subprocess(console, self.name, base_args, max_duration, max_lines)
+ result["output"] = ret#last_output_lines(console, self.name, ret, n_lines=10)
+
+ # -- unknown ----------------------------------------------------------
+ else:
+ raise ValueError(
+ f"Unknown command '{command}'. Expected one of: list, info, echo, bw, delay, hz, find, pub, type."
+ )
+
+ print_tool_output(console, result["output"], self.name)
+ return result
+
+
+@vulcanai_tool
+class Ros2ServiceTool(AtomicTool):
+ name = "ros2_service"
+ description = (
+ "Wrapper for `ros2 service` CLI."
+ "Run any subcommand: 'list', 'info', 'type', 'find', 'call', 'echo'."
+ "With optional arguments like 'service_name', 'service_type', "
+ "'call', 'args', 'max_duration' or 'max_lines'"
+ )
+ tags = ["ros2", "services", "cli", "info", "call"]
+
+ # - `command` = "list", "info", "type", "call", "echo", "find"
+ input_schema = [
+ ("command", "string"), # Command
+ ("service_name", "string?"), # (optional) Service name. "info", "type", "call", "echo"
+ ("service_type", "string?"), # (optional) Service type. "find", "call"
+ ("call", "bool?"), # (optional) backwards-compatible call flag
+ ("args", "string?"), # (optional) YAML/JSON-like request data for `call`
+ ("max_duration", "number?"), # (optional) Maximum duration
+ ("max_lines", "int?"), # (optional) Maximum lines
+ ]
+
+ output_schema = {
+ "output": "string", # `ros2 service list`
+ }
+
+ def run(self, **kwargs):
+ # Used in the suggestion string
+ console = self.bb.get("console", None)
+ if console is None:
+ raise Exception("Could not find console, aborting...")
+
+ command = kwargs.get("command", None)
+ service_name = kwargs.get("service_name", None)
+ service_type = kwargs.get("service_type", None)
+ call_args = kwargs.get("args", None)
+ # Streaming commands variables
+ max_duration = kwargs.get("max_duration", None)
+ max_lines = kwargs.get("max_lines", None)
+
+ result = {
+ "output": "",
+ }
+
+ command = command.lower()
+
+ service_name_list_str = run_oneshot_cmd(["ros2", "service", "list"])
+ service_name_list = service_name_list_str.splitlines()
+
+ # -- Service name suggestions --
+ if command == "find":
+ # TODO?
+ """suggested_type = suggest_string(console, self.name, "Service_Type", service_type, service_name_list)
+ if suggested_type is not None:
+ service_type = suggested_type"""
+
+ elif command != "list":
+ # Check if the topic is not available ros2 topic list
+ # if it is not create a window for the user to choose a correct topic name
+ suggested_service_name = suggest_string(console, self.name, "Service", service_name, service_name_list)
+ if suggested_service_name is not None:
+ service_name = suggested_service_name
+
+ # Check if the service_name is null (suggest_string() failed)
+ if not service_name:
+ raise ValueError("`command='{}'` requires `service_name`.".format(command))
+
+ # -- ros2 service list ------------------------------------------------
+ if command == "list":
+ result["output"] = service_name_list_str
+
+ # -- ros2 service info ---------------------------------
+ elif command == "info":
+ info_output = run_oneshot_cmd(["ros2", "service", "info", service_name])
+ result["output"] = info_output
+
+ # -- ros2 service type ---------------------------------
+ elif command == "type":
+ type_output = run_oneshot_cmd(["ros2", "service", "type", service_name])
+ result["output"] = type_output.strip()
+
+ # -- ros2 service find -----------------------------------------
+ elif command == "find":
+ find_output = run_oneshot_cmd(["ros2", "service", "find", service_type])
+ result["output"] = find_output
+
+ # -- ros2 service call service_name service_type ----------------------
+ elif command == "call":
+ if call_args is None:
+ raise ValueError("`command='call'` requires `args`.")
+
+ # If service_type not given, detect it
+ if not service_type:
+ type_output = run_oneshot_cmd(["ros2", "service", "type", service_name])
+ service_type = type_output.strip()
+
+ call_output = run_oneshot_cmd(["ros2", "service", "call", service_name, service_type, call_args])
+ result["output"] = call_output
+
+ # -- ros2 service echo service_name -----------------------------------
+ elif command == "echo":
+ base_args = ["ros2", "service", "echo", service_name]
+ ret = execute_subprocess(console, self.name, base_args, max_duration, max_lines)
+ result["output"] = ret#last_output_lines(console, self.name, ret, n_lines=10)
+
+ # -- unknown ------------------------------------------------------------
+ else:
+ raise ValueError(f"Unknown command '{command}'. Expected one of: list, info, type, call, echo, find.")
+
+ print_tool_output(console, result["output"], self.name)
+ return result
+
+
+@vulcanai_tool
+class Ros2ActionTool(AtomicTool):
+ name = "ros2_action"
+ description = (
+ "Wrapper for `ros2 action` CLI."
+ "Run any subcommand: 'list', 'info', 'type', 'send_goal'."
+ "With optional arguments like 'action_name', 'action_type', "
+ "'goal_args'"
+ )
+ tags = ["ros2", "actions", "cli", "info", "goal"]
+
+ # - `command`: "list", "info", "type", "send_goal"
+ input_schema = [
+ ("command", "string"), # Command
+ ("action_name", "string?"), # (optional) Action name
+ ("action_type", "string?"), # (optional) Action type. "find"
+ ("send_goal", "bool?"), # (optional) legacy flag (backwards compatible)
+ ("goal_args", "string?"), # (optional) goal YAML, e.g. '{order: 5}'
+ ]
+
+ output_schema = {
+ "output": "string", # `ros2 action list`
+ }
+
+ def run(self, **kwargs):
+ # Used in the suggestion string
+ console = self.bb.get("console", None)
+ if console is None:
+ raise Exception("Could not find console, aborting...")
+
+ command = kwargs.get("command", None)
+ action_name = kwargs.get("action_name", None)
+ action_type = kwargs.get("action_type", None)
+ goal_args = kwargs.get("goal_args", None)
+
+ result = {
+ "output": "",
+ }
+
+ command = command.lower()
+
+ action_name_list_str = run_oneshot_cmd(["ros2", "action", "list"])
+ action_name_list = action_name_list_str.splitlines()
+
+ # -- Action name suggestions --
+ if command != "list":
+ # Check if the topic is not available ros2 topic list
+ suggested_action_name = suggest_string(console, self.name, "Action", action_name, action_name_list)
+ if suggested_action_name is not None:
+ action_name = suggested_action_name
+
+ # Check if the action_name is null (suggest_string() failed)
+ if not action_name:
+ raise ValueError("`command='{}'` requires `action_name`.".format(command))
+
+ # -- ros2 action list -------------------------------------------------
+ if command == "list":
+ result["output"] = action_name_list_str
+
+ # -- ros2 action info -----------------------------------
+ elif command == "info":
+ info_output = run_oneshot_cmd(["ros2", "action", "info", action_name])
+ result["output"] = info_output
+
+ # -- ros2 action type ------------------------------------
+ elif command == "get":
+ get_output = run_oneshot_cmd(["ros2", "param", "get", node_name, param_name])
+ result["output"] = get_output
+
+ # -- ros2 param describe -------------------------------
+ elif command == "describe":
+ describe_output = run_oneshot_cmd(["ros2", "param", "describe", node_name, param_name])
+ result["output"] = describe_output
+
+ # -- ros2 param set ------------------------
+ elif command == "set":
+ if set_value is None:
+ raise ValueError("`command='set'` requires `set_value`.")
+
+ set_output = run_oneshot_cmd(["ros2", "param", "set", node_name, param_name, set_value])
+ result["output"] = set_output
+
+ # -- ros2 param delete ----------------------------------
+ elif command == "delete":
+ delete_output = run_oneshot_cmd(["ros2", "param", "delete", node_name, param_name])
+ result["output"] = delete_output
+
+ # -- ros2 param dump [file_path] -------------------------------
+ elif command == "dump":
+ # Two modes:
+ # - If file_path given, write to file with --output-file
+ # - Otherwise, capture YAML from stdout
+ if file_path:
+ dump_output = run_oneshot_cmd(["ros2", "param", "dump", node_name, "--output-file", file_path])
+ # CLI usually prints a line like "Saved parameters to file..."
+ # so we just expose that.
+ result["output"] = dump_output or f"Dumped parameters to {file_path}"
+ else:
+ dump_output = run_oneshot_cmd(["ros2", "param", "dump", node_name])
+ result["output"] = dump_output
+
+ # -- ros2 param load -------------------------------
+ elif command == "load":
+ if not file_path:
+ raise ValueError("`command='load'` `file_path`.")
+
+ load_output = run_oneshot_cmd(["ros2", "param", "load", node_name, file_path])
+ result["output"] = load_output
+
+ # -- unknown ----------------------------------------------------------
+ else:
+ raise ValueError(
+ f"Unknown command '{command}'. Expected one of: list, get, describe, set, delete, dump, load."
+ )
+
+ print_tool_output(console, result["output"], self.name)
+ return result
+
+
+@vulcanai_tool
+class Ros2PkgTool(AtomicTool):
+ name = "ros2_pkg"
+ description = "Wrapper for `ros2 pkg` CLI.Run any subcommand: 'list', 'executables'."
+ tags = ["ros2", "pkg", "packages", "cli", "introspection"]
+
+ # If package_name is not provided, runs: `ros2 pkg list`
+ # If provided, runs: `ros2 pkg executables `
+ input_schema = [
+ ("command", "string"), # Command
+ ]
+
+ output_schema = {
+ "output": "string", # list of packages or list of executables for a package.
+ }
+
+ def run(self, **kwargs):
+ # Used in the suggestion string
+ console = self.bb.get("console", None)
+ if console is None:
+ raise Exception("Could not find console, aborting...")
+
+ # Get the package name if provided by the query
+ command = kwargs.get("command", None)
+ result = {
+ "output": "",
+ }
+
+ command = command.lower()
+
+ # -- Run `ros2 pkg list` ----------------------------------------------
+ if command == "list":
+ pkg_name_list = run_oneshot_cmd(["ros2", "pkg", "list"])
+ result["output"] = pkg_name_list
+
+ # -- Run `ros2 pkg executables` ---------------------------------------
+ elif command == "executables":
+ pkg_name_list = run_oneshot_cmd(["ros2", "pkg", "executables"])
+ result["output"] = pkg_name_list
+
+ # -- unknown ----------------------------------------------------------
+ else:
+ raise ValueError(f"Unknown command '{command}'. Expected one of: list, executables, prefix, xml")
+
+ print_tool_output(console, result["output"], self.name)
+ return result
+
+
+@vulcanai_tool
+class Ros2InterfaceTool(AtomicTool):
+ name = "ros2_interface"
+ description = (
+ "Wrapper for `ros2 interface` CLI."
+ "Run any subcommand: 'list', 'packages', 'package', 'show'."
+ "With optional arguments like 'interface_name'."
+ )
+ tags = ["ros2", "interface", "msg", "srv", "cli", "introspection"]
+
+ # - `command` lets you pick a single subcommand (list/packages/package).
+ input_schema = [
+ ("command", "string"), # Command
+ ("interface_name", "string?"), # (optional) Name of the interface, e.g. "std_msgs/msg/String".
+ # If not provided, the command is `ros2 interface list`.
+ # Otherwise `ros2 interface show `.
+ ]
+
+ output_schema = {
+ "output": "string", # list of interfaces (as list of strings) or full interface definition.
+ }
+
+ def run(self, **kwargs):
+ # Used in the suggestion string
+ console = self.bb.get("console", None)
+ if console is None:
+ raise Exception("Could not find console, aborting...")
+
+ # Get the interface name if provided by the query
+ command = kwargs.get("command", None)
+ interface_name = kwargs.get("interface_name", None)
+
+ result = {
+ "output": "",
+ }
+
+ command = command.lower()
+
+ interface_name_list_str = run_oneshot_cmd(["ros2", "interface", "list"])
+ interface_name_list = interface_name_list_str.splitlines()
+
+ package_name_list_str = run_oneshot_cmd(["ros2", "interface", "packages"])
+ package_name_list = package_name_list_str.splitlines()
+
+ # -- ros2 interface list ----------------------------------------------
+ if interface_name is None:
+ result["output"] = interface_name_list_str
+
+ # -- ros2 interface packages ------------------------------------------
+ elif command == "packages":
+ result["output"] = package_name_list_str
+
+ # -- ros2 interface package --------------------------------
+ elif command == "package":
+ package_name = interface_name
+ # Check if the topic is not available ros2 topic list
+ # if it is not create a window for the user to choose a correct topic name
+ suggested_package_name = suggest_string(console, self.name, "Interface", package_name, package_name_list)
+ if suggested_package_name is not None:
+ package_name = suggested_package_name
+
+ # Check if the interface_name is null (suggest_string() failed)
+ if not interface_name:
+ raise ValueError("`command='{}'` requires `interface_name`.".format(command))
+
+ info_output = run_oneshot_cmd(["ros2", "topic", "package", package_name])
+ result["output"] = info_output
+
+ # -- ros2 interface show --------------------------------
+ elif command == "show":
+ # Check if the topic is not available ros2 topic list
+ # if it is not create a window for the user to choose a correct topic name
+ suggested_interface_name = suggest_string(
+ console, self.name, "Interface", interface_name, interface_name_list
+ )
+ if suggested_interface_name is not None:
+ interface_name = suggested_interface_name
+
+ # Check if the interface_name is null (suggest_string() failed)
+ if not interface_name:
+ raise ValueError("`command='{}'` requires `interface_name`.".format(command))
+
+ info_output = run_oneshot_cmd(["ros2", "topic", "show", interface_name])
+ result["output"] = info_output
+
+ # -- unknown ----------------------------------------------------------
+ else:
+ raise ValueError(
+ f"Unknown command '{command}'. Expected one of: list, info, echo, bw, delay, hz, find, pub, type."
+ )
+
+ print_tool_output(console, result["output"], self.name)
+ return result
+
+
+def import_msg_type(type_str: str, node):
+ """
+ Dynamically import a ROS 2 message type from its string identifier.
+
+ This function resolves a ROS 2 message type expressed as a string
+ (e.g. `"std_msgs/msg/String"`) into the corresponding Python message class
+ (`std_msgs.msg.String`).
+ """
+ info_list = type_str.split("/")
+
+ if len(info_list) != 3:
+ pkg = "std_msgs"
+ msg_name = info_list[-1]
+ node.get_logger().warn(
+ f"Cannot import ROS message type '{type_str}'. " + "Adding default pkg 'std_msgs' instead."
+ )
+ else:
+ pkg, _, msg_name = info_list
+
+ module = importlib.import_module(f"{pkg}.msg")
+
+ return getattr(module, msg_name)
+
+
+@vulcanai_tool
+class Ros2PublishTool(AtomicTool):
+ name = "ros_publish"
+ description = (
+ "Publish one or more messages to a given ROS 2 topic [topic_name]. "
+ "Or execute 'ros2 topic pub [topic_name]'. "
+ "Supports both simple string messages (for std_msgs/msg/String) and custom message types. "
+ "For custom types, pass message_data as a JSON object with field names and values. "
+ "By default it keeps publishing messages until Ctrl+C is pressed. "
+ "with type 'std_msgs/msg/String' in topic '/chatter' "
+ "with 0.1 seconds of delay between messages to publish"
+ 'Example for custom type: msg_type=\'my_pkg/msg/MyMessage\', message_data=\'{"index": 1, "message": "Hello"}\''
+ )
+ tags = ["ros2", "publish", "message", "std_msgs"]
+
+ input_schema = [
+ ("topic", "string"), # e.g. "/chatter"
+ ("message_data", "string?"), # (optional) payload - string for std_msgs/String or JSON for custom types
+ ("msg_type", "string?"), # (optional) e.g. "std_msgs/msg/String" or "my_pkg/msg/CustomMsg"
+ ("max_lines", "int?"), # (optional) number of messages to publish
+ ("max_duration", "int?"), # (optional) stop after this seconds
+ ("period_sec", "float?"), # (optional) delay between publishes (in seconds)
+ ("message", "string?"), # (deprecated) use message_data instead
+ ]
+
+ output_schema = {
+ "published": "bool",
+ "count": "int",
+ "topic": "string",
+ "output": "string",
+ }
+
+ def msg_from_dict(self, msg, values: dict):
+ """
+ Populate a ROS 2 message instance from a Python dictionary.
+
+ This function recursively assigns values from a dictionary to the
+ corresponding fields of a ROS 2 message instance.
+
+ Supports:
+ - Primitive fields (int, float, bool, string)
+ - Nested ROS 2 messages
+
+ """
+ for field, value in values.items():
+ attr = getattr(msg, field)
+ if hasattr(attr, "__slots__"):
+ self.msg_from_dict(attr, value)
+ else:
+ setattr(msg, field, value)
+
+ def run(self, **kwargs):
+ # Ros2 node to create the Publisher and print the log information
+ node = self.bb.get("main_node", None)
+ if node is None:
+ raise Exception("Could not find shared node, aborting...")
+ # Optional console handle to route logs to the subprocess panel.
+ console = self.bb.get("console", None)
+
+ result = {
+ "published": "False",
+ "count": "0",
+ "topic": "",
+ "output": "",
+ }
+
+ panel_enabled = console is not None and hasattr(console, "show_subprocess_panel")
+ if panel_enabled:
+ console.call_from_thread(console.show_subprocess_panel)
+ if hasattr(console, "change_route_logs"):
+ console.call_from_thread(console.change_route_logs, True)
+
+ topic_name = kwargs.get("topic", "/chatter")
+ # Support both 'message_data' (new) and 'message' (deprecated)
+ message_data = kwargs.get("message_data", kwargs.get("message", "Hello from VulcanAI PublishTool!"))
+ msg_type_str = kwargs.get("msg_type", "std_msgs/msg/String")
+
+ max_duration = kwargs.get("max_duration", None)
+ if max_duration is not None and not isinstance(max_duration, (int, float)):
+ max_duration = None
+ if max_duration is not None and max_duration <= 0:
+ max_duration = None
+
+ max_lines = kwargs.get("max_lines", None)
+ if max_lines is not None and not isinstance(max_lines, int):
+ max_lines = None
+
+ period_sec = kwargs.get("period_sec", 0.1)
+
+ qos_depth = 10
+
+ if console is None:
+ print("[ERROR] Console not is None")
+
+ return result
+
+ published_msgs = []
+ output_lines = []
+ publisher = None
+ cancel_token = None
+
+ try:
+ if not topic_name:
+ console.call_from_thread(console.logger.log_msg, "[ROS] [ERROR] No topic provided.")
+ return result
+
+ result["topic"] = topic_name
+
+ if max_lines is not None and max_lines <= 0:
+ # No messages to publish
+ console.call_from_thread(
+ console.logger.log_msg, "[ROS] [WARN] max_lines <= 0, nothing to publish."
+ )
+ return result
+
+ MsgType = import_msg_type(msg_type_str, node)
+ publisher = node.create_publisher(MsgType, topic_name, qos_depth)
+ cancel_token = Future()
+ console.set_stream_task(cancel_token)
+ console.logger.log_tool("[tool]Publisher created![tool]", tool_name=self.name)
+ #output_lines.append(f"[TOOL {self.name}] Publisher created!")
+ log_tool_in_stream_and_main(
+ console,
+ "[tool]Publisher created![tool]",
+ tool_name=self.name,
+ )
+
+ start_time = time.monotonic()
+ published_count = 0
+
+ while True:
+ if cancel_token.cancelled():
+ log_tool_in_stream_and_main(
+ console,
+ "[tool]Ctrl+C received:[/tool] stopping publish...",
+ tool_name=self.name,
+ )
+ break
+ if max_lines is not None and published_count >= max_lines:
+ log_tool_in_stream_and_main(
+ console,
+ f"[tool]Stopping:[/tool] Reached max_lines = {max_lines}",
+ tool_name=self.name,
+ )
+ break
+ if max_duration is not None and (time.monotonic() - start_time) >= max_duration:
+ log_tool_in_stream_and_main(
+ console,
+ f"[tool]Stopping:[/tool] Exceeded max_duration = {max_duration}s",
+ tool_name=self.name,
+ )
+ break
+
+ msg = MsgType()
+
+ # Try to populate message based on message type
+ if hasattr(msg, "data"):
+ # Standard message type with a 'data' field (e.g., std_msgs/msg/String)
+ msg.data = message_data
+ else:
+ # Custom message type - parse message_data as JSON
+ try:
+ payload = json.loads(message_data)
+ self.msg_from_dict(msg, payload)
+ except json.JSONDecodeError as e:
+ console.call_from_thread(
+ console.logger.log_msg,
+ "[ROS] [ERROR] Failed to parse message_data as JSON for custom type"
+ + f"'{msg_type_str}': {e}",
+ )
+ return result
+
+ if hasattr(msg, "data"):
+ publish_line = f"[ROS] [INFO] Publishing: '{msg.data}'"
+ console.call_from_thread(
+ console.logger.log_msg, f"{publish_line}"
+ )
+ else:
+ publish_line = f"[ROS] [INFO] Publishing custom message to '{topic_name}'"
+ console.call_from_thread(
+ console.logger.log_msg,
+ f"{publish_line}",
+ )
+ output_lines.append(publish_line)
+ publisher.publish(msg)
+ published_msgs.append(msg.data if hasattr(msg, "data") else str(msg))
+ published_count += 1
+
+ rclpy.spin_once(node, timeout_sec=0.05)
+
+ if period_sec and period_sec > 0.0:
+ time.sleep(period_sec)
+
+ finally:
+ console.set_stream_task(None)
+ if panel_enabled:
+ if hasattr(console, "change_route_logs"):
+ console.call_from_thread(console.change_route_logs, False)
+ if publisher is not None:
+ try:
+ node.destroy_publisher(publisher)
+ except Exception:
+ pass
+
+ result["output"] = "\n".join(output_lines)
+
+ if published_msgs is not None:
+ result["published"] = "True"
+ result["count"] = len(published_msgs)
+
+ print_tool_output(console, result["output"], self.name)
+
+ # if panel_enabled:
+ # console.logger.log_msg(result["output"], color="gray")
+ # console.logger.log_msg("\n")
+ # else:
+ # print_tool_output(console, result["output"], self.name)
+ return result
+
+
+@vulcanai_tool
+class Ros2SubscribeTool(AtomicTool):
+ name = "ros_subscribe"
+ description = (
+ "Subscribe to a topic [topic] or execute 'ros2 topic echo [topic]' "
+ "and stop after receiving N messages or max duration."
+ )
+ tags = ["ros2", "subscribe", "topic", "std_msgs"]
+
+ input_schema = [
+ ("topic", "string"), # topic name
+ ("max_lines", "int?"), # (optional) stop after this number of messages
+ ("max_duration", "int?"), # (optional) stop after this seconds
+ ]
+
+ output_schema = {
+ "subscribed": "bool",
+ "count": "int",
+ "topic": "string",
+ "output": "string",
+ }
+
+ def run(self, **kwargs):
+ # Ros2 node to create the Publisher and print the log information
+ node = self.bb.get("main_node", None)
+ if node is None:
+ raise Exception("Could not find shared node, aborting...")
+ # Optional console handle to support Ctrl+C cancellation.
+ console = self.bb.get("console", None)
+
+ result = {
+ "subscribed": "False",
+ "count": "0",
+ "topic": "",
+ "output": "",
+ }
+
+ topic_name = kwargs.get("topic", None)
+ max_duration = kwargs.get("max_duration", None)
+ if max_duration is not None and not isinstance(max_duration, (int, float)):
+ max_duration = None
+
+ max_lines = kwargs.get("max_lines", None)
+ if max_lines is not None and not isinstance(max_lines, int):
+ max_lines = None
+
+ output_lines = []
+ panel_enabled = console is not None and hasattr(console, "change_route_logs")
+ if panel_enabled:
+ console.call_from_thread(console.change_route_logs, True)
+
+ # Start line (stream panel/main panel with tool color)
+ console.logger.log_tool("[tool]Subscriber created![/tool]", tool_name=self.name)
+
+ # "--field data" prints only the data field from each message
+ # instead of the full YAML message
+ # "--no-arr" do not print array fields of messages
+ base_args = ["ros2", "topic", "echo", topic_name, "--field", "data", "--no-arr"]
+ ret = execute_subprocess(console, self.name, base_args, max_duration, max_lines, log_created=False)
+
+ ret_lines = ret.splitlines() if isinstance(ret, str) and ret else []
+
+
+ result["output"] = "\n".join(ret_lines)
+
+ if ret is not None:
+ result["subscribed"] = "True"
+ result["count"] = len(ret_lines)
+ result["topic"] = topic_name
+
+ print_tool_output(console, result["output"], self.name)
+
+ # if panel_enabled:
+ # console.call_from_thread(console.change_route_logs, False)
+ # if ret_lines:
+ # console.logger.log_msg("\n".join(f"{line}" for line in ret_lines))
+ # # if stop_reason:
+ # # if max_lines is not None and len(ret_lines) >= max_lines:
+ # # console.logger.log_tool(
+ # # f"[tool]Stopping:[/tool] Reached max_lines = {max_lines}",
+ # # tool_name=self.name,
+ # # )
+ # # elif max_duration is not None:
+ # # console.logger.log_tool(
+ # # f"[tool]Stopping:[/tool] Exceeded max_duration = {max_duration}s",
+ # # tool_name=self.name,
+ # # )
+ # console.logger.log_msg("\n")
+ # else:
+ # print_tool_output(console, result["output"], self.name)
+ return result
diff --git a/src/vulcanai/tools/tool_registry.py b/src/vulcanai/tools/tool_registry.py
index 686c8af..2ea5743 100644
--- a/src/vulcanai/tools/tool_registry.py
+++ b/src/vulcanai/tools/tool_registry.py
@@ -78,7 +78,7 @@ def run(self, **kwargs):
class ToolRegistry:
"""Holds all known tools and performs vector search over metadata."""
- def __init__(self, embedder=None, logger=None):
+ def __init__(self, embedder=None, logger=None, default_tools=True):
# Logging function from the class VulcanConsole
self.logger = logger or VulcanAILogger.default()
# Dictionary of tools (name -> tool instance)
@@ -97,6 +97,14 @@ def __init__(self, embedder=None, logger=None):
# Validation tools list to retrieve validation tools separately
self.validation_tools: List[str] = []
+ # Default tools
+ if default_tools:
+ try:
+ self.discover_tools_from_entry_points("ros2_default_tools")
+ except ImportError as e:
+ self.logger.log_msg(f"[error]{e}[/error]")
+ raise
+
def register_tool(self, tool: ITool, solve_deps: bool = True):
"""Register a single tool instance."""
# Avoid duplicates
diff --git a/src/vulcanai/tools/utils.py b/src/vulcanai/tools/utils.py
new file mode 100644
index 0000000..d643793
--- /dev/null
+++ b/src/vulcanai/tools/utils.py
@@ -0,0 +1,318 @@
+# Copyright 2026 Proyectos y Sistemas de Mantenimiento SL (eProsima).
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import asyncio
+import difflib
+import heapq
+import subprocess
+import threading
+import time
+
+from textual.markup import escape # To remove potential errors in textual terminal
+
+
+async def run_streaming_cmd_async(
+ console,
+ args: list[str],
+ max_duration: float | None = None,
+ max_lines: int | None = None,
+ echo: bool = True,
+ tool_name="",
+) -> str:
+ # Unpack the command
+ cmd, *cmd_args = args
+
+ captured_lines: list[str] = []
+ process = None
+ try:
+ # Create the subprocess
+ process = await asyncio.create_subprocess_exec(
+ cmd,
+ *cmd_args,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.STDOUT,
+ )
+
+ assert process.stdout is not None
+
+ start_time = time.monotonic()
+ line_count = 0
+
+ # Subprocess main loop. Read line by line
+ async for raw_line in process.stdout:
+ line = raw_line.decode(errors="ignore").rstrip("\n")
+ captured_line = line
+ display_line = line
+
+ # Print the line
+ if echo:
+ if args[:3] == ["ros2", "topic", "echo"] and line:
+ msg = line.strip()
+ if msg == "---":
+ continue
+ msg = msg.strip("'\"")
+ captured_line = f"[ROS] [INFO] I heard: '{msg}'"
+ display_line = f"{captured_line}"
+
+ captured_lines.append(captured_line)
+ line_processed = display_line if display_line != captured_line else escape(display_line)
+ if hasattr(console, "add_subprocess_line"):
+ console.add_subprocess_line(line_processed)
+ else:
+ console.add_line(line_processed)
+
+ # Count the line
+ line_count += 1
+ if max_lines is not None and line_count >= max_lines:
+ log_tool_in_stream_and_main(
+ console,
+ f"[tool]Stopping:[/tool] Reached max_lines = {max_lines}",
+ tool_name=tool_name,
+ )
+ console.set_stream_task(None)
+ process.terminate()
+ break
+
+ # Check duration
+ if max_duration is not None and (time.monotonic() - start_time) >= max_duration:
+ log_tool_in_stream_and_main(
+ console,
+ f"[tool]Stopping:[/tool] Exceeded max_duration = {max_duration}s", tool_name=tool_name
+ )
+ console.set_stream_task(None)
+ process.terminate()
+ break
+
+ except asyncio.CancelledError:
+ # Task was cancelled → stop the subprocess
+ log_tool_in_stream_and_main(
+ console,
+ "[tool]Cancellation received:[/tool] terminating subprocess...",
+ tool_name=tool_name,
+ )
+ if process is not None:
+ process.terminate()
+
+ # Not necessary, textual terminal get the keyboard input
+ except KeyboardInterrupt:
+ # Ctrl+C pressed → stop subprocess
+ log_tool_in_stream_and_main(
+ console,
+ "[tool]Ctrl+C received:[/tool] terminating subprocess...",
+ tool_name=tool_name,
+ )
+ if process is not None:
+ process.terminate()
+
+ finally:
+ try:
+ if process is not None:
+ await asyncio.wait_for(process.wait(), timeout=3.0)
+ except asyncio.TimeoutError:
+ log_tool_in_stream_and_main(
+ console,
+ "Subprocess didn't exit in time → killing it.",
+ tool_name=tool_name,
+ error=True,
+ )
+ if process is not None:
+ process.kill()
+ await process.wait()
+ finally:
+ # Always restore default logging route and close the popup panel
+ # when a streaming subprocess ends. The popup stays visible and is
+ # closed explicitly by user Ctrl+C in console actions.
+ if hasattr(console, "change_route_logs"):
+ console.change_route_logs(False)
+ console.set_stream_task(None)
+ return "\n".join(captured_lines)
+
+
+def execute_subprocess(console, tool_name, base_args, max_duration, max_lines, log_created: bool = True):
+ stream_task = None
+ done_event = threading.Event()
+ result = {"output": ""}
+
+ def _launcher() -> None:
+ nonlocal stream_task
+ if hasattr(console, "show_subprocess_panel"):
+ console.show_subprocess_panel()
+
+ # This always runs in the Textual event-loop thread
+ loop = asyncio.get_running_loop()
+ stream_task = loop.create_task(
+ run_streaming_cmd_async(
+ console,
+ base_args,
+ max_duration=max_duration,
+ max_lines=max_lines,
+ tool_name=tool_name, # tool_header_str
+ )
+ )
+ # Keep the real task reference so Ctrl+C can cancel it.
+ console.set_stream_task(stream_task)
+
+ def _on_done(task: asyncio.Task) -> None:
+ try:
+ if not task.cancelled():
+ result["output"] = task.result() or ""
+ except Exception as e:
+ console.logger.log_msg(f"Echo task error: {e!r}\n", error=True)
+ finally:
+ done_event.set()
+
+ stream_task.add_done_callback(_on_done)
+
+ # `/rerun` workers can have their own asyncio loop in a non-UI thread.
+ # Route UI/task creation to Textual app thread unless we are already there.
+ if threading.current_thread() is threading.main_thread():
+ _launcher()
+ else:
+ # `console.app` is your Textual App instance.
+ console.app.call_from_thread(_launcher)
+
+ if log_created:
+ console.logger.log_tool("[tool]Subprocess created![tool]", tool_name=tool_name)
+ # Wait for streaming command to finish and return collected lines.
+ # In UI thread we avoid blocking to prevent deadlocks.
+ if threading.current_thread() is threading.main_thread():
+ return ""
+ done_event.wait()
+ return result["output"]
+
+
+def run_oneshot_cmd(args: list[str]) -> str:
+ try:
+ return subprocess.check_output(args, stderr=subprocess.STDOUT, text=True)
+
+ except subprocess.CalledProcessError as e:
+ raise Exception(f"Failed to run '{' '.join(args)}': {e.output}")
+
+
+def suggest_string(console, tool_name, string_name, input_string, real_string_list):
+ ret = None
+
+ def _similarity(a: str, b: str) -> float:
+ """Return a similarity score between 0 and 1."""
+ return difflib.SequenceMatcher(None, a, b).ratio()
+
+ def _get_suggestions(real_string_list_comp: list[str], string_comp: str) -> tuple[str, list[str]]:
+ """
+ Function used to search for the most "similar" string in a list.
+
+ Used in ROS2 cli commands to used the "simmilar" in case
+ the queried topic does not exists.
+
+ Example:
+ real_string_list_comp = [
+ "/parameter_events",
+ "/rosout",
+ "/turtle1/cmd_vel",
+ "/turtle1/color_sensor",
+ "/turtle1/pose",
+ ]
+ string_comp = "trtle1"
+
+ @return
+ str: the most "similar" string
+ list[str] a sorted list by a similitud value
+ """
+
+ topic_list_pq = []
+ n = len(string_comp)
+
+ for string in real_string_list_comp:
+ m = len(string)
+ # Calculate the similitud
+ score = _similarity(string_comp, string)
+ # Give more value for the nearest size comparations.
+ score -= abs(n - m) * 0.005
+ # Max-heap (via negative score)
+ heapq.heappush(topic_list_pq, (-score, string))
+
+ # Pop ordered list
+ ret_list: list[str] = []
+ _, most_topic_similar = heapq.heappop(topic_list_pq)
+
+ ret_list.append(most_topic_similar)
+
+ while topic_list_pq:
+ _, topic = heapq.heappop(topic_list_pq)
+ ret_list.append(topic)
+
+ return most_topic_similar, ret_list
+
+ # Add '/' for Topic, service, action, node
+ ros_categories_list = ["Topic", "Service", "Action", "Node"]
+ if string_name in ros_categories_list and len(input_string) > 0 and input_string[0] != "/":
+ input_string = f"/{input_string}"
+ ret = input_string
+
+ if input_string not in real_string_list:
+ console.logger.log_tool(f'{string_name}: "{input_string}" does not exists', tool_name=tool_name)
+
+ # Get the suggestions list sorted by similitud value
+ _, topic_sim_list = _get_suggestions(real_string_list, input_string)
+
+ # Open the ModalScreen
+ console.open_radiolist(topic_sim_list, tool_name, string_name, input_string)
+
+ # Wait for the user to select and item in the
+ # RadioList ModalScreen
+ console.suggestion_index_changed.wait()
+
+ # Check if the user cancelled the suggestion
+ if console.suggestion_index >= 0:
+ ret = topic_sim_list[console.suggestion_index]
+
+ # Reset suggestion index
+ console.suggestion_index = -1
+ console.suggestion_index_changed.clear()
+
+ return ret
+
+
+def last_output_lines(console, tool_name: str, output: str, n_lines: int = 10) -> str:
+ """
+ Keep only the last `max_lines` lines in tool output and log this behavior.
+ """
+ lines = output.splitlines()
+ if console is not None and hasattr(console, "logger"):
+ console.logger.log_tool(
+ f"Returning only the last {n_lines} lines in result['output'].",
+ tool_name=tool_name,
+ )
+ return "\n".join(lines[-n_lines:])
+
+def print_tool_output(console, result, name):
+ console.logger.log_tool("[tool]Output:[tool]", tool_name=name)
+ console.logger.log_msg(result, color="gray")
+ console.logger.log_msg("\n")
+
+def log_tool_in_stream_and_main(console, msg: str, tool_name: str = "", error: bool = False, color: str = "") -> None:
+ """
+ Log a tool message in both Logs (streaming and main)
+ """
+ processed_msg = console.logger.log_tool(msg, tool_name=tool_name, error=error, color=color)
+
+ if not getattr(console, "_route_logs_to_stream_panel", False):
+ return
+
+ main_panel = getattr(console, "main_pannel", None)
+ if main_panel is None:
+ return
+
+ for line in processed_msg.splitlines():
+ main_panel.append_line(line)
\ No newline at end of file
diff --git a/tests/unittest/test_default_tools.py b/tests/unittest/test_default_tools.py
new file mode 100644
index 0000000..58c4871
--- /dev/null
+++ b/tests/unittest/test_default_tools.py
@@ -0,0 +1,438 @@
+# Copyright 2026 Proyectos y Sistemas de Mantenimiento SL (eProsima).
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Unit tests for the ROS 2 default tools (ros2_topic).
+
+These tests require a working ROS 2 environment (rclpy importable and
+ros2 CLI available). Tests are automatically skipped when ROS 2 is not
+installed.
+
+Tests call the tool's ``run()`` method directly with a minimal blackboard
+containing a mock console that implements the methods the tool relies on
+(logging, suggestion handling, subprocess panel, stream task).
+Background ``ros2 topic pub`` processes are used to create observable topics.
+"""
+
+import asyncio
+import importlib
+import os
+import signal
+import subprocess
+import sys
+import threading
+import time
+import unittest
+
+# ---------------------------------------------------------------------------
+# Skip entire module when ROS 2 is not available
+# ---------------------------------------------------------------------------
+try:
+ import rclpy # noqa: F401
+
+ ROS2_AVAILABLE = True
+except ImportError:
+ ROS2_AVAILABLE = False
+
+# Also check that the ros2 CLI is on PATH
+try:
+ subprocess.check_output(["ros2", "topic", "list"], stderr=subprocess.STDOUT, timeout=10)
+ ROS2_CLI_AVAILABLE = True
+except Exception:
+ ROS2_CLI_AVAILABLE = False
+
+
+# Make src-layout importable
+CURRENT_DIR = os.path.dirname(__file__)
+SRC_DIR = os.path.abspath(os.path.join(CURRENT_DIR, os.path.pardir, os.path.pardir, "src"))
+if SRC_DIR not in sys.path:
+ sys.path.insert(0, SRC_DIR)
+
+
+# ---------------------------------------------------------------------------
+# Mock console — implements only the interface used by tools and utils
+# ---------------------------------------------------------------------------
+class _MockLogger:
+ """Collects log calls for optional inspection."""
+
+ def __init__(self):
+ self.messages = []
+
+ def log_tool(self, msg, **kwargs):
+ self.messages.append(msg)
+
+ def log_msg(self, msg, **kwargs):
+ self.messages.append(msg)
+
+ def log_console(self, msg, **kwargs):
+ self.messages.append(msg)
+
+
+class _MockApp:
+ """Provides an asyncio event loop on a background thread,
+ mimicking Textual's ``app.call_from_thread()`` for
+ ``execute_subprocess``."""
+
+ def __init__(self):
+ self._loop = asyncio.new_event_loop()
+ self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
+ self._thread.start()
+
+ def call_from_thread(self, fn, *args, **kwargs):
+ self._loop.call_soon_threadsafe(fn)
+
+ def stop(self):
+ self._loop.call_soon_threadsafe(self._loop.stop)
+ self._thread.join(timeout=5)
+
+
+class MockConsole:
+ """Implements the console interface expected by tools and their helpers.
+
+ Covers:
+ - ``logger`` — used for all tool logging
+ - ``app`` — used by execute_subprocess
+ (provides asyncio event loop)
+ - ``set_stream_task`` — used by execute_subprocess
+ - ``show/hide_subprocess_panel`` — used by execute_subprocess
+ - ``add_line / add_subprocess_line`` — used by run_streaming_cmd_async
+ - ``change_route_logs`` — used by streaming commands
+ - ``suggestion_index*`` — used by suggest_string when a topic
+ is not found (auto-selects first match)
+ - ``open_radiolist`` — used by suggest_string modal
+ """
+
+ def __init__(self):
+ self.logger = _MockLogger()
+ self.app = _MockApp()
+ # suggest_string support: auto-accept the first suggestion
+ self.suggestion_index = 0
+ self.suggestion_index_changed = threading.Event()
+ self.suggestion_index_changed.set() # unblock immediately
+ self._stream_task = None
+
+ def set_stream_task(self, task):
+ self._stream_task = task
+
+ def show_subprocess_panel(self):
+ pass
+
+ def hide_subprocess_panel(self):
+ pass
+
+ def change_route_logs(self, value):
+ pass
+
+ def add_line(self, text):
+ pass
+
+ def add_subprocess_line(self, text):
+ pass
+
+ def open_radiolist(self, items, tool_name, string_name, input_string):
+ # Auto-select index 0 (the best match) without user interaction
+ pass
+
+ def stop(self):
+ self.app.stop()
+
+
+# ---------------------------------------------------------------------------
+# Helper: background ROS 2 publisher
+# ---------------------------------------------------------------------------
+def start_background_publisher(
+ topic: str,
+ msg_type: str = "std_msgs/msg/String",
+ rate: float = 10.0,
+ message: str = "hello_test",
+):
+ """Launch ``ros2 topic pub`` in a subprocess and return the Popen handle."""
+ proc = subprocess.Popen(
+ [
+ "ros2",
+ "topic",
+ "pub",
+ topic,
+ msg_type,
+ f"{{data: '{message}'}}",
+ "--rate",
+ str(rate),
+ ],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ )
+ # Give the publisher a moment to register with the ROS graph
+ time.sleep(2)
+ return proc
+
+
+def stop_background_publisher(proc: subprocess.Popen):
+ """Terminate a background publisher gracefully."""
+ if proc.poll() is None:
+ proc.send_signal(signal.SIGINT)
+ try:
+ proc.wait(timeout=5)
+ except subprocess.TimeoutExpired:
+ proc.kill()
+ proc.wait()
+
+
+# ===========================================================================
+# Test class
+# ===========================================================================
+@unittest.skipUnless(
+ ROS2_AVAILABLE and ROS2_CLI_AVAILABLE,
+ "ROS 2 (rclpy + ros2 CLI) not available — skipping",
+)
+class TestRos2TopicTool(unittest.TestCase):
+ """Direct tests for ``Ros2TopicTool.run()``.
+
+ Each test instantiates the tool, injects a blackboard with a
+ MockConsole and a ROS2DefaultToolNode, and calls ``run()`` directly.
+ """
+
+ # -----------------------------------------------------------------------
+ # Fixtures
+ # -----------------------------------------------------------------------
+ @classmethod
+ def setUpClass(cls):
+ """Import the default tools module and the ROS2DefaultToolNode."""
+ default_tools_mod = importlib.import_module("vulcanai.tools.default_tools")
+
+ cls.Ros2TopicTool = default_tools_mod.Ros2TopicTool
+ cls.ROS2DefaultToolNode = default_tools_mod.ROS2DefaultToolNode
+
+ def setUp(self):
+ self.console = MockConsole()
+ self.node = self.ROS2DefaultToolNode()
+ self._bg_publishers = []
+
+ def tearDown(self):
+ for proc in self._bg_publishers:
+ stop_background_publisher(proc)
+ self._bg_publishers.clear()
+ self.node.destroy_node()
+ self.console.stop()
+
+ @classmethod
+ def tearDownClass(cls):
+ if rclpy.ok():
+ rclpy.shutdown()
+
+ # -----------------------------------------------------------------------
+ # Helpers
+ # -----------------------------------------------------------------------
+ def _run_topic(self, **kwargs):
+ """Create a Ros2TopicTool, inject a blackboard, and call run()."""
+ tool = self.Ros2TopicTool()
+ tool.bb = {"console": self.console, "main_node": self.node}
+ return tool.run(**kwargs)
+
+ def _run_topic_threaded(self, **kwargs):
+ """Run the tool from a worker thread.
+
+ Streaming commands (bw, hz, delay) use ``execute_subprocess`` which
+ requires a non-main thread so it can call
+ ``console.app.call_from_thread()`` and then block on a done-event.
+ """
+ result = {}
+ error = {}
+
+ def worker():
+ try:
+ result["value"] = self._run_topic(**kwargs)
+ except Exception as e:
+ error["exc"] = e
+
+ t = threading.Thread(target=worker)
+ t.start()
+ t.join(timeout=30)
+ if "exc" in error:
+ raise error["exc"]
+ return result.get("value")
+
+ # -----------------------------------------------------------------------
+ # Tests — ros2 topic list
+ # -----------------------------------------------------------------------
+ def test_topic_list(self):
+ """`list` should succeed and contain /rosout."""
+ result = self._run_topic(command="list")
+
+ self.assertIn("output", result)
+ self.assertIn("/rosout", result["output"])
+
+ def test_topic_list_with_background_pub(self):
+ """After starting a background publisher, `list` should include
+ that topic."""
+ topic = "/vulcan_test_topic_list"
+ proc = start_background_publisher(topic)
+ self._bg_publishers.append(proc)
+
+ result = self._run_topic(command="list")
+
+ self.assertIn(topic, result["output"])
+
+ # -----------------------------------------------------------------------
+ # Tests — ros2 topic info
+ # -----------------------------------------------------------------------
+ def test_topic_info(self):
+ """`info` on a published topic returns type and publisher count."""
+ topic = "/vulcan_test_topic_info"
+ proc = start_background_publisher(topic)
+ self._bg_publishers.append(proc)
+
+ result = self._run_topic(command="info", topic_name=topic)
+
+ self.assertIn("std_msgs/msg/String", result["output"])
+ self.assertIn("Publisher count:", result["output"])
+
+ # -----------------------------------------------------------------------
+ # Tests — ros2 topic type
+ # -----------------------------------------------------------------------
+ def test_topic_type(self):
+ """`type` returns the message type of a topic."""
+ topic = "/vulcan_test_topic_type"
+ proc = start_background_publisher(topic)
+ self._bg_publishers.append(proc)
+
+ result = self._run_topic(command="type", topic_name=topic)
+
+ self.assertIn("std_msgs/msg/String", result["output"])
+
+ # -----------------------------------------------------------------------
+ # Tests — ros2 topic find
+ # -----------------------------------------------------------------------
+ def test_topic_find(self):
+ """`find` locates topics by message type."""
+ topic = "/vulcan_test_topic_find"
+ proc = start_background_publisher(topic)
+ self._bg_publishers.append(proc)
+
+ result = self._run_topic(command="find", msg_type="std_msgs/msg/String")
+
+ self.assertIn(topic, result["output"])
+
+ # -----------------------------------------------------------------------
+ # Tests — ros2 topic bw (streaming)
+ # -----------------------------------------------------------------------
+ def test_topic_bw(self):
+ """`bw` measures bandwidth on an active topic."""
+ topic = "/vulcan_test_topic_bw"
+ proc = start_background_publisher(topic, rate=10.0)
+ self._bg_publishers.append(proc)
+
+ result = self._run_topic_threaded(
+ command="bw",
+ topic_name=topic,
+ max_duration=5.0,
+ max_lines=20,
+ )
+
+ self.assertIsNotNone(result)
+ self.assertIn("output", result)
+ self.assertIsInstance(result["output"], str)
+
+ # -----------------------------------------------------------------------
+ # Tests — ros2 topic hz (streaming)
+ # -----------------------------------------------------------------------
+ def test_topic_hz(self):
+ """`hz` measures the publishing rate of a topic."""
+ topic = "/vulcan_test_topic_hz"
+ proc = start_background_publisher(topic, rate=10.0)
+ self._bg_publishers.append(proc)
+
+ result = self._run_topic_threaded(
+ command="hz",
+ topic_name=topic,
+ max_duration=5.0,
+ max_lines=20,
+ )
+
+ self.assertIsNotNone(result)
+ self.assertIn("output", result)
+ self.assertIsInstance(result["output"], str)
+
+ # -----------------------------------------------------------------------
+ # Tests — ros2 topic delay (streaming)
+ # -----------------------------------------------------------------------
+ def test_topic_delay(self):
+ """`delay` runs without error on an active topic.
+
+ Note: ``ros2 topic delay`` requires messages with a header stamp.
+ With ``std_msgs/msg/String`` (no header) the output may be empty
+ or contain a warning, but the command should not crash.
+ """
+ topic = "/vulcan_test_topic_delay"
+ proc = start_background_publisher(topic, rate=10.0)
+ self._bg_publishers.append(proc)
+
+ result = self._run_topic_threaded(
+ command="delay",
+ topic_name=topic,
+ max_duration=5.0,
+ max_lines=20,
+ )
+
+ self.assertIsNotNone(result)
+ self.assertIn("output", result)
+ self.assertIsInstance(result["output"], str)
+
+ # -----------------------------------------------------------------------
+ # Tests — streaming commands with custom max_duration / max_lines
+ # -----------------------------------------------------------------------
+ def test_topic_hz_custom_limits(self):
+ """`hz` respects custom max_duration and max_lines."""
+ topic = "/vulcan_test_topic_hz_limits"
+ proc = start_background_publisher(topic, rate=10.0)
+ self._bg_publishers.append(proc)
+
+ result = self._run_topic_threaded(
+ command="hz",
+ topic_name=topic,
+ max_duration=3.0,
+ max_lines=5,
+ )
+
+ self.assertIsNotNone(result)
+ self.assertIn("output", result)
+
+ # -----------------------------------------------------------------------
+ # Tests — error cases
+ # -----------------------------------------------------------------------
+ def test_topic_unknown_command_raises(self):
+ """An unknown subcommand should raise ValueError."""
+ with self.assertRaises((ValueError, TypeError)):
+ self._run_topic(command="nonexistent")
+
+ with self.assertRaises(ValueError):
+ self._run_topic(command="nonexistent", topic_name="/rosout")
+
+ def test_topic_info_missing_topic_name_raises(self):
+ """`info` without a topic_name should raise ValueError."""
+ with self.assertRaises((ValueError, TypeError)):
+ self._run_topic(command="info")
+
+ def test_topic_type_missing_topic_name_raises(self):
+ """`type` without a topic_name should raise ValueError."""
+ with self.assertRaises((ValueError, TypeError)):
+ self._run_topic(command="type")
+
+ def test_topic_find_missing_msg_type_raises(self):
+ """`find` without a msg_type should raise an exception."""
+ with self.assertRaises(Exception):
+ self._run_topic(command="find")
+
+
+if __name__ == "__main__":
+ unittest.main()