diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md index dac3605d..9c6d7f47 100644 --- a/.github/pull_request_template.md +++ b/.github/pull_request_template.md @@ -2,7 +2,7 @@ `Insert description` ## Fixes -Issue Number: `number` +Closes: `number` ## Checklist - [ ] Update CHANGELOG.md diff --git a/.github/workflows/galactic.yml b/.github/workflows/galactic.yml index 8472b672..4e703f50 100644 --- a/.github/workflows/galactic.yml +++ b/.github/workflows/galactic.yml @@ -8,6 +8,8 @@ on: env: HOME: /root + # Coupled with our use of the test_log crate this should give us good CI output on failure + RUST_LOG: debug jobs: galactic: @@ -22,6 +24,8 @@ jobs: uses: actions/checkout@v3 with: submodules: 'true' + - name: Verify rust version + run: source /root/.cargo/env; rustc --version - name: Lint run: source /root/.cargo/env; cargo fmt --all -- --check - name: Build Main Lib diff --git a/.github/workflows/humble.yml b/.github/workflows/humble.yml index 7c55d89a..53831e37 100644 --- a/.github/workflows/humble.yml +++ b/.github/workflows/humble.yml @@ -8,6 +8,8 @@ on: env: HOME: /root + # Coupled with our use of the test_log crate this should give us good CI output on failure + RUST_LOG: debug jobs: humble: @@ -22,6 +24,8 @@ jobs: uses: actions/checkout@v3 with: submodules: 'true' + - name: Verify rust version + run: source /root/.cargo/env; rustc --version - name: Lint run: source /root/.cargo/env; cargo fmt --all -- --check - name: Build Main Lib diff --git a/.github/workflows/iron.yml b/.github/workflows/iron.yml new file mode 100644 index 00000000..db003954 --- /dev/null +++ b/.github/workflows/iron.yml @@ -0,0 +1,39 @@ +name: Iron + +on: + push: + branches: [ master ] + pull_request: + branches: [ master ] + +env: + HOME: /root + # Coupled with our use of the test_log crate this should give us good CI output on failure + RUST_LOG: debug + +jobs: + humble: + runs-on: ubuntu-latest + defaults: + run: + shell: bash + container: carter12s/roslibrust-ci-iron:latest + timeout-minutes: 20 + steps: + - name: Checkout repo + uses: actions/checkout@v3 + with: + submodules: 'true' + - name: Lint + run: source /root/.cargo/env; cargo fmt --all -- --check + - name: Build Main Lib + run: source /root/.cargo/env; cargo build + # This step is required to confirm feature combinations work, the main workspace build does all features + - name: Build Proc Macro + run: source /root/.cargo/env; cargo build -p roslibrust_codegen_macro + - name: Unit Tests + run: source /root/.cargo/env; cargo test + - name: Start rosbridge + run: source /opt/ros/iron/setup.bash; ros2 launch rosbridge_server rosbridge_websocket_launch.xml & disown; ros2 run rosapi rosapi_node & sleep 1 + - name: Integration Tests + run: source /root/.cargo/env; cargo test --features ros2_test -- --test-threads 1 \ No newline at end of file diff --git a/.github/workflows/noetic.yml b/.github/workflows/noetic.yml index 10a0e2f8..6110e7f6 100644 --- a/.github/workflows/noetic.yml +++ b/.github/workflows/noetic.yml @@ -9,6 +9,8 @@ on: env: HOME: /root ROS_PACKAGE_PATH: /opt/ros/noetic/share + # Coupled with our use of the test_log crate this should give us good CI output on failure + RUST_LOG: debug jobs: noetic: @@ -23,6 +25,8 @@ jobs: uses: actions/checkout@v3 with: submodules: 'true' + - name: Verify rust version + run: source /root/.cargo/env; rustc --version - name: Lint run: source /root/.cargo/env; cargo fmt --all -- --check - name: Build Main Lib diff --git a/.github/workflows/noetic_gencpp.yml b/.github/workflows/noetic_gencpp.yml index db077492..41e85bd5 100644 --- a/.github/workflows/noetic_gencpp.yml +++ b/.github/workflows/noetic_gencpp.yml @@ -8,6 +8,8 @@ on: env: HOME: /root + # Coupled with our use of the test_log crate this should give us good CI output on failure + RUST_LOG: debug jobs: noetic_gencpp: diff --git a/CHANGELOG.md b/CHANGELOG.md index e78f0b35..6c2d7e2c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - The function interface for top level generation functions in `roslibrust_codegen` have been changed to include the list of dependent filesystem paths that should trigger re-running code generation. Note: new files added to the search paths will not be automatically detected. - [Breaking Change] Codegen now generates fixed sized arrays as arrays [T; N] instead of Vec<T> + - Removed `find_and_generate_ros_messages_relative_to_manifest_dir!` this proc_macro was changing the current working directory of the compilation job resulting in a variety of strange compilation behaviors. Build.rs scripts are recommended for use cases requiring fine grained control of message generation. + - The function interface for top level generation functions in `roslibrust_codegen` have been changed to include the list of dependent filesystem paths that should trigger re-running code generation. Note: new files added to the search paths will not be automatically detected. + - Refactor the `ros1::node` module into separate smaller pieces. This should be invisible externally (and no changes to examples were required). ## 0.8.0 - October 4th, 2023 diff --git a/README.md b/README.md index 54523d19..f606040f 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,7 @@ [](https://github.com/Carter12s/roslibrust/actions/workflows/noetic.yml) [](https://github.com/Carter12s/roslibrust/actions/workflows/galactic.yml) [](https://github.com/Carter12s/roslibrust/actions/workflows/humble.yml) +[](https://github.com/Carter12s/roslibrust/actions/workflows/iron.yml) [](https://opensource.org/licenses/MIT) This package aims to provide a convenient intermediary between ROS1's rosbridge and Rust similar to roslibpy and roslibjs. diff --git a/docker/galactic/Dockerfile b/docker/galactic/Dockerfile index bfb9ecab..7efd9e2a 100644 --- a/docker/galactic/Dockerfile +++ b/docker/galactic/Dockerfile @@ -4,7 +4,6 @@ LABEL maintainer="Carter Schultz <carterjschultz@gmail.com>" # Required by github CI for submodule support RUN apt update && apt install -y git -# Adapted from https://github.com/JoaquinGimenez1/docker-noetic-rosbridge/blob/main/Dockerfile RUN apt update && apt install -y ros-galactic-rosbridge-suite # Curl required to install rust, build-essential required to build quote & proc-macro2 @@ -12,10 +11,6 @@ RUN apt update && apt install -y --fix-missing curl build-essential # Install latest stable rust RUN curl https://sh.rustup.rs -sSf | sh -s -- -y -# Update Cargo Registry -# See this for why we do it so wierd https://github.com/rust-lang/cargo/issues/3377 -RUN /root/.cargo/bin/cargo install lazy_static; exit 0 - WORKDIR / COPY entrypoint.sh . RUN chmod +x entrypoint.sh diff --git a/docker/humble/Dockerfile b/docker/humble/Dockerfile index d52a0835..586e7551 100644 --- a/docker/humble/Dockerfile +++ b/docker/humble/Dockerfile @@ -4,7 +4,6 @@ LABEL maintainer="Carter Schultz <carterjschultz@gmail.com>" # Required by github CI for submodule support RUN apt update && apt install -y git -# Adapted from https://github.com/JoaquinGimenez1/docker-noetic-rosbridge/blob/main/Dockerfile RUN apt update && apt install -y ros-humble-rosbridge-suite # Curl required to install rust, build-essential required to build quote & proc-macro2 @@ -12,10 +11,6 @@ RUN apt update && apt install -y --fix-missing curl build-essential # Install latest stable rust RUN curl https://sh.rustup.rs -sSf | sh -s -- -y -# Update Cargo Registry -# See this for why we do it so wierd https://github.com/rust-lang/cargo/issues/3377 -RUN /root/.cargo/bin/cargo install lazy_static; exit 0 - WORKDIR / COPY entrypoint.sh . RUN chmod +x entrypoint.sh diff --git a/docker/humble/README.md b/docker/humble/README.md index ce8abbad..04044364 100644 --- a/docker/humble/README.md +++ b/docker/humble/README.md @@ -1,10 +1,10 @@ -# Galactic CI Docker Image -Produces a docker image including both galactic rosbridge and needed rust. +# Humble CI Docker Image +Produces a docker image including both humble rosbridge and needed rust. # Building / Publishing (currently only carter has access, need to fix) -- docker build -t carter12s/roslibrust-ci-galactic:latest . +- docker build -t carter12s/roslibrust-ci-humble:latest . - Maybe needed: docker login -- docker push carter12s/roslibrust-ci-galactic:latest +- docker push carter12s/roslibrust-ci-humble:latest For debug: -docker run -it carter12s/roslibrust-ci-galactic /bin/bash \ No newline at end of file +docker run -it carter12s/roslibrust-ci-humble /bin/bash \ No newline at end of file diff --git a/docker/iron/Dockerfile b/docker/iron/Dockerfile new file mode 100644 index 00000000..be37d179 --- /dev/null +++ b/docker/iron/Dockerfile @@ -0,0 +1,17 @@ +FROM ros:iron-ros-core +LABEL maintainer="Carter Schultz <carterjschultz@gmail.com>" + +# Required by github CI for submodule support +RUN apt update && apt install -y git + +RUN apt update && apt install -y ros-iron-rosbridge-suite + +# Curl required to install rust, build-essential required to build quote & proc-macro2 +RUN apt update && apt install -y --fix-missing curl build-essential +# Install latest stable rust +RUN curl https://sh.rustup.rs -sSf | sh -s -- -y + +WORKDIR / +COPY entrypoint.sh . +RUN chmod +x entrypoint.sh +ENTRYPOINT ["/entrypoint.sh"] diff --git a/docker/iron/README.md b/docker/iron/README.md new file mode 100644 index 00000000..f15bd5c8 --- /dev/null +++ b/docker/iron/README.md @@ -0,0 +1,10 @@ +# Iron CI Docker Image +Produces a docker image including both iron rosbridge and needed rust. + +# Building / Publishing (currently only carter has access, need to fix) +- docker build -t carter12s/roslibrust-ci-iron:latest . +- Maybe needed: docker login +- docker push carter12s/roslibrust-ci-iron:latest + +For debug: +docker run -it carter12s/roslibrust-ci-iron /bin/bash diff --git a/docker/iron/entrypoint.sh b/docker/iron/entrypoint.sh new file mode 100644 index 00000000..9d8ad4d3 --- /dev/null +++ b/docker/iron/entrypoint.sh @@ -0,0 +1,2 @@ +#!/bin/bash +exec "$@" diff --git a/docker/iron_compose.yaml b/docker/iron_compose.yaml new file mode 100644 index 00000000..45eb17ce --- /dev/null +++ b/docker/iron_compose.yaml @@ -0,0 +1,7 @@ +version: "3.9" +services: + rosbridge: + image: carter12s/roslibrust-ci-iron:latest + ports: + - "9090:9090" + command: bash -c "source /opt/ros/humble/setup.bash; ros2 launch rosbridge_server rosbridge_websocket_launch.xml & disown; ros2 run rosapi rosapi_node --ros-args --log-level debug" \ No newline at end of file diff --git a/docker/noetic/Dockerfile b/docker/noetic/Dockerfile index c2ff582c..b570c5c9 100644 --- a/docker/noetic/Dockerfile +++ b/docker/noetic/Dockerfile @@ -9,10 +9,6 @@ RUN apt update && apt install -y --fix-missing curl build-essential # Install latest stable rust RUN curl https://sh.rustup.rs -sSf | sh -s -- -y -# Update Cargo Registry -# See this for why we do it so wierd https://github.com/rust-lang/cargo/issues/3377 -RUN /root/.cargo/bin/cargo install lazy_static; exit 0 - WORKDIR / COPY entrypoint.sh . RUN chmod +x entrypoint.sh diff --git a/docker/noetic_cpp/Dockerfile b/docker/noetic_cpp/Dockerfile index 24b69cbc..00212510 100644 --- a/docker/noetic_cpp/Dockerfile +++ b/docker/noetic_cpp/Dockerfile @@ -16,10 +16,6 @@ RUN apt update && apt install -y ros-noetic-ros-base # Install latest stable rust RUN curl https://sh.rustup.rs -sSf | sh -s -- -y -# Update Cargo Registry -# See this for why we do it so weird https://github.com/rust-lang/cargo/issues/3377 -RUN /root/.cargo/bin/cargo install lazy_static; exit 0 - WORKDIR / COPY entrypoint.sh . RUN chmod +x entrypoint.sh diff --git a/roslibrust/Cargo.toml b/roslibrust/Cargo.toml index 503a1ac5..73c7d184 100644 --- a/roslibrust/Cargo.toml +++ b/roslibrust/Cargo.toml @@ -54,10 +54,10 @@ serde-big-array = "0.5" [features] default = [] -# Provides a rosapi rust interface -rosapi = [] # Note: all does not include running_bridge as that is only intended for CI all = [] +# Provides a rosapi rust interface +rosapi = [] # Intended for use with tests, includes tests that rely on a locally running rosbridge running_bridge = [] # For use with integration tests, indicating we are testing integration with a ros1 bridge diff --git a/roslibrust/examples/native_ros1.rs b/roslibrust/examples/native_ros1.rs index 3f454677..5e4e3673 100644 --- a/roslibrust/examples/native_ros1.rs +++ b/roslibrust/examples/native_ros1.rs @@ -4,8 +4,8 @@ #[cfg(feature = "ros1")] #[tokio::main] -async fn main() -> Result<(), Box<dyn std::error::Error>> { - use roslibrust::NodeHandle; +async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { + use roslibrust::ros1::NodeHandle; simple_logger::SimpleLogger::new() .with_level(log::LevelFilter::Debug) diff --git a/roslibrust/examples/ros1_listener.rs b/roslibrust/examples/ros1_listener.rs index c0983a40..cdbc6826 100644 --- a/roslibrust/examples/ros1_listener.rs +++ b/roslibrust/examples/ros1_listener.rs @@ -2,8 +2,8 @@ roslibrust_codegen_macro::find_and_generate_ros_messages!("assets/ros1_common_in #[cfg(feature = "ros1")] #[tokio::main] -async fn main() -> Result<(), Box<dyn std::error::Error>> { - use roslibrust::NodeHandle; +async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { + use roslibrust::ros1::NodeHandle; simple_logger::SimpleLogger::new() .with_level(log::LevelFilter::Debug) diff --git a/roslibrust/examples/ros1_talker.rs b/roslibrust/examples/ros1_talker.rs index 2671b068..934a19cf 100644 --- a/roslibrust/examples/ros1_talker.rs +++ b/roslibrust/examples/ros1_talker.rs @@ -2,8 +2,8 @@ roslibrust_codegen_macro::find_and_generate_ros_messages!("assets/ros1_common_in #[cfg(feature = "ros1")] #[tokio::main] -async fn main() -> Result<(), Box<dyn std::error::Error>> { - use roslibrust::NodeHandle; +async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { + use roslibrust::ros1::NodeHandle; simple_logger::SimpleLogger::new() .with_level(log::LevelFilter::Debug) diff --git a/roslibrust/src/lib.rs b/roslibrust/src/lib.rs index 8932461d..e3044431 100644 --- a/roslibrust/src/lib.rs +++ b/roslibrust/src/lib.rs @@ -105,6 +105,33 @@ pub use rosbridge::*; pub mod rosapi; #[cfg(feature = "ros1")] -mod ros1; -#[cfg(feature = "ros1")] -pub use ros1::*; +pub mod ros1; + +/// For now starting with a central error type, may break this up more in future +#[derive(thiserror::Error, Debug)] +pub enum RosLibRustError { + #[error("Not currently connected to ros master / bridge")] + Disconnected, + // TODO we probably want to eliminate tungstenite from this and hide our + // underlying websocket implementation from the API + // currently we "technically" break the API when we change tungstenite verisons + #[error("Websocket communication error: {0}")] + CommFailure(#[from] tokio_tungstenite::tungstenite::Error), + #[error("Operation timed out: {0}")] + Timeout(#[from] tokio::time::error::Elapsed), + #[error("Failed to parse message from JSON: {0}")] + InvalidMessage(#[from] serde_json::Error), + #[error("Rosbridge server reported an error: {0}")] + ServerError(String), + #[error("Name does not meet ROS requirements: {0}")] + InvalidName(String), + // Generic catch-all error type for not-yet-handled errors + // TODO ultimately this type will be removed from API of library + #[error(transparent)] + Unexpected(#[from] anyhow::Error), +} + +/// Generic result type used as standard throughout library. +/// Note: many functions which currently return this will be updated to provide specific error +/// types in the future instead of the generic error here. +pub type RosLibRustResult<T> = Result<T, RosLibRustError>; diff --git a/roslibrust/src/ros1/master_client.rs b/roslibrust/src/ros1/master_client.rs index 03de74d0..4a015880 100644 --- a/roslibrust/src/ros1/master_client.rs +++ b/roslibrust/src/ros1/master_client.rs @@ -254,6 +254,7 @@ impl MasterClient { self.client_uri.clone().into(), ], )?; + log::trace!("Posting {body:?} to register publisher"); self.post(body).await } @@ -360,7 +361,7 @@ impl MasterClient { #[cfg(test)] mod test { - use crate::{MasterClient, RosMasterError}; + use super::{MasterClient, RosMasterError}; const TEST_NODE_ID: &str = "/native_ros1_test"; diff --git a/roslibrust/src/ros1/mod.rs b/roslibrust/src/ros1/mod.rs index 3c15c71d..2ae9fdb5 100644 --- a/roslibrust/src/ros1/mod.rs +++ b/roslibrust/src/ros1/mod.rs @@ -4,10 +4,6 @@ mod master_client; pub use master_client::*; -/// [xmlrpc_server] module contains the xmlrpc server that a node must host -mod xmlrpc_server; -pub(crate) use xmlrpc_server::*; - mod names; /// [node] module contains the central Node and NodeHandle APIs @@ -15,5 +11,7 @@ mod node; pub use node::*; mod publisher; +pub use publisher::Publisher; mod subscriber; +pub use subscriber::Subscriber; mod tcpros; diff --git a/roslibrust/src/ros1/names.rs b/roslibrust/src/ros1/names.rs index ebf9a120..d61a3d93 100644 --- a/roslibrust/src/ros1/names.rs +++ b/roslibrust/src/ros1/names.rs @@ -1,3 +1,6 @@ +use crate::{RosLibRustError, RosLibRustResult}; +use std::fmt::Display; + lazy_static::lazy_static! { static ref GRAPH_NAME_REGEX: regex::Regex = regex::Regex::new(r"^([/~a-zA-Z]){1}([a-zA-Z0-9_/])*([A-z0-9_])$").unwrap(); } @@ -8,12 +11,12 @@ pub struct Name { } impl Name { - pub fn new(name: impl Into<String>) -> Option<Self> { + pub fn new(name: impl Into<String>) -> RosLibRustResult<Self> { let name: String = name.into(); if is_valid(&name) { - Some(Self { inner: name }) + Ok(Self { inner: name }) } else { - None + Err(RosLibRustError::InvalidName(name)) } } @@ -54,6 +57,12 @@ fn is_valid(name: &str) -> bool { GRAPH_NAME_REGEX.is_match(name) } +impl Display for Name { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.inner.fmt(f) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/roslibrust/src/ros1/node.rs b/roslibrust/src/ros1/node/actor.rs similarity index 65% rename from roslibrust/src/ros1/node.rs rename to roslibrust/src/ros1/node/actor.rs index 04c00d4a..c72258a3 100644 --- a/roslibrust/src/ros1/node.rs +++ b/roslibrust/src/ros1/node/actor.rs @@ -1,24 +1,19 @@ -//! This module contains the top level Node and NodeHandle classes. -//! These wrap the lower level management of a ROS Node connection into a higher level and thread safe API. - -use super::{ - names::Name, - publisher::{Publication, Publisher}, - subscriber::{Subscriber, Subscription}, +use super::ProtocolParams; +use crate::{ + ros1::{ + names::Name, + node::{XmlRpcServer, XmlRpcServerHandle}, + publisher::Publication, + subscriber::Subscription, + MasterClient, + }, + ServiceCallback, }; -use crate::{MasterClient, RosMasterError, ServiceCallback, XmlRpcServer, XmlRpcServerHandle}; -use dashmap::DashMap; +use abort_on_drop::ChildTask; use roslibrust_codegen::RosMessageType; -use std::net::{IpAddr, Ipv4Addr, ToSocketAddrs}; +use std::{collections::HashMap, net::Ipv4Addr, sync::Arc}; use tokio::sync::{broadcast, mpsc, oneshot}; -#[derive(Debug)] -pub struct ProtocolParams { - pub hostname: String, - pub protocol: String, - pub port: u16, -} - #[derive(Debug)] pub enum NodeMsg { GetMasterUri { @@ -64,7 +59,11 @@ pub enum NodeMsg { #[derive(Clone)] pub(crate) struct NodeServerHandle { - node_server_sender: mpsc::UnboundedSender<NodeMsg>, + pub(crate) node_server_sender: mpsc::UnboundedSender<NodeMsg>, + // If this handle should keep the underlying node task alive it will hold an + // Arc to the underlying node task. This is an option because internal handles + // within the node shouldn't keep it alive (e.g. what we hand to xml server) + _node_task: Option<Arc<ChildTask<()>>>, } impl NodeServerHandle { @@ -80,7 +79,7 @@ impl NodeServerHandle { } } - pub async fn get_client_uri(&self) -> Result<String, Box<dyn std::error::Error>> { + pub async fn get_client_uri(&self) -> Result<String, Box<dyn std::error::Error + Send + Sync>> { let (sender, receiver) = oneshot::channel(); match self .node_server_sender @@ -144,22 +143,20 @@ impl NodeServerHandle { pub async fn register_publisher<T: RosMessageType>( &self, topic: &str, - topic_type: &str, queue_size: usize, - ) -> Result<mpsc::Sender<Vec<u8>>, Box<dyn std::error::Error>> { + ) -> Result<mpsc::Sender<Vec<u8>>, Box<dyn std::error::Error + Send + Sync>> { let (sender, receiver) = oneshot::channel(); match self.node_server_sender.send(NodeMsg::RegisterPublisher { reply: sender, topic: topic.to_owned(), - topic_type: topic_type.to_owned(), + topic_type: T::ROS_TYPE_NAME.to_owned(), queue_size, msg_definition: T::DEFINITION.to_owned(), md5sum: T::MD5SUM.to_owned(), }) { Ok(()) => { let received = receiver.await.map_err(|err| Box::new(err))?; - Ok(received.map_err(|err| { - log::error!("Failed to register publisher: {err}"); + Ok(received.map_err(|_err| { Box::new(std::io::Error::from(std::io::ErrorKind::ConnectionAborted)) })?) } @@ -171,7 +168,7 @@ impl NodeServerHandle { &self, topic: &str, queue_size: usize, - ) -> Result<broadcast::Receiver<Vec<u8>>, Box<dyn std::error::Error>> { + ) -> Result<broadcast::Receiver<Vec<u8>>, Box<dyn std::error::Error + Send + Sync>> { let (sender, receiver) = oneshot::channel(); match self.node_server_sender.send(NodeMsg::RegisterSubscriber { reply: sender, @@ -221,7 +218,7 @@ impl NodeServerHandle { /// Represents a single "real" node, typically only one of these is expected per process /// but nothing should specifically prevent that. -pub struct Node { +pub(crate) struct Node { // The xmlrpc client this node uses to make requests to master client: MasterClient, // Server which handles updates from the rosmaster and other ROS nodes @@ -229,11 +226,11 @@ pub struct Node { // Receiver for requests to the Node actor node_msg_rx: mpsc::UnboundedReceiver<NodeMsg>, // Map of topic names to the publishing channels associated with the topic - publishers: DashMap<String, Publication>, + publishers: HashMap<String, Publication>, // Record of subscriptions this node has - subscriptions: DashMap<String, Subscription>, + subscriptions: HashMap<String, Subscription>, // Record of what services this node is serving - services: DashMap<String, ServiceCallback>, + services: HashMap<String, ServiceCallback>, // TODO need signal to shutdown xmlrpc server when node is dropped host_addr: Ipv4Addr, hostname: String, @@ -241,57 +238,61 @@ pub struct Node { } impl Node { - async fn new( + pub(crate) async fn new( master_uri: &str, hostname: &str, node_name: &str, addr: Ipv4Addr, - ) -> Result<NodeServerHandle, Box<dyn std::error::Error>> { + ) -> Result<NodeServerHandle, Box<dyn std::error::Error + Send + Sync>> { let (node_sender, node_receiver) = mpsc::unbounded_channel(); - let node_server_handle = NodeServerHandle { - node_server_sender: node_sender, + let xml_server_handle = NodeServerHandle { + node_server_sender: node_sender.clone(), + // None here because this handle should not keep task alive + _node_task: None, }; // Create our xmlrpc server and bind our socket so we know our port and can determine our local URI - let xmlrpc_server = XmlRpcServer::new(addr, node_server_handle.clone()); + let xmlrpc_server = XmlRpcServer::new(addr, xml_server_handle)?; let client_uri = format!("http://{hostname}:{}", xmlrpc_server.port()); - if let None = Name::new(node_name) { - log::error!("Node name {node_name} is not valid"); - return Err(Box::new(std::io::Error::from( - std::io::ErrorKind::InvalidInput, - ))); - } + let _ = Name::new(node_name)?; let rosmaster_client = MasterClient::new(master_uri, client_uri, node_name).await?; let mut node = Self { client: rosmaster_client, _xmlrpc_server: xmlrpc_server, node_msg_rx: node_receiver, - publishers: DashMap::new(), - subscriptions: DashMap::new(), - services: DashMap::new(), + publishers: std::collections::HashMap::new(), + subscriptions: std::collections::HashMap::new(), + services: std::collections::HashMap::new(), host_addr: addr, hostname: hostname.to_owned(), node_name: node_name.to_owned(), }; - tokio::spawn(async move { - loop { - match node.node_msg_rx.recv().await { - Some(NodeMsg::Shutdown) => { - log::info!("Shutdown requested, shutting down node"); - break; - } - Some(node_msg) => { - node.handle_msg(node_msg).await; - } - None => { - break; + let t = Arc::new( + tokio::spawn(async move { + loop { + match node.node_msg_rx.recv().await { + Some(NodeMsg::Shutdown) => { + log::info!("Shutdown requested, shutting down node"); + break; + } + Some(node_msg) => { + node.handle_msg(node_msg).await; + } + None => { + break; + } } } - } - }); + }) + .into(), + ); + let node_server_handle = NodeServerHandle { + node_server_sender: node_sender, + _node_task: Some(t), + }; Ok(node_server_handle) } @@ -307,7 +308,9 @@ impl Node { let _ = reply.send( self.subscriptions .iter() - .map(|entry| (entry.key().clone(), entry.topic_type().to_owned())) + .map(|(topic_name, subscription)| { + (topic_name.clone(), subscription.topic_type().to_owned()) + }) .collect(), ); } @@ -315,12 +318,12 @@ impl Node { let _ = reply.send( self.publishers .iter() - .map(|entry| (entry.key().clone(), entry.topic_type().to_owned())) + .map(|(key, entry)| (key.clone(), entry.topic_type().to_owned())) .collect(), ); } NodeMsg::SetPeerPublishers { topic, publishers } => { - if let Some(mut subscription) = self.subscriptions.get_mut(&topic) { + if let Some(subscription) = self.subscriptions.get_mut(&topic) { for publisher_uri in publishers { if let Err(err) = subscription.add_publisher_source(&publisher_uri).await { log::error!( @@ -342,10 +345,10 @@ impl Node { msg_definition, md5sum, } => { - match self + let res = self .register_publisher(topic, &topic_type, queue_size, msg_definition, md5sum) - .await - { + .await; + match res { Ok(handle) => reply.send(Ok(handle)), Err(err) => reply.send(Err(err.to_string())), } @@ -383,10 +386,8 @@ impl Node { .find(|proto| proto.as_str() == "TCPROS") .is_some() { - if let Some(publishing_channel) = self - .publishers - .iter() - .find(|publisher| publisher.key() == &topic) + if let Some((_key, publishing_channel)) = + self.publishers.iter().find(|(key, _pub)| *key == &topic) { let protocol_params = ProtocolParams { hostname: self.hostname.clone(), @@ -421,12 +422,8 @@ impl Node { msg_definition: &str, md5sum: &str, ) -> Result<broadcast::Receiver<Vec<u8>>, Box<dyn std::error::Error>> { - match self - .subscriptions - .iter() - .find(|subscription| subscription.key() == topic) - { - Some(subscription) => Ok(subscription.get_receiver()), + match self.subscriptions.iter().find(|(key, _)| *key == topic) { + Some((_topic, subscription)) => Ok(subscription.get_receiver()), None => { let mut subscription = Subscription::new( &self.node_name, @@ -457,19 +454,23 @@ impl Node { msg_definition: String, md5sum: String, ) -> Result<mpsc::Sender<Vec<u8>>, Box<dyn std::error::Error>> { - if let Some(handle) = self.publishers.iter().find_map(|entry| { - if entry.key().as_str() == &topic { - if entry.topic_type() == topic_type { - Some(Ok(entry.get_sender())) + let existing_entry = { + self.publishers.iter().find_map(|(key, value)| { + if key.as_str() == &topic { + if value.topic_type() == topic_type { + Some(Ok(value.get_sender())) + } else { + Some(Err(Box::new(std::io::Error::from( + std::io::ErrorKind::AddrInUse, + )))) + } } else { - Some(Err(Box::new(std::io::Error::from( - std::io::ErrorKind::AddrInUse, - )))) + None } - } else { - None - } - }) { + }) + }; + + if let Some(handle) = existing_entry { Ok(handle?) } else { let channel = Publication::new( @@ -489,119 +490,8 @@ impl Node { })?; let handle = channel.get_sender(); self.publishers.insert(topic.clone(), channel); - - let _current_subscribers = self.client.register_publisher(topic, topic_type).await?; + let _current_subscribers = self.client.register_publisher(&topic, topic_type).await?; Ok(handle) } } } - -/// Represents a handle to an underlying [Node]. NodeHandle's can be freely cloned, moved, copied, etc. -/// This class provides the user facing API for interacting with ROS. -#[derive(Clone)] -pub struct NodeHandle { - inner: NodeServerHandle, -} - -impl NodeHandle { - // TODO builder, async, result, better error type - /// Creates a new node connect and returns a handle to it - /// It is idiomatic to call this once per process and treat the created node as singleton. - /// The returned handle can be freely clone'd to create additional handles without creating additional connections. - pub async fn new( - master_uri: &str, - name: &str, - ) -> Result<NodeHandle, Box<dyn std::error::Error>> { - // Follow ROS rules and determine our IP and hostname - let (addr, hostname) = determine_addr()?; - - let node = Node::new(master_uri, &hostname, name, addr).await?; - let nh = NodeHandle { inner: node }; - - // TODO spawn our TcpManager here for TCPROS - - Ok(nh) - } - - pub fn is_ok(&self) -> bool { - !self.inner.node_server_sender.is_closed() - } - - pub async fn get_client_uri(&self) -> Result<String, Box<dyn std::error::Error>> { - self.inner.get_client_uri().await - } - - pub async fn advertise<T: roslibrust_codegen::RosMessageType>( - &self, - topic_name: &str, - queue_size: usize, - ) -> Result<Publisher<T>, Box<dyn std::error::Error>> { - let sender = self - .inner - .register_publisher::<T>(topic_name, T::ROS_TYPE_NAME, queue_size) - .await?; - Ok(Publisher::new(topic_name, sender)) - } - - pub async fn subscribe<T: roslibrust_codegen::RosMessageType>( - &self, - topic_name: &str, - queue_size: usize, - ) -> Result<Subscriber<T>, Box<dyn std::error::Error>> { - let receiver = self - .inner - .register_subscriber::<T>(topic_name, queue_size) - .await?; - Ok(Subscriber::new(receiver)) - } -} - -// TODO at the end of the day I'd like to offer a builder pattern for configuration that allow manual setting of this or "ros idiomatic" behavior - Carter -/// Following ROS's idiomatic address rules uses ROS_HOSTNAME and ROS_IP to determine the address that server should be hosted at. -/// Returns both the resolved IpAddress of the host (used for actually opening the socket), and the String "hostname" which should -/// be used in the URI. -fn determine_addr() -> Result<(Ipv4Addr, String), RosMasterError> { - // If ROS_IP is set that trumps anything else - if let Ok(ip_str) = std::env::var("ROS_IP") { - let ip = ip_str.parse().map_err(|e| { - RosMasterError::HostIpResolutionFailure(format!( - "ROS_IP environment variable did not parse to a valid IpAddr::V4: {e:?}" - )) - })?; - return Ok((ip, ip_str)); - } - // If ROS_HOSTNAME is set that is next highest precedent - if let Ok(name) = std::env::var("ROS_HOSTNAME") { - let ip = hostname_to_ipv4(&name)?; - return Ok((ip, name)); - } - // If neither env var is set, use the computers "hostname" - let name = gethostname::gethostname(); - let name = name.into_string().map_err(|e| { - RosMasterError::HostIpResolutionFailure(format!("This host's hostname is a string that cannot be validly converted into a Rust type, and therefore we cannot convert it into an IpAddrv4: {e:?}")) - })?; - let ip = hostname_to_ipv4(&name)?; - return Ok((ip, name)); -} - -/// Given a the name of a host use's std::net::ToSocketAddrs to perform a DNS lookup and return the resulting IP address. -/// This function is intended to be used to determine the correct IP host the socket for the xmlrpc server on. -fn hostname_to_ipv4(name: &str) -> Result<Ipv4Addr, RosMasterError> { - let mut i = (name, 0).to_socket_addrs().map_err(|e| { - RosMasterError::HostIpResolutionFailure(format!( - "Failure while attempting to lookup ROS_HOSTNAME: {e:?}" - )) - })?; - if let Some(addr) = i.next() { - match addr.ip() { - IpAddr::V4(ip) => Ok(ip), - IpAddr::V6(ip) => { - Err(RosMasterError::HostIpResolutionFailure(format!("ROS_HOSTNAME resolved to an IPv6 address which is not support by ROS/roslibrust: {ip:?}"))) - } - } - } else { - Err(RosMasterError::HostIpResolutionFailure(format!( - "ROS_HOSTNAME did not resolve any address: {name:?}" - ))) - } -} diff --git a/roslibrust/src/ros1/node/handle.rs b/roslibrust/src/ros1/node/handle.rs new file mode 100644 index 00000000..75888ec8 --- /dev/null +++ b/roslibrust/src/ros1/node/handle.rs @@ -0,0 +1,60 @@ +use super::actor::{Node, NodeServerHandle}; +use crate::ros1::{publisher::Publisher, subscriber::Subscriber}; + +/// Represents a handle to an underlying [Node]. NodeHandle's can be freely cloned, moved, copied, etc. +/// This class provides the user facing API for interacting with ROS. +#[derive(Clone)] +pub struct NodeHandle { + inner: NodeServerHandle, +} + +impl NodeHandle { + // TODO builder, result, better error type + /// Creates a new node connect and returns a handle to it + /// It is idiomatic to call this once per process and treat the created node as singleton. + /// The returned handle can be freely clone'd to create additional handles without creating additional connections. + pub async fn new( + master_uri: &str, + name: &str, + ) -> Result<NodeHandle, Box<dyn std::error::Error + Send + Sync>> { + // Follow ROS rules and determine our IP and hostname + let (addr, hostname) = super::determine_addr().await?; + + let node = Node::new(master_uri, &hostname, name, addr).await?; + let nh = NodeHandle { inner: node }; + + Ok(nh) + } + + pub fn is_ok(&self) -> bool { + !self.inner.node_server_sender.is_closed() + } + + pub async fn get_client_uri(&self) -> Result<String, Box<dyn std::error::Error + Send + Sync>> { + self.inner.get_client_uri().await + } + + pub async fn advertise<T: roslibrust_codegen::RosMessageType>( + &self, + topic_name: &str, + queue_size: usize, + ) -> Result<Publisher<T>, Box<dyn std::error::Error + Send + Sync>> { + let sender = self + .inner + .register_publisher::<T>(topic_name, queue_size) + .await?; + Ok(Publisher::new(topic_name, sender)) + } + + pub async fn subscribe<T: roslibrust_codegen::RosMessageType>( + &self, + topic_name: &str, + queue_size: usize, + ) -> Result<Subscriber<T>, Box<dyn std::error::Error + Send + Sync>> { + let receiver = self + .inner + .register_subscriber::<T>(topic_name, queue_size) + .await?; + Ok(Subscriber::new(receiver)) + } +} diff --git a/roslibrust/src/ros1/node/mod.rs b/roslibrust/src/ros1/node/mod.rs new file mode 100644 index 00000000..1a167764 --- /dev/null +++ b/roslibrust/src/ros1/node/mod.rs @@ -0,0 +1,70 @@ +//! This module contains the top level Node and NodeHandle classes. +//! These wrap the lower level management of a ROS Node connection into a higher level and thread safe API. + +use super::RosMasterError; +use std::net::{IpAddr, Ipv4Addr}; + +mod actor; +mod handle; +mod xmlrpc; +use actor::*; +pub use handle::NodeHandle; +use xmlrpc::*; + +#[derive(Debug)] +pub struct ProtocolParams { + pub hostname: String, + pub protocol: String, + pub port: u16, +} + +// TODO at the end of the day I'd like to offer a builder pattern for configuration that allow manual setting of this or "ros idiomatic" behavior - Carter +/// Following ROS's idiomatic address rules uses ROS_HOSTNAME and ROS_IP to determine the address that server should be hosted at. +/// Returns both the resolved IpAddress of the host (used for actually opening the socket), and the String "hostname" which should +/// be used in the URI. +async fn determine_addr() -> Result<(Ipv4Addr, String), RosMasterError> { + // If ROS_IP is set that trumps anything else + if let Ok(ip_str) = std::env::var("ROS_IP") { + let ip = ip_str.parse().map_err(|e| { + RosMasterError::HostIpResolutionFailure(format!( + "ROS_IP environment variable did not parse to a valid IpAddr::V4: {e:?}" + )) + })?; + return Ok((ip, ip_str)); + } + // If ROS_HOSTNAME is set that is next highest precedent + if let Ok(name) = std::env::var("ROS_HOSTNAME") { + let ip = hostname_to_ipv4(&name).await?; + return Ok((ip, name)); + } + // If neither env var is set, use the computers "hostname" + let name = gethostname::gethostname(); + let name = name.into_string().map_err(|e| { + RosMasterError::HostIpResolutionFailure(format!("This host's hostname is a string that cannot be validly converted into a Rust type, and therefore we cannot convert it into an IpAddrv4: {e:?}")) + })?; + let ip = hostname_to_ipv4(&name).await?; + return Ok((ip, name)); +} + +/// Given a the name of a host use's std::net::ToSocketAddrs to perform a DNS lookup and return the resulting IP address. +/// This function is intended to be used to determine the correct IP host the socket for the xmlrpc server on. +async fn hostname_to_ipv4(name: &str) -> Result<Ipv4Addr, RosMasterError> { + let name_with_port = &format!("{name}:0"); + let mut i = tokio::net::lookup_host(name_with_port).await.map_err(|e| { + RosMasterError::HostIpResolutionFailure(format!( + "Failure while attempting to lookup ROS_HOSTNAME: {e:?}" + )) + })?; + if let Some(addr) = i.next() { + match addr.ip() { + IpAddr::V4(ip) => Ok(ip), + IpAddr::V6(ip) => { + Err(RosMasterError::HostIpResolutionFailure(format!("ROS_HOSTNAME resolved to an IPv6 address which is not support by ROS/roslibrust: {ip:?}"))) + } + } + } else { + Err(RosMasterError::HostIpResolutionFailure(format!( + "ROS_HOSTNAME did not resolve any address: {name:?}" + ))) + } +} diff --git a/roslibrust/src/ros1/xmlrpc_server.rs b/roslibrust/src/ros1/node/xmlrpc.rs similarity index 97% rename from roslibrust/src/ros1/xmlrpc_server.rs rename to roslibrust/src/ros1/node/xmlrpc.rs index 1ebc7d95..58015494 100644 --- a/roslibrust/src/ros1/xmlrpc_server.rs +++ b/roslibrust/src/ros1/node/xmlrpc.rs @@ -1,4 +1,4 @@ -use super::node::NodeServerHandle; +use super::NodeServerHandle; use abort_on_drop::ChildTask; use hyper::{Body, Response, StatusCode}; use log::*; @@ -44,7 +44,10 @@ impl XmlRpcServerHandle { } impl XmlRpcServer { - pub fn new(host_addr: Ipv4Addr, node_server: NodeServerHandle) -> XmlRpcServerHandle { + pub fn new( + host_addr: Ipv4Addr, + node_server: NodeServerHandle, + ) -> Result<XmlRpcServerHandle, Box<dyn std::error::Error + Send + Sync>> { let make_svc = hyper::service::make_service_fn(move |connection| { debug!("New node xmlrpc connection {connection:?}"); let node_server = node_server.clone(); @@ -55,7 +58,7 @@ impl XmlRpcServer { } }); let host_addr = SocketAddr::from((host_addr, 0)); - let server = hyper::server::Server::bind(&host_addr.into()); + let server = hyper::server::Server::try_bind(&host_addr.into())?; let server = server.serve(make_svc); let addr = server.local_addr(); @@ -65,10 +68,10 @@ impl XmlRpcServer { } }); - XmlRpcServerHandle { + Ok(XmlRpcServerHandle { port: addr.port(), _handle: handle.into(), - } + }) } // Our actual service handler with our error type diff --git a/roslibrust/src/ros1/publisher.rs b/roslibrust/src/ros1/publisher.rs index 0a630104..b25abcca 100644 --- a/roslibrust/src/ros1/publisher.rs +++ b/roslibrust/src/ros1/publisher.rs @@ -1,4 +1,4 @@ -use super::tcpros::ConnectionHeader; +use crate::{ros1::tcpros::ConnectionHeader, RosLibRustError}; use abort_on_drop::ChildTask; use roslibrust_codegen::RosMessageType; use std::{ @@ -26,8 +26,10 @@ impl<T: RosMessageType> Publisher<T> { } } - pub async fn publish(&self, data: &T) -> Result<(), Box<dyn std::error::Error>> { - let data = serde_rosmsg::to_vec(&data)?; + pub async fn publish(&self, data: &T) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { + let data = serde_rosmsg::to_vec(&data) + // Gotta do some funny error mapping here as serde_rosmsg's error type is not sync + .map_err(|e| RosLibRustError::Unexpected(anyhow::anyhow!("{e:?}")))?; self.sender.send(data).await?; log::debug!("Publishing data on topic {}", self.topic_name); Ok(()) diff --git a/roslibrust/src/ros1/subscriber.rs b/roslibrust/src/ros1/subscriber.rs index 7d821a4d..7416dca1 100644 --- a/roslibrust/src/ros1/subscriber.rs +++ b/roslibrust/src/ros1/subscriber.rs @@ -1,4 +1,4 @@ -use super::tcpros::ConnectionHeader; +use crate::ros1::tcpros::ConnectionHeader; use abort_on_drop::ChildTask; use roslibrust_codegen::RosMessageType; use std::{marker::PhantomData, sync::Arc}; diff --git a/roslibrust/src/rosapi/mod.rs b/roslibrust/src/rosapi/mod.rs index 8e198208..a8daf3d0 100644 --- a/roslibrust/src/rosapi/mod.rs +++ b/roslibrust/src/rosapi/mod.rs @@ -6,10 +6,11 @@ use crate::{ClientHandle, RosLibRustResult}; use async_trait::async_trait; -// TODO Fix this... this sucks -roslibrust_codegen_macro::find_and_generate_ros_messages_relative_to_manifest_dir!( - "../assets/ros1_common_interfaces/rosapi" -); +// TODO major issue here for folks who actually try to use rosapi in their project +// This macro isn't going to expand correctly when not used from this crate's workspace +// We almost certainly need to generate and commit the resulting messages, or +// do some include_str!() hax to be able to ship these types with the crate... +roslibrust_codegen_macro::find_and_generate_ros_messages!("assets/ros1_common_interfaces/rosapi"); /// Represents the ability to interact with the interfaces provided by the rosapi node. /// This trait is implemented for ClientHandle when the `rosapi` feature is enabled. diff --git a/roslibrust/src/rosbridge/integration_tests.rs b/roslibrust/src/rosbridge/integration_tests.rs index cf40c5d0..047f52ac 100644 --- a/roslibrust/src/rosbridge/integration_tests.rs +++ b/roslibrust/src/rosbridge/integration_tests.rs @@ -135,9 +135,13 @@ mod integration_tests { // Intentionally a port where there won't be a server at let opts = ClientHandleOptions::new("ws://localhost:9091").timeout(TIMEOUT); assert!(ClientHandle::new_with_options(opts).await.is_err()); + + // This case sometimes actually passed in CI and caused test to fail + // Removed as flaky, but left here for posterity. // Impossibly short to actually work - let opts = ClientHandleOptions::new(LOCAL_WS).timeout(Duration::from_nanos(1)); - assert!(ClientHandle::new_with_options(opts).await.is_err()); + // let opts = ClientHandleOptions::new(LOCAL_WS).timeout(Duration::from_nanos(1)); + // assert!(ClientHandle::new_with_options(opts).await.is_err()); + // Doesn't timeout if given enough time let opts = ClientHandleOptions::new(LOCAL_WS).timeout(TIMEOUT); assert!(ClientHandle::new_with_options(opts).await.is_ok()); @@ -324,10 +328,12 @@ mod integration_tests { ClientHandle::new_with_options(ClientHandleOptions::new(LOCAL_WS).timeout(TIMEOUT)) .await?; - let subscriber = client.subscribe::<std_msgs::Char>("/char_topic").await?; - tokio::time::sleep(TIMEOUT).await; + // Thing for us to figure out, and don't ask me why, but this test is WAY more reliable if you advertise first then subscribe + // Unclear if this is our fault or rosbridge's let publisher = client.advertise("/char_topic").await?; tokio::time::sleep(TIMEOUT).await; + let subscriber = client.subscribe::<std_msgs::Char>("/char_topic").await?; + tokio::time::sleep(TIMEOUT).await; // Note because C++ char != rust char some care has to be taken when converting let x = std_msgs::Char { @@ -349,7 +355,7 @@ mod integration_tests { #[test_log::test(tokio::test)] async fn error_on_non_existent_service() -> TestResult { // This test is designed to catch a specific error raised in issue #88 - // When roslibrust expereiences a server side error, it returns a string instead of our message + // When roslibrust experiences a server side error, it returns a string instead of our message // We are trying to force that here, and ensure we correctly report the error let client = diff --git a/roslibrust/src/rosbridge/mod.rs b/roslibrust/src/rosbridge/mod.rs index 284b9dec..258c8602 100644 --- a/roslibrust/src/rosbridge/mod.rs +++ b/roslibrust/src/rosbridge/mod.rs @@ -30,46 +30,13 @@ mod topic_provider; mod comm; use futures_util::stream::{SplitSink, SplitStream}; -use log::*; use std::collections::HashMap; use tokio::net::TcpStream; use tokio_tungstenite::*; use tungstenite::Message; -/// For now starting with a central error type, may break this up more in future -#[derive(thiserror::Error, Debug)] -pub enum RosLibRustError { - #[error("Not currently connected to ros master / bridge")] - Disconnected, - // TODO we probably want to eliminate tungstenite from this and hide our - // underlying websocket implementation from the API - // currently we "technically" break the API when we change tungstenite verisons - #[error("Websocket communication error: {0}")] - CommFailure(tokio_tungstenite::tungstenite::Error), - #[error("Operation timed out: {0}")] - Timeout(#[from] tokio::time::error::Elapsed), - #[error("Failed to parse message from JSON: {0}")] - InvalidMessage(#[from] serde_json::Error), - #[error("Rosbridge server reported an error: {0}")] - ServerError(String), - // Generic catch-all error type for not-yet-handled errors - // TODO ultimately this type will be removed from API of library - #[error(transparent)] - Unexpected(#[from] anyhow::Error), -} - -/// Provides an implementation tranlating the underlying websocket error into our error type -impl From<tokio_tungstenite::tungstenite::Error> for RosLibRustError { - fn from(e: tokio_tungstenite::tungstenite::Error) -> Self { - // TODO we probably want to expand this type and do some matching here - RosLibRustError::CommFailure(e) - } -} - -/// Generic result type used as standard throughout library. -/// Note: many functions which currently return this will be updated to provide specific error -/// types in the future instead of the generic error here. -pub type RosLibRustResult<T> = Result<T, RosLibRustError>; +// Doing this to maintain backwards compatibilities like `use roslibrust::rosbridge::RosLibRustError` +pub use super::{RosLibRustError, RosLibRustResult}; /// Used for type erasure of message type so that we can store arbitrary handles type Callback = Box<dyn Fn(&str) + Send + Sync>; diff --git a/roslibrust/tests/ros1_xmlrpc.rs b/roslibrust/tests/ros1_xmlrpc.rs index fa06bc37..54837788 100644 --- a/roslibrust/tests/ros1_xmlrpc.rs +++ b/roslibrust/tests/ros1_xmlrpc.rs @@ -1,28 +1,23 @@ #[cfg(all(feature = "ros1", feature = "ros1_test"))] mod tests { + use roslibrust::ros1::NodeHandle; use roslibrust_codegen::RosMessageType; use serde::de::DeserializeOwned; use serde_xmlrpc::Value; - use tokio::time::timeout; - const TIMEOUT: tokio::time::Duration = tokio::time::Duration::from_millis(500); - - roslibrust_codegen_macro::find_and_generate_ros_messages_relative_to_manifest_dir!( - "../assets/ros1_common_interfaces" - ); + roslibrust_codegen_macro::find_and_generate_ros_messages!("assets/ros1_common_interfaces"); async fn call_node_api_raw(uri: &str, endpoint: &str, args: Vec<Value>) -> String { let client = reqwest::Client::new(); let body = serde_xmlrpc::request_to_string(endpoint, args).unwrap(); - let response = timeout(TIMEOUT, client.post(uri).body(body).send()) - .await - .unwrap() - .unwrap(); - timeout(TIMEOUT, response.text()).await.unwrap().unwrap() + let response = client.post(uri).body(body).send().await.unwrap(); + response.text().await.unwrap() } async fn call_node_api<T: DeserializeOwned>(uri: &str, endpoint: &str, args: Vec<Value>) -> T { + log::debug!("Start call api: {uri:?}, {endpoint:?}, {args:?}"); // Note: don't need timeout here as all operations in side call_node_api_raw are timeout()'d let response = call_node_api_raw(uri, endpoint, args).await; + log::debug!("Got api raw response"); let (error_code, error_description, value): (i8, String, T) = serde_xmlrpc::response_from_str(&response).unwrap(); @@ -31,19 +26,13 @@ mod tests { value } - #[tokio::test] - async fn verify_get_master_uri() { - let node = timeout( - TIMEOUT, - roslibrust::NodeHandle::new("http://localhost:11311", "verify_get_master_uri"), - ) - .await - .unwrap() - .unwrap(); - let node_uri = timeout(TIMEOUT, node.get_client_uri()) - .await - .unwrap() - .unwrap(); + #[test_log::test(tokio::test)] + async fn verify_get_master_uri() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { + let node = NodeHandle::new("http://localhost:11311", "verify_get_master_uri").await?; + log::info!("Got new handle"); + + let node_uri = node.get_client_uri().await?; + log::info!("Got uri"); // Note: doesn't need timeout as it is internally timeout()'d let master_uri = call_node_api::<String>( @@ -52,23 +41,18 @@ mod tests { vec!["/get_master_uri_test".into()], ) .await; + log::info!("Got master"); assert_eq!(master_uri, "http://localhost:11311"); + Ok(()) } - #[tokio::test] - async fn verify_get_publications() { - let node = timeout( - TIMEOUT, - roslibrust::NodeHandle::new("http://localhost:11311", "verify_get_publications"), - ) - .await - .unwrap() - .unwrap(); + #[test_log::test(tokio::test)] + async fn verify_get_publications() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { + let node = NodeHandle::new("http://localhost:11311", "verify_get_publications").await?; + log::info!("Got new handle"); - let node_uri = timeout(TIMEOUT, node.get_client_uri()) - .await - .unwrap() - .unwrap(); + let node_uri = node.get_client_uri().await?; + log::info!("Got uri"); // Note: timeout not needed as it is internally timeout'd let publications = call_node_api::<Vec<(String, String)>>( @@ -78,14 +62,10 @@ mod tests { ) .await; assert_eq!(publications.len(), 0); + log::info!("Got publications"); - let _publisher = timeout( - TIMEOUT, - node.advertise::<std_msgs::String>("/test_topic", 1), - ) - .await - .unwrap() - .unwrap(); + let _publisher = node.advertise::<std_msgs::String>("/test_topic", 1).await?; + log::info!("advertised"); // Note: internally timeout()'d let publications = call_node_api::<Vec<(String, String)>>( @@ -94,26 +74,24 @@ mod tests { vec!["/verify_get_publications".into()], ) .await; + log::info!("Got post advertise publications"); + assert_eq!(publications.len(), 1); let (topic, topic_type) = publications.iter().nth(0).unwrap(); assert_eq!(topic, "/test_topic"); assert_eq!(topic_type, std_msgs::String::ROS_TYPE_NAME); + Ok(()) } - #[tokio::test] + #[test_log::test(tokio::test)] async fn verify_shutdown() { - let node = timeout( - TIMEOUT, - roslibrust::NodeHandle::new("http://localhost:11311", "verify_shutdown"), - ) - .await - .unwrap() - .unwrap(); - let node_uri = timeout(TIMEOUT, node.get_client_uri()) + let node = NodeHandle::new("http://localhost:11311", "verify_shutdown") .await - .unwrap() .unwrap(); + log::info!("Got handle"); + let node_uri = node.get_client_uri().await.unwrap(); assert!(node.is_ok()); + log::info!("Got uri"); // Note: internally timeout()'d call_node_api::<i32>( @@ -122,31 +100,27 @@ mod tests { vec!["/verify_shutdown".into(), "".into()], ) .await; + log::info!("shutdown?"); + + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; assert!(!node.is_ok()); } - #[tokio::test] + #[test_log::test(tokio::test)] async fn verify_request_topic() { - let node = timeout( - TIMEOUT, - roslibrust::NodeHandle::new("http://localhost:11311", "verify_request_topic"), - ) - .await - .unwrap() - .unwrap(); - let node_uri = timeout(TIMEOUT, node.get_client_uri()) + let node = NodeHandle::new("http://localhost:11311", "verify_request_topic") .await - .unwrap() .unwrap(); + log::info!("Got handle"); + let node_uri = node.get_client_uri().await.unwrap(); + log::info!("Got uri {node_uri:?}"); - let _publisher = timeout( - TIMEOUT, - node.advertise::<std_msgs::String>("/test_topic", 1), - ) - .await - .unwrap() - .unwrap(); + let _publisher = node + .advertise::<std_msgs::String>("/test_topic", 1) + .await + .unwrap(); + log::info!("Got publisher"); // Note: internally timeout()'d let response = call_node_api_raw( @@ -159,6 +133,7 @@ mod tests { ], ) .await; + log::info!("Got response"); let (code, _description, (protocol, host, port)): (i8, String, (String, String, u16)) = serde_xmlrpc::response_from_str(&response).unwrap(); diff --git a/roslibrust_codegen_macro/src/lib.rs b/roslibrust_codegen_macro/src/lib.rs index d227f3db..7484c630 100644 --- a/roslibrust_codegen_macro/src/lib.rs +++ b/roslibrust_codegen_macro/src/lib.rs @@ -43,33 +43,6 @@ pub fn find_and_generate_ros_messages(input_stream: TokenStream) -> TokenStream } } -/// Does the same as find_and_generate_ros_messages, but interprets relative paths -/// as relative to the root of this crate. No idea if this is useful, but I needed it! -#[proc_macro] -pub fn find_and_generate_ros_messages_relative_to_manifest_dir( - input_stream: TokenStream, -) -> TokenStream { - let RosLibRustMessagePaths { mut paths } = - parse_macro_input!(input_stream as RosLibRustMessagePaths); - - std::env::set_current_dir(env!("CARGO_MANIFEST_DIR")).expect("Failed to set working dir"); - for path in &mut paths { - *path = path.canonicalize().unwrap_or_else(|err| { - panic!("Failed to canonicalize path {path:?}: {}", err.to_string()) - }); - } - - match roslibrust_codegen::find_and_generate_ros_messages(paths) { - // Note: there is not currently a way for proc_macros to indicate that they need to be re-generated - // We discard the "dependent_paths" part of the response here... - Ok((source, _dependent_paths)) => source.into(), - Err(e) => { - let error_msg = e.to_string(); - quote::quote!(compile_error!(#error_msg);).into() - } - } -} - /// Similar to `find_and_generate_ros_messages`, but does not search the /// `ROS_PACKAGE_PATH` environment variable paths (useful in some situations). #[proc_macro]