diff --git a/examples/message_demo/Cargo.toml b/examples/message_demo/Cargo.toml index 759960805..4e9a6f256 100644 --- a/examples/message_demo/Cargo.toml +++ b/examples/message_demo/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "examples_rclrs_message_demo" -version = "0.4.1" +version = "0.5.0" authors = ["Nikolai Morin "] edition = "2021" @@ -12,7 +12,7 @@ path = "src/message_demo.rs" anyhow = {version = "1", features = ["backtrace"]} [dependencies.rclrs] -version = "0.4" +version = "0.5" [dependencies.rosidl_runtime_rs] version = "0.4" diff --git a/examples/message_demo/src/message_demo.rs b/examples/message_demo/src/message_demo.rs index 3e2bf53b2..f8cc6431d 100644 --- a/examples/message_demo/src/message_demo.rs +++ b/examples/message_demo/src/message_demo.rs @@ -1,4 +1,4 @@ -use std::{convert::TryInto, env, sync::Arc}; +use std::convert::TryInto; use anyhow::{Error, Result}; use rosidl_runtime_rs::{seq, BoundedSequence, Message, Sequence}; @@ -138,38 +138,32 @@ fn demonstrate_sequences() { fn demonstrate_pubsub() -> Result<(), Error> { println!("================== Interoperability demo =================="); // Demonstrate interoperability between idiomatic and RMW-native message types - let context = rclrs::Context::new(env::args())?; - let node = rclrs::create_node(&context, "message_demo")?; + let mut executor = rclrs::Context::default_from_env()?.create_basic_executor(); + let node = executor.create_node("message_demo")?; - let idiomatic_publisher = node.create_publisher::( - "topic", - rclrs::QOS_PROFILE_DEFAULT, - )?; - let direct_publisher = node.create_publisher::( - "topic", - rclrs::QOS_PROFILE_DEFAULT, - )?; + let idiomatic_publisher = + node.create_publisher::("topic")?; + let direct_publisher = + node.create_publisher::("topic")?; let _idiomatic_subscription = node .create_subscription::( "topic", - rclrs::QOS_PROFILE_DEFAULT, move |_msg: rclrs_example_msgs::msg::VariousTypes| println!("Got idiomatic message!"), )?; let _direct_subscription = node .create_subscription::( "topic", - rclrs::QOS_PROFILE_DEFAULT, move |_msg: rclrs_example_msgs::msg::rmw::VariousTypes| { println!("Got RMW-native message!") }, )?; println!("Sending idiomatic message."); idiomatic_publisher.publish(rclrs_example_msgs::msg::VariousTypes::default())?; - rclrs::spin_once(Arc::clone(&node), None)?; + executor.spin(rclrs::SpinOptions::spin_once())?; println!("Sending RMW-native message."); direct_publisher.publish(rclrs_example_msgs::msg::rmw::VariousTypes::default())?; - rclrs::spin_once(Arc::clone(&node), None)?; + executor.spin(rclrs::SpinOptions::spin_once())?; Ok(()) } diff --git a/examples/minimal_client_service/Cargo.toml b/examples/minimal_client_service/Cargo.toml index 618b0028c..a2d10fcf0 100644 --- a/examples/minimal_client_service/Cargo.toml +++ b/examples/minimal_client_service/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "examples_rclrs_minimal_client_service" -version = "0.4.1" +version = "0.5.0" authors = ["Esteve Fernandez "] edition = "2021" @@ -21,7 +21,7 @@ anyhow = {version = "1", features = ["backtrace"]} tokio = { version = "1", features = ["macros", "rt", "rt-multi-thread", "time"] } [dependencies.rclrs] -version = "0.4" +version = "0.5" [dependencies.rosidl_runtime_rs] version = "0.4" diff --git a/examples/minimal_client_service/src/minimal_client.rs b/examples/minimal_client_service/src/minimal_client.rs index 915541d54..49f18e242 100644 --- a/examples/minimal_client_service/src/minimal_client.rs +++ b/examples/minimal_client_service/src/minimal_client.rs @@ -1,11 +1,9 @@ -use std::env; - use anyhow::{Error, Result}; fn main() -> Result<(), Error> { - let context = rclrs::Context::new(env::args())?; + let mut executor = rclrs::Context::default_from_env()?.create_basic_executor(); - let node = rclrs::create_node(&context, "minimal_client")?; + let node = executor.create_node("minimal_client")?; let client = node.create_client::("add_two_ints")?; @@ -30,5 +28,7 @@ fn main() -> Result<(), Error> { std::thread::sleep(std::time::Duration::from_millis(500)); println!("Waiting for response"); - rclrs::spin(node).map_err(|err| err.into()) + executor + .spin(rclrs::SpinOptions::default()) + .map_err(|err| err.into()) } diff --git a/examples/minimal_client_service/src/minimal_client_async.rs b/examples/minimal_client_service/src/minimal_client_async.rs index 0eeb87f4d..8babe04e7 100644 --- a/examples/minimal_client_service/src/minimal_client_async.rs +++ b/examples/minimal_client_service/src/minimal_client_async.rs @@ -1,12 +1,10 @@ -use std::env; - use anyhow::{Error, Result}; #[tokio::main] async fn main() -> Result<(), Error> { - let context = rclrs::Context::new(env::args())?; + let mut executor = rclrs::Context::default_from_env()?.create_basic_executor(); - let node = rclrs::create_node(&context, "minimal_client")?; + let node = executor.create_node("minimal_client")?; let client = node.create_client::("add_two_ints")?; @@ -22,7 +20,8 @@ async fn main() -> Result<(), Error> { println!("Waiting for response"); - let rclrs_spin = tokio::task::spawn_blocking(move || rclrs::spin(node)); + let rclrs_spin = + tokio::task::spawn_blocking(move || executor.spin(rclrs::SpinOptions::default())); let response = future.await?; println!( diff --git a/examples/minimal_client_service/src/minimal_service.rs b/examples/minimal_client_service/src/minimal_service.rs index b4149c817..84d154fec 100644 --- a/examples/minimal_client_service/src/minimal_service.rs +++ b/examples/minimal_client_service/src/minimal_service.rs @@ -1,5 +1,3 @@ -use std::env; - use anyhow::{Error, Result}; fn handle_service( @@ -13,13 +11,15 @@ fn handle_service( } fn main() -> Result<(), Error> { - let context = rclrs::Context::new(env::args())?; + let mut executor = rclrs::Context::default_from_env()?.create_basic_executor(); - let node = rclrs::create_node(&context, "minimal_service")?; + let node = executor.create_node("minimal_service")?; let _server = node .create_service::("add_two_ints", handle_service)?; println!("Starting server"); - rclrs::spin(node).map_err(|err| err.into()) + executor + .spin(rclrs::SpinOptions::default()) + .map_err(|err| err.into()) } diff --git a/examples/minimal_pub_sub/Cargo.toml b/examples/minimal_pub_sub/Cargo.toml index 52c4f0544..a96377db6 100644 --- a/examples/minimal_pub_sub/Cargo.toml +++ b/examples/minimal_pub_sub/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "examples_rclrs_minimal_pub_sub" -version = "0.4.1" +version = "0.5.0" # This project is not military-sponsored, Jacob's employment contract just requires him to use this email address authors = ["Esteve Fernandez ", "Nikolai Morin ", "Jacob Hassold "] edition = "2021" @@ -29,7 +29,7 @@ path = "src/zero_copy_publisher.rs" anyhow = {version = "1", features = ["backtrace"]} [dependencies.rclrs] -version = "0.4" +version = "0.5" [dependencies.rosidl_runtime_rs] version = "0.4" diff --git a/examples/minimal_pub_sub/src/minimal_publisher.rs b/examples/minimal_pub_sub/src/minimal_publisher.rs index 720086917..be88b0f5a 100644 --- a/examples/minimal_pub_sub/src/minimal_publisher.rs +++ b/examples/minimal_pub_sub/src/minimal_publisher.rs @@ -1,14 +1,12 @@ -use std::env; - use anyhow::{Error, Result}; fn main() -> Result<(), Error> { - let context = rclrs::Context::new(env::args())?; + let context = rclrs::Context::default_from_env()?; + let executor = context.create_basic_executor(); - let node = rclrs::create_node(&context, "minimal_publisher")?; + let node = executor.create_node("minimal_publisher")?; - let publisher = - node.create_publisher::("topic", rclrs::QOS_PROFILE_DEFAULT)?; + let publisher = node.create_publisher::("topic")?; let mut message = std_msgs::msg::String::default(); diff --git a/examples/minimal_pub_sub/src/minimal_subscriber.rs b/examples/minimal_pub_sub/src/minimal_subscriber.rs index ebc5fc194..ebe7406ee 100644 --- a/examples/minimal_pub_sub/src/minimal_subscriber.rs +++ b/examples/minimal_pub_sub/src/minimal_subscriber.rs @@ -1,17 +1,15 @@ -use std::env; - use anyhow::{Error, Result}; fn main() -> Result<(), Error> { - let context = rclrs::Context::new(env::args())?; + let context = rclrs::Context::default_from_env()?; + let mut executor = context.create_basic_executor(); - let node = rclrs::create_node(&context, "minimal_subscriber")?; + let node = executor.create_node("minimal_subscriber")?; let mut num_messages: usize = 0; let _subscription = node.create_subscription::( "topic", - rclrs::QOS_PROFILE_DEFAULT, move |msg: std_msgs::msg::String| { num_messages += 1; println!("I heard: '{}'", msg.data); @@ -19,5 +17,7 @@ fn main() -> Result<(), Error> { }, )?; - rclrs::spin(node).map_err(|err| err.into()) + executor + .spin(rclrs::SpinOptions::default()) + .map_err(|err| err.into()) } diff --git a/examples/minimal_pub_sub/src/minimal_two_nodes.rs b/examples/minimal_pub_sub/src/minimal_two_nodes.rs index fb03574a2..46bd9780c 100644 --- a/examples/minimal_pub_sub/src/minimal_two_nodes.rs +++ b/examples/minimal_pub_sub/src/minimal_two_nodes.rs @@ -1,23 +1,23 @@ -use std::{ - env, - sync::{ - atomic::{AtomicU32, Ordering}, - Arc, Mutex, - }, +use std::sync::{ + atomic::{AtomicU32, Ordering}, + Arc, Mutex, }; use anyhow::{Error, Result}; struct MinimalSubscriber { num_messages: AtomicU32, - node: Arc, - subscription: Mutex>>>, + node: rclrs::Node, + subscription: Mutex>>, } impl MinimalSubscriber { - pub fn new(name: &str, topic: &str) -> Result, rclrs::RclrsError> { - let context = rclrs::Context::new(env::args())?; - let node = rclrs::create_node(&context, name)?; + pub fn new( + executor: &rclrs::Executor, + name: &str, + topic: &str, + ) -> Result, rclrs::RclrsError> { + let node = executor.create_node(name)?; let minimal_subscriber = Arc::new(MinimalSubscriber { num_messages: 0.into(), node, @@ -29,7 +29,6 @@ impl MinimalSubscriber { .node .create_subscription::( topic, - rclrs::QOS_PROFILE_DEFAULT, move |msg: std_msgs::msg::String| { minimal_subscriber_aux.callback(msg); }, @@ -50,14 +49,15 @@ impl MinimalSubscriber { } fn main() -> Result<(), Error> { - let publisher_context = rclrs::Context::new(env::args())?; - let publisher_node = rclrs::create_node(&publisher_context, "minimal_publisher")?; + let mut executor = rclrs::Context::default_from_env()?.create_basic_executor(); + let publisher_node = executor.create_node("minimal_publisher")?; - let subscriber_node_one = MinimalSubscriber::new("minimal_subscriber_one", "topic")?; - let subscriber_node_two = MinimalSubscriber::new("minimal_subscriber_two", "topic")?; + let _subscriber_node_one = + MinimalSubscriber::new(&executor, "minimal_subscriber_one", "topic")?; + let _subscriber_node_two = + MinimalSubscriber::new(&executor, "minimal_subscriber_two", "topic")?; - let publisher = publisher_node - .create_publisher::("topic", rclrs::QOS_PROFILE_DEFAULT)?; + let publisher = publisher_node.create_publisher::("topic")?; std::thread::spawn(move || -> Result<(), rclrs::RclrsError> { let mut message = std_msgs::msg::String::default(); @@ -71,11 +71,7 @@ fn main() -> Result<(), Error> { } }); - let executor = rclrs::SingleThreadedExecutor::new(); - - executor.add_node(&publisher_node)?; - executor.add_node(&subscriber_node_one.node)?; - executor.add_node(&subscriber_node_two.node)?; - - executor.spin().map_err(|err| err.into()) + executor + .spin(rclrs::SpinOptions::default()) + .map_err(|err| err.into()) } diff --git a/examples/minimal_pub_sub/src/zero_copy_publisher.rs b/examples/minimal_pub_sub/src/zero_copy_publisher.rs index 5e73b5de7..d495f90bb 100644 --- a/examples/minimal_pub_sub/src/zero_copy_publisher.rs +++ b/examples/minimal_pub_sub/src/zero_copy_publisher.rs @@ -1,14 +1,12 @@ -use std::env; - use anyhow::{Error, Result}; fn main() -> Result<(), Error> { - let context = rclrs::Context::new(env::args())?; + let context = rclrs::Context::default_from_env()?; + let executor = context.create_basic_executor(); - let node = rclrs::create_node(&context, "minimal_publisher")?; + let node = executor.create_node("minimal_publisher")?; - let publisher = - node.create_publisher::("topic", rclrs::QOS_PROFILE_DEFAULT)?; + let publisher = node.create_publisher::("topic")?; let mut publish_count: u32 = 1; diff --git a/examples/minimal_pub_sub/src/zero_copy_subscriber.rs b/examples/minimal_pub_sub/src/zero_copy_subscriber.rs index 9551dba0e..b44752d65 100644 --- a/examples/minimal_pub_sub/src/zero_copy_subscriber.rs +++ b/examples/minimal_pub_sub/src/zero_copy_subscriber.rs @@ -1,17 +1,14 @@ -use std::env; - use anyhow::{Error, Result}; fn main() -> Result<(), Error> { - let context = rclrs::Context::new(env::args())?; + let mut executor = rclrs::Context::default_from_env()?.create_basic_executor(); - let node = rclrs::create_node(&context, "minimal_subscriber")?; + let node = executor.create_node("minimal_subscriber")?; let mut num_messages: usize = 0; let _subscription = node.create_subscription::( "topic", - rclrs::QOS_PROFILE_DEFAULT, move |msg: rclrs::ReadOnlyLoanedMessage<'_, std_msgs::msg::UInt32>| { num_messages += 1; println!("I heard: '{}'", msg.data); @@ -19,5 +16,7 @@ fn main() -> Result<(), Error> { }, )?; - rclrs::spin(node).map_err(|err| err.into()) + executor + .spin(rclrs::SpinOptions::default()) + .map_err(|err| err.into()) } diff --git a/examples/rust_pubsub/src/simple_publisher.rs b/examples/rust_pubsub/src/simple_publisher.rs index 98d0e0f74..212ab524a 100644 --- a/examples/rust_pubsub/src/simple_publisher.rs +++ b/examples/rust_pubsub/src/simple_publisher.rs @@ -1,36 +1,35 @@ -use rclrs::{create_node, Context, Node, Publisher, RclrsError, QOS_PROFILE_DEFAULT}; +use rclrs::{Context, Executor, Publisher, RclrsError, SpinOptions}; use std::{sync::Arc, thread, time::Duration}; use std_msgs::msg::String as StringMsg; -struct SimplePublisherNode { - node: Arc, - _publisher: Arc>, + +struct SimplePublisher { + publisher: Publisher, } -impl SimplePublisherNode { - fn new(context: &Context) -> Result { - let node = create_node(context, "simple_publisher").unwrap(); - let _publisher = node - .create_publisher("publish_hello", QOS_PROFILE_DEFAULT) - .unwrap(); - Ok(Self { node, _publisher }) + +impl SimplePublisher { + fn new(executor: &Executor) -> Result { + let node = executor.create_node("simple_publisher").unwrap(); + let publisher = node.create_publisher("publish_hello").unwrap(); + Ok(Self { publisher }) } fn publish_data(&self, increment: i32) -> Result { let msg: StringMsg = StringMsg { data: format!("Hello World {}", increment), }; - self._publisher.publish(msg).unwrap(); + self.publisher.publish(msg).unwrap(); Ok(increment + 1_i32) } } fn main() -> Result<(), RclrsError> { - let context = Context::new(std::env::args()).unwrap(); - let publisher = Arc::new(SimplePublisherNode::new(&context).unwrap()); + let mut executor = Context::default_from_env().unwrap().create_basic_executor(); + let publisher = Arc::new(SimplePublisher::new(&executor).unwrap()); let publisher_other_thread = Arc::clone(&publisher); let mut count: i32 = 0; thread::spawn(move || loop { thread::sleep(Duration::from_millis(1000)); count = publisher_other_thread.publish_data(count).unwrap(); }); - rclrs::spin(publisher.node.clone()) + executor.spin(SpinOptions::default()) } diff --git a/examples/rust_pubsub/src/simple_subscriber.rs b/examples/rust_pubsub/src/simple_subscriber.rs index a0d02bb4c..30be0971e 100644 --- a/examples/rust_pubsub/src/simple_subscriber.rs +++ b/examples/rust_pubsub/src/simple_subscriber.rs @@ -1,35 +1,27 @@ -use rclrs::{create_node, Context, Node, RclrsError, Subscription, QOS_PROFILE_DEFAULT}; +use rclrs::{Context, Executor, RclrsError, SpinOptions, Subscription}; use std::{ - env, sync::{Arc, Mutex}, thread, time::Duration, }; use std_msgs::msg::String as StringMsg; + pub struct SimpleSubscriptionNode { - node: Arc, - _subscriber: Arc>, + _subscriber: Subscription, data: Arc>>, } + impl SimpleSubscriptionNode { - fn new(context: &Context) -> Result { - let node = create_node(context, "simple_subscription").unwrap(); + fn new(executor: &Executor) -> Result { + let node = executor.create_node("simple_subscription").unwrap(); let data: Arc>> = Arc::new(Mutex::new(None)); let data_mut: Arc>> = Arc::clone(&data); let _subscriber = node - .create_subscription::( - "publish_hello", - QOS_PROFILE_DEFAULT, - move |msg: StringMsg| { - *data_mut.lock().unwrap() = Some(msg); - }, - ) + .create_subscription::("publish_hello", move |msg: StringMsg| { + *data_mut.lock().unwrap() = Some(msg); + }) .unwrap(); - Ok(Self { - node, - _subscriber, - data, - }) + Ok(Self { _subscriber, data }) } fn data_callback(&self) -> Result<(), RclrsError> { if let Some(data) = self.data.lock().unwrap().as_ref() { @@ -41,12 +33,12 @@ impl SimpleSubscriptionNode { } } fn main() -> Result<(), RclrsError> { - let context = Context::new(env::args()).unwrap(); - let subscription = Arc::new(SimpleSubscriptionNode::new(&context).unwrap()); + let mut executor = Context::default_from_env().unwrap().create_basic_executor(); + let subscription = Arc::new(SimpleSubscriptionNode::new(&executor).unwrap()); let subscription_other_thread = Arc::clone(&subscription); thread::spawn(move || loop { thread::sleep(Duration::from_millis(1000)); subscription_other_thread.data_callback().unwrap() }); - rclrs::spin(subscription.node.clone()) + executor.spin(SpinOptions::default()) } diff --git a/rclrs/Cargo.toml b/rclrs/Cargo.toml index fe17cc990..693ad14f2 100644 --- a/rclrs/Cargo.toml +++ b/rclrs/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rclrs" -version = "0.4.1" +version = "0.5.0" # This project is not military-sponsored, Jacob's employment contract just requires him to use this email address authors = ["Esteve Fernandez ", "Nikolai Morin ", "Jacob Hassold "] edition = "2021" diff --git a/rclrs/src/client.rs b/rclrs/src/client.rs index b308f1de2..d8c6fd7cb 100644 --- a/rclrs/src/client.rs +++ b/rclrs/src/client.rs @@ -11,7 +11,7 @@ use rosidl_runtime_rs::Message; use crate::{ error::{RclReturnCode, ToResult}, rcl_bindings::*, - MessageCow, NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX, + IntoPrimitiveOptions, MessageCow, NodeHandle, QoSProfile, RclrsError, ENTITY_LIFECYCLE_MUTEX, }; // SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread @@ -67,7 +67,7 @@ type RequestId = i64; /// The only available way to instantiate clients is via [`Node::create_client`][1], this is to /// ensure that [`Node`][2]s can track all the clients that have been created. /// -/// [1]: crate::Node::create_client +/// [1]: crate::NodeState::create_client /// [2]: crate::Node pub struct Client where @@ -83,23 +83,29 @@ where T: rosidl_runtime_rs::Service, { /// Creates a new client. - pub(crate) fn new(node_handle: Arc, topic: &str) -> Result + pub(crate) fn new<'a>( + node_handle: Arc, + options: impl Into>, + ) -> Result // This uses pub(crate) visibility to avoid instantiating this struct outside // [`Node::create_client`], see the struct's documentation for the rationale where T: rosidl_runtime_rs::Service, { + let ClientOptions { service_name, qos } = options.into(); // SAFETY: Getting a zero-initialized value is always safe. let mut rcl_client = unsafe { rcl_get_zero_initialized_client() }; let type_support = ::get_type_support() as *const rosidl_service_type_support_t; - let topic_c_string = CString::new(topic).map_err(|err| RclrsError::StringContainsNul { - err, - s: topic.into(), - })?; + let topic_c_string = + CString::new(service_name).map_err(|err| RclrsError::StringContainsNul { + err, + s: service_name.into(), + })?; // SAFETY: No preconditions for this function. - let client_options = unsafe { rcl_client_get_default_options() }; + let mut client_options = unsafe { rcl_client_get_default_options() }; + client_options.qos = qos.into(); { let rcl_node = node_handle.rcl_node.lock().unwrap(); @@ -275,6 +281,38 @@ where } } +/// `ClientOptions` are used by [`Node::create_client`][1] to initialize a +/// [`Client`] for a service. +/// +/// [1]: crate::NodeState::create_client +#[derive(Debug, Clone)] +#[non_exhaustive] +pub struct ClientOptions<'a> { + /// The name of the service that this client will send requests to + pub service_name: &'a str, + /// The quality of the service profile for this client + pub qos: QoSProfile, +} + +impl<'a> ClientOptions<'a> { + /// Initialize a new [`ClientOptions`] with default settings. + pub fn new(service_name: &'a str) -> Self { + Self { + service_name, + qos: QoSProfile::services_default(), + } + } +} + +impl<'a, T: IntoPrimitiveOptions<'a>> From for ClientOptions<'a> { + fn from(value: T) -> Self { + let primitive = value.into_primitive_options(); + let mut options = Self::new(primitive.name); + primitive.apply(&mut options.qos); + options + } +} + impl ClientBase for Client where T: rosidl_runtime_rs::Service, diff --git a/rclrs/src/context.rs b/rclrs/src/context.rs index 90c8fbd3c..17894ca46 100644 --- a/rclrs/src/context.rs +++ b/rclrs/src/context.rs @@ -6,7 +6,7 @@ use std::{ vec::Vec, }; -use crate::{rcl_bindings::*, RclrsError, ToResult}; +use crate::{rcl_bindings::*, Executor, RclrsError, ToResult}; /// This is locked whenever initializing or dropping any middleware entity /// because we have found issues in RCL and some RMW implementations that @@ -70,34 +70,40 @@ pub(crate) struct ContextHandle { pub(crate) rcl_context: Mutex, } +impl Default for Context { + fn default() -> Self { + // SAFETY: It should always be valid to instantiate a context with no + // arguments, no parameters, no options, etc. + Self::new([], InitOptions::default()).expect("Failed to instantiate a default context") + } +} + impl Context { /// Creates a new context. /// - /// Usually this would be called with `std::env::args()`, analogously to `rclcpp::init()`. - /// See also the official "Passing ROS arguments to nodes via the command-line" tutorial. + /// * `args` - A sequence of strings that resembles command line arguments + /// that users can pass into a ROS executable. See [the official tutorial][1] + /// to know what these arguments may look like. To simply pass in the arguments + /// that the user has provided from the command line, call [`Self::from_env`] + /// or [`Self::default_from_env`] instead. /// - /// Creating a context will fail if the args contain invalid ROS arguments. + /// * `options` - Additional options that your application can use to override + /// settings that would otherwise be determined by the environment. /// - /// # Example - /// ``` - /// # use rclrs::Context; - /// assert!(Context::new([]).is_ok()); - /// let invalid_remapping = ["--ros-args", "-r", ":=:*/]"].map(String::from); - /// assert!(Context::new(invalid_remapping).is_err()); - /// ``` - pub fn new(args: impl IntoIterator) -> Result { - Self::new_with_options(args, InitOptions::new()) - } - - /// Same as [`Context::new`] except you can additionally provide initialization options. + /// Creating a context will fail if `args` contains invalid ROS arguments. /// /// # Example /// ``` /// use rclrs::{Context, InitOptions}; - /// let context = Context::new_with_options([], InitOptions::new().with_domain_id(Some(5))).unwrap(); + /// let context = Context::new( + /// std::env::args(), + /// InitOptions::new().with_domain_id(Some(5)), + /// ).unwrap(); /// assert_eq!(context.domain_id(), 5); - /// ```` - pub fn new_with_options( + /// ``` + /// + /// [1]: https://docs.ros.org/en/rolling/How-To-Guides/Node-arguments.html + pub fn new( args: impl IntoIterator, options: InitOptions, ) -> Result { @@ -150,6 +156,23 @@ impl Context { }) } + /// Same as [`Self::new`] but [`std::env::args`] is automatically passed in + /// for `args`. + pub fn from_env(options: InitOptions) -> Result { + Self::new(std::env::args(), options) + } + + /// Same as [`Self::from_env`] but the default [`InitOptions`] is passed in + /// for `options`. + pub fn default_from_env() -> Result { + Self::new(std::env::args(), InitOptions::default()) + } + + /// Create a basic executor that comes built into rclrs. + pub fn create_basic_executor(&self) -> Executor { + Executor::new(Arc::clone(&self.handle)) + } + /// Returns the ROS domain ID that the context is using. /// /// The domain ID controls which nodes can send messages to each other, see the [ROS 2 concept article][1]. @@ -250,14 +273,14 @@ mod tests { #[test] fn test_create_context() -> Result<(), RclrsError> { // If the context fails to be created, this will cause a panic - let _ = Context::new(vec![])?; + let _ = Context::new(vec![], InitOptions::default())?; Ok(()) } #[test] fn test_context_ok() -> Result<(), RclrsError> { // If the context fails to be created, this will cause a panic - let created_context = Context::new(vec![]).unwrap(); + let created_context = Context::new(vec![], InitOptions::default()).unwrap(); assert!(created_context.ok()); Ok(()) diff --git a/rclrs/src/error.rs b/rclrs/src/error.rs index 3eba2549f..961e7d5e6 100644 --- a/rclrs/src/error.rs +++ b/rclrs/src/error.rs @@ -352,3 +352,30 @@ impl ToResult for rcl_ret_t { to_rclrs_result(*self) } } + +/// A helper trait to disregard timeouts as not an error. +pub trait RclrsErrorFilter { + /// If the result was a timeout error, change it to `Ok(())`. + fn timeout_ok(self) -> Result<(), RclrsError>; +} + +impl RclrsErrorFilter for Result<(), RclrsError> { + fn timeout_ok(self) -> Result<(), RclrsError> { + match self { + Ok(()) => Ok(()), + Err(err) => { + if matches!( + err, + RclrsError::RclError { + code: RclReturnCode::Timeout, + .. + } + ) { + return Ok(()); + } + + Err(err) + } + } + } +} diff --git a/rclrs/src/executor.rs b/rclrs/src/executor.rs index 37c43a68e..e2c8b60ed 100644 --- a/rclrs/src/executor.rs +++ b/rclrs/src/executor.rs @@ -1,45 +1,55 @@ -use crate::{rcl_bindings::rcl_context_is_valid, Node, RclReturnCode, RclrsError, WaitSet}; +use crate::{ + rcl_bindings::rcl_context_is_valid, ContextHandle, Node, NodeOptions, NodeState, RclrsError, + WaitSet, +}; use std::{ sync::{Arc, Mutex, Weak}, time::Duration, }; /// Single-threaded executor implementation. -pub struct SingleThreadedExecutor { - nodes_mtx: Mutex>>, +pub struct Executor { + context: Arc, + nodes_mtx: Mutex>>, } -impl Default for SingleThreadedExecutor { - fn default() -> Self { - Self::new() +impl Executor { + /// Create a [`Node`] that will run on this Executor. + pub fn create_node(&self, options: impl Into) -> Result { + let options: NodeOptions = options.into(); + let node = options.build(&self.context)?; + self.nodes_mtx.lock().unwrap().push(Arc::downgrade(&node)); + Ok(node) } -} -impl SingleThreadedExecutor { - /// Creates a new executor. - pub fn new() -> Self { - SingleThreadedExecutor { - nodes_mtx: Mutex::new(Vec::new()), - } - } + /// Spin the Executor. The current thread will be blocked until the Executor + /// stops spinning. + /// + /// [`SpinOptions`] can be used to automatically stop the spinning when + /// certain conditions are met. Use `SpinOptions::default()` to allow the + /// Executor to keep spinning indefinitely. + pub fn spin(&mut self, options: SpinOptions) -> Result<(), RclrsError> { + loop { + if self.nodes_mtx.lock().unwrap().is_empty() { + // Nothing to spin for, so just quit here + return Ok(()); + } - /// Add a node to the executor. - pub fn add_node(&self, node: &Arc) -> Result<(), RclrsError> { - { self.nodes_mtx.lock().unwrap() }.push(Arc::downgrade(node)); - Ok(()) - } + self.spin_once(options.timeout)?; - /// Remove a node from the executor. - pub fn remove_node(&self, node: Arc) -> Result<(), RclrsError> { - { self.nodes_mtx.lock().unwrap() } - .retain(|n| !n.upgrade().map(|n| Arc::ptr_eq(&n, &node)).unwrap_or(false)); - Ok(()) + if options.only_next_available_work { + // We were only suppposed to spin once, so quit here + return Ok(()); + } + + std::thread::yield_now(); + } } /// Polls the nodes for new messages and executes the corresponding callbacks. /// /// This function additionally checks that the context is still valid. - pub fn spin_once(&self, timeout: Option) -> Result<(), RclrsError> { + fn spin_once(&self, timeout: Option) -> Result<(), RclrsError> { for node in { self.nodes_mtx.lock().unwrap() } .iter() .filter_map(Weak::upgrade) @@ -66,19 +76,51 @@ impl SingleThreadedExecutor { Ok(()) } - /// Convenience function for calling [`SingleThreadedExecutor::spin_once`] in a loop. - pub fn spin(&self) -> Result<(), RclrsError> { - while !{ self.nodes_mtx.lock().unwrap() }.is_empty() { - match self.spin_once(None) { - Ok(_) - | Err(RclrsError::RclError { - code: RclReturnCode::Timeout, - .. - }) => std::thread::yield_now(), - error => return error, - } + /// Used by [`Context`] to create the `Executor`. Users cannot call this + /// function. + pub(crate) fn new(context: Arc) -> Self { + Self { + context, + nodes_mtx: Mutex::new(Vec::new()), } + } +} - Ok(()) +/// A bundle of optional conditions that a user may want to impose on how long +/// an executor spins for. +/// +/// By default the executor will be allowed to spin indefinitely. +#[non_exhaustive] +#[derive(Default)] +pub struct SpinOptions { + /// Only perform the next available work. This is similar to spin_once in + /// rclcpp and rclpy. + /// + /// To only process work that is immediately available without waiting at all, + /// set a timeout of zero. + pub only_next_available_work: bool, + /// Stop waiting after this duration of time has passed. Use `Some(0)` to not + /// wait any amount of time. Use `None` to wait an infinite amount of time. + pub timeout: Option, +} + +impl SpinOptions { + /// Use default spin options. + pub fn new() -> Self { + Self::default() + } + + /// Behave like spin_once in rclcpp and rclpy. + pub fn spin_once() -> Self { + Self { + only_next_available_work: true, + ..Default::default() + } + } + + /// Stop spinning once this durtion of time is reached. + pub fn timeout(mut self, timeout: Duration) -> Self { + self.timeout = Some(timeout); + self } } diff --git a/rclrs/src/lib.rs b/rclrs/src/lib.rs index 4924b36cb..f41b61700 100644 --- a/rclrs/src/lib.rs +++ b/rclrs/src/lib.rs @@ -30,8 +30,6 @@ mod rcl_bindings; #[cfg(feature = "dyn_msg")] pub mod dynamic_message; -use std::{sync::Arc, time::Duration}; - pub use arguments::*; pub use client::*; pub use clock::*; @@ -48,66 +46,3 @@ pub use subscription::*; pub use time::*; use time_source::*; pub use wait::*; - -/// Polls the node for new messages and executes the corresponding callbacks. -/// -/// See [`WaitSet::wait`] for the meaning of the `timeout` parameter. -/// -/// This may under some circumstances return -/// [`SubscriptionTakeFailed`][1], [`ClientTakeFailed`][1], [`ServiceTakeFailed`][1] when the wait -/// set spuriously wakes up. -/// This can usually be ignored. -/// -/// [1]: crate::RclReturnCode -pub fn spin_once(node: Arc, timeout: Option) -> Result<(), RclrsError> { - let executor = SingleThreadedExecutor::new(); - executor.add_node(&node)?; - executor.spin_once(timeout) -} - -/// Convenience function for calling [`spin_once`] in a loop. -pub fn spin(node: Arc) -> Result<(), RclrsError> { - let executor = SingleThreadedExecutor::new(); - executor.add_node(&node)?; - executor.spin() -} - -/// Creates a new node in the empty namespace. -/// -/// Convenience function equivalent to [`Node::new`][1]. -/// Please see that function's documentation. -/// -/// [1]: crate::Node::new -/// -/// # Example -/// ``` -/// # use rclrs::{Context, RclrsError}; -/// let ctx = Context::new([])?; -/// let node = rclrs::create_node(&ctx, "my_node"); -/// assert!(node.is_ok()); -/// # Ok::<(), RclrsError>(()) -/// ``` -pub fn create_node(context: &Context, node_name: &str) -> Result, RclrsError> { - Node::new(context, node_name) -} - -/// Creates a [`NodeBuilder`]. -/// -/// Convenience function equivalent to [`NodeBuilder::new()`][1] and [`Node::builder()`][2]. -/// Please see that function's documentation. -/// -/// [1]: crate::NodeBuilder::new -/// [2]: crate::Node::builder -/// -/// # Example -/// ``` -/// # use rclrs::{Context, RclrsError}; -/// let context = Context::new([])?; -/// let node_builder = rclrs::create_node_builder(&context, "my_node"); -/// let node = node_builder.build()?; -/// assert_eq!(node.name(), "my_node"); -/// # Ok::<(), RclrsError>(()) -/// ``` -pub fn create_node_builder(context: &Context, node_name: &str) -> NodeBuilder { - Node::builder(context, node_name) -} diff --git a/rclrs/src/node.rs b/rclrs/src/node.rs index 97684d6bc..1be7c9928 100644 --- a/rclrs/src/node.rs +++ b/rclrs/src/node.rs @@ -1,5 +1,12 @@ -mod builder; mod graph; +pub use graph::*; + +mod node_options; +pub use node_options::*; + +mod primitive_options; +pub use primitive_options::*; + use std::{ cmp::PartialEq, ffi::CStr, @@ -11,12 +18,12 @@ use std::{ use rosidl_runtime_rs::Message; -pub use self::{builder::*, graph::*}; use crate::{ - rcl_bindings::*, Client, ClientBase, Clock, Context, ContextHandle, GuardCondition, - ParameterBuilder, ParameterInterface, ParameterVariant, Parameters, Publisher, QoSProfile, - RclrsError, Service, ServiceBase, Subscription, SubscriptionBase, SubscriptionCallback, - TimeSource, ENTITY_LIFECYCLE_MUTEX, + rcl_bindings::*, Client, ClientBase, ClientOptions, Clock, ContextHandle, GuardCondition, + ParameterBuilder, ParameterInterface, ParameterVariant, Parameters, Publisher, + PublisherOptions, PublisherState, RclrsError, Service, ServiceBase, ServiceOptions, + ServiceState, Subscription, SubscriptionBase, SubscriptionCallback, SubscriptionOptions, + SubscriptionState, TimeSource, ENTITY_LIFECYCLE_MUTEX, }; // SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread @@ -28,9 +35,13 @@ unsafe impl Send for rcl_node_t {} /// Nodes are a core concept in ROS 2. Refer to the official ["Understanding ROS 2 nodes"][1] /// tutorial for an introduction. /// -/// Ownership of the node is shared with all [`Publisher`]s and [`Subscription`]s created from it. -/// That means that even after the node itself is dropped, it will continue to exist and be -/// displayed by e.g. `ros2 topic` as long as its publishers and subscriptions are not dropped. +/// Ownership of the node is shared with all the primitives such as [`Publisher`]s and [`Subscription`]s +/// that are created from it. That means that even after the `Node` itself is dropped, it will continue +/// to exist and be displayed by e.g. `ros2 topic` as long as any one of its primitives is not dropped. +/// +/// # Creating +/// Use [`Executor::create_node`][7] to create a new node. Pass in [`NodeOptions`] to set all the different +/// options for node creation, or just pass in a string for the node's name if the default options are okay. /// /// # Naming /// A node has a *name* and a *namespace*. @@ -48,17 +59,34 @@ unsafe impl Send for rcl_node_t {} /// In that sense, the parameters to the node creation functions are only the _default_ namespace and /// name. /// See also the [official tutorial][1] on the command line arguments for ROS nodes, and the -/// [`Node::namespace()`] and [`Node::name()`] functions for examples. +/// [`Node::namespace()`][3] and [`Node::name()`][4] functions for examples. /// /// ## Rules for valid names /// The rules for valid node names and node namespaces are explained in -/// [`NodeBuilder::new()`][3] and [`NodeBuilder::namespace()`][4]. +/// [`NodeOptions::new()`][5] and [`NodeOptions::namespace()`][6]. +/// +/// The `Node` object is really just a shared [`NodeState`], so see the [`NodeState`] +/// API to see the various operations you can do with a node, such as creating +/// subscriptions, publishers, services, and clients. /// /// [1]: https://docs.ros.org/en/rolling/Tutorials/Understanding-ROS2-Nodes.html /// [2]: https://docs.ros.org/en/rolling/How-To-Guides/Node-arguments.html -/// [3]: crate::NodeBuilder::new -/// [4]: crate::NodeBuilder::namespace -pub struct Node { +/// [3]: NodeState::namespace +/// [4]: NodeState::name +/// [5]: crate::NodeOptions::new +/// [6]: crate::NodeOptions::namespace +/// [7]: crate::Executor::create_node +pub type Node = Arc; + +/// The inner state of a [`Node`]. +/// +/// This is public so that you can choose to put it inside a [`Weak`] if you +/// want to be able to refer to a [`Node`] in a non-owning way. It is generally +/// recommended to manage the `NodeState` inside of an [`Arc`], and [`Node`] +/// is provided as convenience alias for that. +/// +/// The public API of the [`Node`] type is implemented via `NodeState`. +pub struct NodeState { pub(crate) clients_mtx: Mutex>>, pub(crate) guard_conditions_mtx: Mutex>>, pub(crate) services_mtx: Mutex>>, @@ -89,15 +117,15 @@ impl Drop for NodeHandle { } } -impl Eq for Node {} +impl Eq for NodeState {} -impl PartialEq for Node { +impl PartialEq for NodeState { fn eq(&self, other: &Self) -> bool { Arc::ptr_eq(&self.handle, &other.handle) } } -impl fmt::Debug for Node { +impl fmt::Debug for NodeState { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { f.debug_struct("Node") .field("fully_qualified_name", &self.fully_qualified_name()) @@ -105,15 +133,7 @@ impl fmt::Debug for Node { } } -impl Node { - /// Creates a new node in the empty namespace. - /// - /// See [`NodeBuilder::new()`] for documentation. - #[allow(clippy::new_ret_no_self)] - pub fn new(context: &Context, node_name: &str) -> Result, RclrsError> { - Self::builder(context, node_name).build() - } - +impl NodeState { /// Returns the clock associated with this node. pub fn get_clock(&self) -> Clock { self.time_source.get_clock() @@ -126,15 +146,15 @@ impl Node { /// /// # Example /// ``` - /// # use rclrs::{Context, RclrsError}; + /// # use rclrs::{Context, InitOptions, RclrsError}; /// // Without remapping - /// let context = Context::new([])?; - /// let node = rclrs::create_node(&context, "my_node")?; + /// let executor = Context::default().create_basic_executor(); + /// let node = executor.create_node("my_node")?; /// assert_eq!(node.name(), "my_node"); /// // With remapping /// let remapping = ["--ros-args", "-r", "__node:=your_node"].map(String::from); - /// let context_r = Context::new(remapping)?; - /// let node_r = rclrs::create_node(&context_r, "my_node")?; + /// let executor_r = Context::new(remapping, InitOptions::default())?.create_basic_executor(); + /// let node_r = executor_r.create_node("my_node")?; /// assert_eq!(node_r.name(), "your_node"); /// # Ok::<(), RclrsError>(()) /// ``` @@ -149,18 +169,18 @@ impl Node { /// /// # Example /// ``` - /// # use rclrs::{Context, RclrsError}; + /// # use rclrs::{Context, InitOptions, RclrsError, NodeOptions}; /// // Without remapping - /// let context = Context::new([])?; - /// let node = - /// rclrs::create_node_builder(&context, "my_node") - /// .namespace("/my/namespace") - /// .build()?; + /// let executor = Context::default().create_basic_executor(); + /// let node = executor.create_node( + /// NodeOptions::new("my_node") + /// .namespace("/my/namespace") + /// )?; /// assert_eq!(node.namespace(), "/my/namespace"); /// // With remapping /// let remapping = ["--ros-args", "-r", "__ns:=/your_namespace"].map(String::from); - /// let context_r = Context::new(remapping)?; - /// let node_r = rclrs::create_node(&context_r, "my_node")?; + /// let executor_r = Context::new(remapping, InitOptions::default())?.create_basic_executor(); + /// let node_r = executor_r.create_node("my_node")?; /// assert_eq!(node_r.namespace(), "/your_namespace"); /// # Ok::<(), RclrsError>(()) /// ``` @@ -171,19 +191,22 @@ impl Node { /// Returns the fully qualified name of the node. /// /// The fully qualified name of the node is the node namespace combined with the node name. - /// It is subject to the remappings shown in [`Node::name()`] and [`Node::namespace()`]. + /// It is subject to the remappings shown in [`Node::name()`][1] and [`Node::namespace()`][2]. /// /// # Example /// ``` - /// # use rclrs::{Context, RclrsError}; - /// let context = Context::new([])?; - /// let node = - /// rclrs::create_node_builder(&context, "my_node") - /// .namespace("/my/namespace") - /// .build()?; + /// # use rclrs::{Context, RclrsError, NodeOptions}; + /// let executor = Context::default().create_basic_executor(); + /// let node = executor.create_node( + /// NodeOptions::new("my_node") + /// .namespace("/my/namespace") + /// )?; /// assert_eq!(node.fully_qualified_name(), "/my/namespace/my_node"); /// # Ok::<(), RclrsError>(()) /// ``` + /// + /// [1]: NodeState::name + /// [2]: NodeState::namespace pub fn fully_qualified_name(&self) -> String { self.call_string_getter(rcl_node_get_fully_qualified_name) } @@ -199,13 +222,49 @@ impl Node { /// Creates a [`Client`][1]. /// + /// Pass in only the service name for the `options` argument to use all default client options: + /// ``` + /// # use rclrs::*; + /// # let executor = Context::default().create_basic_executor(); + /// # let node = executor.create_node("my_node").unwrap(); + /// let client = node.create_client::( + /// "my_service" + /// ) + /// .unwrap(); + /// ``` + /// + /// Take advantage of the [`IntoPrimitiveOptions`] API to easily build up the + /// client options: + /// + /// ``` + /// # use rclrs::*; + /// # let executor = Context::default().create_basic_executor(); + /// # let node = executor.create_node("my_node").unwrap(); + /// let client = node.create_client::( + /// "my_service" + /// .keep_all() + /// .transient_local() + /// ) + /// .unwrap(); + /// ``` + /// + /// Any quality of service options that you explicitly specify will override + /// the default service options. Any that you do not explicitly specify will + /// remain the default service options. Note that clients are generally + /// expected to use [reliable][2], so it's best not to change the reliability + /// setting unless you know what you are doing. + /// /// [1]: crate::Client + /// [2]: crate::QoSReliabilityPolicy::Reliable // TODO: make client's lifetime depend on node's lifetime - pub fn create_client(&self, topic: &str) -> Result>, RclrsError> + pub fn create_client<'a, T>( + &self, + options: impl Into>, + ) -> Result>, RclrsError> where T: rosidl_runtime_rs::Service, { - let client = Arc::new(Client::::new(Arc::clone(&self.handle), topic)?); + let client = Arc::new(Client::::new(Arc::clone(&self.handle), options)?); { self.clients_mtx.lock().unwrap() }.push(Arc::downgrade(&client) as Weak); Ok(client) } @@ -213,12 +272,12 @@ impl Node { /// Creates a [`GuardCondition`][1] with no callback. /// /// A weak pointer to the `GuardCondition` is stored within this node. - /// When this node is added to a wait set (e.g. when calling `spin_once`[2] + /// When this node is added to a wait set (e.g. when [spinning][2] /// with this node as an argument), the guard condition can be used to /// interrupt the wait. /// /// [1]: crate::GuardCondition - /// [2]: crate::spin_once + /// [2]: crate::Executor::spin pub fn create_guard_condition(&self) -> Arc { let guard_condition = Arc::new(GuardCondition::new_with_context_handle( Arc::clone(&self.handle.context_handle), @@ -232,12 +291,12 @@ impl Node { /// Creates a [`GuardCondition`][1] with a callback. /// /// A weak pointer to the `GuardCondition` is stored within this node. - /// When this node is added to a wait set (e.g. when calling `spin_once`[2] + /// When this node is added to a wait set (e.g. when [spinning][2] /// with this node as an argument), the guard condition can be used to /// interrupt the wait. /// /// [1]: crate::GuardCondition - /// [2]: crate::spin_once + /// [2]: crate::Executor::spin pub fn create_guard_condition_with_callback(&mut self, callback: F) -> Arc where F: Fn() + Send + Sync + 'static, @@ -253,36 +312,107 @@ impl Node { /// Creates a [`Publisher`][1]. /// + /// Pass in only the topic name for the `options` argument to use all default publisher options: + /// ``` + /// # use rclrs::*; + /// # let executor = Context::default().create_basic_executor(); + /// # let node = executor.create_node("my_node").unwrap(); + /// let publisher = node.create_publisher::( + /// "my_topic" + /// ) + /// .unwrap(); + /// ``` + /// + /// Take advantage of the [`IntoPrimitiveOptions`] API to easily build up the + /// publisher options: + /// + /// ``` + /// # use rclrs::*; + /// # let executor = Context::default().create_basic_executor(); + /// # let node = executor.create_node("my_node").unwrap(); + /// let publisher = node.create_publisher::( + /// "my_topic" + /// .keep_last(100) + /// .transient_local() + /// ) + /// .unwrap(); + /// + /// let reliable_publisher = node.create_publisher::( + /// "my_topic" + /// .reliable() + /// ) + /// .unwrap(); + /// ``` + /// /// [1]: crate::Publisher - // TODO: make publisher's lifetime depend on node's lifetime - pub fn create_publisher( + pub fn create_publisher<'a, T>( &self, - topic: &str, - qos: QoSProfile, - ) -> Result>, RclrsError> + options: impl Into>, + ) -> Result, RclrsError> where T: Message, { - let publisher = Arc::new(Publisher::::new(Arc::clone(&self.handle), topic, qos)?); + let publisher = Arc::new(PublisherState::::new(Arc::clone(&self.handle), options)?); Ok(publisher) } /// Creates a [`Service`][1]. /// + /// Pass in only the service name for the `options` argument to use all default service options: + /// ``` + /// # use rclrs::*; + /// # let executor = Context::default().create_basic_executor(); + /// # let node = executor.create_node("my_node").unwrap(); + /// let service = node.create_service::( + /// "my_service", + /// |_info, _request| { + /// println!("Received request!"); + /// test_msgs::srv::Empty_Response::default() + /// }, + /// ) + /// .unwrap(); + /// ``` + /// + /// Take advantage of the [`IntoPrimitiveOptions`] API to easily build up the + /// service options: + /// + /// ``` + /// # use rclrs::*; + /// # let executor = Context::default().create_basic_executor(); + /// # let node = executor.create_node("my_node").unwrap(); + /// let service = node.create_service::( + /// "my_service" + /// .keep_all() + /// .transient_local(), + /// |_info, _request| { + /// println!("Received request!"); + /// test_msgs::srv::Empty_Response::default() + /// }, + /// ) + /// .unwrap(); + /// ``` + /// + /// Any quality of service options that you explicitly specify will override + /// the default service options. Any that you do not explicitly specify will + /// remain the default service options. Note that services are generally + /// expected to use [reliable][2], so it's best not to change the reliability + /// setting unless you know what you are doing. + /// /// [1]: crate::Service + /// [2]: crate::QoSReliabilityPolicy::Reliable // TODO: make service's lifetime depend on node's lifetime - pub fn create_service( + pub fn create_service<'a, T, F>( &self, - topic: &str, + options: impl Into>, callback: F, - ) -> Result>, RclrsError> + ) -> Result, RclrsError> where T: rosidl_runtime_rs::Service, F: Fn(&rmw_request_id_t, T::Request) -> T::Response + 'static + Send, { - let service = Arc::new(Service::::new( + let service = Arc::new(ServiceState::::new( Arc::clone(&self.handle), - topic, + options, callback, )?); { self.services_mtx.lock().unwrap() } @@ -292,21 +422,60 @@ impl Node { /// Creates a [`Subscription`][1]. /// + /// Pass in only the topic name for the `options` argument to use all default subscription options: + /// ``` + /// # use rclrs::*; + /// # let executor = Context::default().create_basic_executor(); + /// # let node = executor.create_node("my_node").unwrap(); + /// let subscription = node.create_subscription( + /// "my_topic", + /// |_msg: test_msgs::msg::Empty| { + /// println!("Received message!"); + /// }, + /// ) + /// .unwrap(); + /// ``` + /// + /// Take advantage of the [`IntoPrimitiveOptions`] API to easily build up the + /// subscription options: + /// + /// ``` + /// # use rclrs::*; + /// # let executor = Context::default().create_basic_executor(); + /// # let node = executor.create_node("my_node").unwrap(); + /// let subscription = node.create_subscription( + /// "my_topic" + /// .keep_last(100) + /// .transient_local(), + /// |_msg: test_msgs::msg::Empty| { + /// println!("Received message!"); + /// }, + /// ) + /// .unwrap(); + /// + /// let reliable_subscription = node.create_subscription( + /// "my_reliable_topic" + /// .reliable(), + /// |_msg: test_msgs::msg::Empty| { + /// println!("Received message!"); + /// }, + /// ) + /// .unwrap(); + /// ``` + /// /// [1]: crate::Subscription // TODO: make subscription's lifetime depend on node's lifetime - pub fn create_subscription( + pub fn create_subscription<'a, T, Args>( &self, - topic: &str, - qos: QoSProfile, + options: impl Into>, callback: impl SubscriptionCallback, - ) -> Result>, RclrsError> + ) -> Result, RclrsError> where T: Message, { - let subscription = Arc::new(Subscription::::new( + let subscription = Arc::new(SubscriptionState::::new( Arc::clone(&self.handle), - topic, - qos, + options, callback, )?); { self.subscriptions_mtx.lock() } @@ -347,24 +516,24 @@ impl Node { /// Returns the ROS domain ID that the node is using. /// /// The domain ID controls which nodes can send messages to each other, see the [ROS 2 concept article][1]. - /// It can be set through the `ROS_DOMAIN_ID` environment variable. + /// It can be set through the `ROS_DOMAIN_ID` environment variable or by + /// passing custom [`NodeOptions`] into [`Context::new`][2] or [`Context::from_env`][3]. /// /// [1]: https://docs.ros.org/en/rolling/Concepts/About-Domain-ID.html + /// [2]: crate::Context::new + /// [3]: crate::Context::from_env /// /// # Example /// ``` /// # use rclrs::{Context, RclrsError}; /// // Set default ROS domain ID to 10 here /// std::env::set_var("ROS_DOMAIN_ID", "10"); - /// let context = Context::new([])?; - /// let node = rclrs::create_node(&context, "domain_id_node")?; + /// let executor = Context::default().create_basic_executor(); + /// let node = executor.create_node("domain_id_node")?; /// let domain_id = node.domain_id(); /// assert_eq!(domain_id, 10); /// # Ok::<(), RclrsError>(()) /// ``` - // TODO: If node option is supported, - // add description about this function is for getting actual domain_id - // and about override of domain_id via node option pub fn domain_id(&self) -> usize { let rcl_node = self.handle.rcl_node.lock().unwrap(); let mut domain_id: usize = 0; @@ -385,8 +554,8 @@ impl Node { /// # Example /// ``` /// # use rclrs::{Context, ParameterRange, RclrsError}; - /// let context = Context::new([])?; - /// let node = rclrs::create_node(&context, "domain_id_node")?; + /// let executor = Context::default().create_basic_executor(); + /// let node = executor.create_node("domain_id_node")?; /// // Set it to a range of 0-100, with a step of 2 /// let range = ParameterRange { /// lower: Some(0), @@ -421,25 +590,6 @@ impl Node { interface: &self.parameter, } } - - /// Creates a [`NodeBuilder`][1] with the given name. - /// - /// Convenience function equivalent to [`NodeBuilder::new()`][2]. - /// - /// [1]: crate::NodeBuilder - /// [2]: crate::NodeBuilder::new - /// - /// # Example - /// ``` - /// # use rclrs::{Context, Node, RclrsError}; - /// let context = Context::new([])?; - /// let node = Node::builder(&context, "my_node").build()?; - /// assert_eq!(node.name(), "my_node"); - /// # Ok::<(), RclrsError>(()) - /// ``` - pub fn builder(context: &Context, node_name: &str) -> NodeBuilder { - NodeBuilder::new(context, node_name) - } } // Helper used to implement call_string_getter(), but also used to get the FQN in the Node::new() @@ -461,36 +611,30 @@ pub(crate) unsafe fn call_string_getter_with_rcl_node( #[cfg(test)] mod tests { - use super::*; - use crate::test_helpers::*; + use crate::{test_helpers::*, *}; #[test] fn traits() { - assert_send::(); - assert_sync::(); + assert_send::(); + assert_sync::(); } #[test] fn test_topic_names_and_types() -> Result<(), RclrsError> { - use crate::QOS_PROFILE_SYSTEM_DEFAULT; use test_msgs::msg; let graph = construct_test_graph("test_topics_graph")?; let _node_1_defaults_subscription = graph.node1.create_subscription::( "graph_test_topic_3", - QOS_PROFILE_SYSTEM_DEFAULT, |_msg: msg::Defaults| {}, )?; - let _node_2_empty_subscription = graph.node2.create_subscription::( - "graph_test_topic_1", - QOS_PROFILE_SYSTEM_DEFAULT, - |_msg: msg::Empty| {}, - )?; + let _node_2_empty_subscription = graph + .node2 + .create_subscription::("graph_test_topic_1", |_msg: msg::Empty| {})?; let _node_2_basic_types_subscription = graph.node2.create_subscription::( "graph_test_topic_2", - QOS_PROFILE_SYSTEM_DEFAULT, |_msg: msg::BasicTypes| {}, )?; diff --git a/rclrs/src/node/graph.rs b/rclrs/src/node/graph.rs index 343fa61d3..eaacdb979 100644 --- a/rclrs/src/node/graph.rs +++ b/rclrs/src/node/graph.rs @@ -3,7 +3,7 @@ use std::{ ffi::{CStr, CString}, }; -use crate::{rcl_bindings::*, Node, RclrsError, ToResult}; +use crate::{rcl_bindings::*, NodeState, RclrsError, ToResult}; impl Drop for rmw_names_and_types_t { fn drop(&mut self) { @@ -57,7 +57,7 @@ pub struct TopicEndpointInfo { pub topic_type: String, } -impl Node { +impl NodeState { /// Returns a list of topic names and types for publishers associated with a node. pub fn get_publisher_names_and_types_by_node( &self, @@ -482,11 +482,11 @@ mod tests { .map(|value: usize| if value != 99 { 99 } else { 98 }) .unwrap_or(99); - let context = - Context::new_with_options([], InitOptions::new().with_domain_id(Some(domain_id))) - .unwrap(); + let executor = Context::new([], InitOptions::new().with_domain_id(Some(domain_id))) + .unwrap() + .create_basic_executor(); let node_name = "test_publisher_names_and_types"; - let node = Node::new(&context, node_name).unwrap(); + let node = executor.create_node(node_name).unwrap(); // Test that the graph has no publishers let names_and_topics = node .get_publisher_names_and_types_by_node(node_name, "") @@ -543,9 +543,9 @@ mod tests { #[test] fn test_node_names() { - let context = Context::new([]).unwrap(); + let executor = Context::default().create_basic_executor(); let node_name = "test_node_names"; - let node = Node::new(&context, node_name).unwrap(); + let node = executor.create_node(node_name).unwrap(); let names_and_namespaces = node.get_node_names().unwrap(); @@ -559,9 +559,9 @@ mod tests { #[test] fn test_node_names_with_enclaves() { - let context = Context::new([]).unwrap(); + let executor = Context::default().create_basic_executor(); let node_name = "test_node_names_with_enclaves"; - let node = Node::new(&context, node_name).unwrap(); + let node = executor.create_node(node_name).unwrap(); let names_and_namespaces = node.get_node_names_with_enclaves().unwrap(); diff --git a/rclrs/src/node/builder.rs b/rclrs/src/node/node_options.rs similarity index 75% rename from rclrs/src/node/builder.rs rename to rclrs/src/node/node_options.rs index 011d5d2f3..ef4ac1dea 100644 --- a/rclrs/src/node/builder.rs +++ b/rclrs/src/node/node_options.rs @@ -4,14 +4,13 @@ use std::{ }; use crate::{ - rcl_bindings::*, ClockType, Context, ContextHandle, Node, NodeHandle, ParameterInterface, + rcl_bindings::*, ClockType, ContextHandle, Node, NodeHandle, NodeState, ParameterInterface, QoSProfile, RclrsError, TimeSource, ToResult, ENTITY_LIFECYCLE_MUTEX, QOS_PROFILE_CLOCK, }; -/// A builder for creating a [`Node`][1]. +/// A set of options for creating a [`Node`][1]. /// /// The builder pattern allows selectively setting some fields, and leaving all others at their default values. -/// This struct instance can be created via [`Node::builder()`][2]. /// /// The default values for optional fields are: /// - `namespace: "/"` @@ -24,27 +23,25 @@ use crate::{ /// /// # Example /// ``` -/// # use rclrs::{Context, NodeBuilder, Node, RclrsError}; -/// let context = Context::new([])?; +/// # use rclrs::{Context, NodeOptions, Node, RclrsError}; +/// let executor = Context::default().create_basic_executor(); /// // Building a node in a single expression -/// let node = NodeBuilder::new(&context, "foo_node").namespace("/bar").build()?; +/// let node = executor.create_node(NodeOptions::new("foo_node").namespace("/bar"))?; /// assert_eq!(node.name(), "foo_node"); /// assert_eq!(node.namespace(), "/bar"); -/// // Building a node via Node::builder() -/// let node = Node::builder(&context, "bar_node").build()?; +/// // Building a node via NodeOptions +/// let node = executor.create_node(NodeOptions::new("bar_node"))?; /// assert_eq!(node.name(), "bar_node"); /// // Building a node step-by-step -/// let mut builder = Node::builder(&context, "goose"); -/// builder = builder.namespace("/duck/duck"); -/// let node = builder.build()?; +/// let mut options = NodeOptions::new("goose"); +/// options = options.namespace("/duck/duck"); +/// let node = executor.create_node(options)?; /// assert_eq!(node.fully_qualified_name(), "/duck/duck/goose"); /// # Ok::<(), RclrsError>(()) /// ``` /// /// [1]: crate::Node -/// [2]: crate::Node::builder -pub struct NodeBuilder { - context: Arc, +pub struct NodeOptions { name: String, namespace: String, use_global_arguments: bool, @@ -55,7 +52,7 @@ pub struct NodeBuilder { clock_qos: QoSProfile, } -impl NodeBuilder { +impl NodeOptions { /// Creates a builder for a node with the given name. /// /// See the [`Node` docs][1] for general information on node names. @@ -68,21 +65,19 @@ impl NodeBuilder { /// - Must not be empty and not be longer than `RMW_NODE_NAME_MAX_NAME_LENGTH` /// - Must not start with a number /// - /// Note that node name validation is delayed until [`NodeBuilder::build()`][3]. + /// Note that node name validation is delayed until [`Executor::create_node`][3]. /// /// # Example /// ``` - /// # use rclrs::{Context, NodeBuilder, RclrsError, RclReturnCode}; - /// let context = Context::new([])?; + /// # use rclrs::{Context, NodeOptions, RclrsError, RclReturnCode}; + /// let executor = Context::default().create_basic_executor(); /// // This is a valid node name - /// assert!(NodeBuilder::new(&context, "my_node").build().is_ok()); + /// assert!(executor.create_node(NodeOptions::new("my_node")).is_ok()); /// // This is another valid node name (although not a good one) - /// assert!(NodeBuilder::new(&context, "_______").build().is_ok()); + /// assert!(executor.create_node(NodeOptions::new("_______")).is_ok()); /// // This is an invalid node name /// assert!(matches!( - /// NodeBuilder::new(&context, "röböt") - /// .build() - /// .unwrap_err(), + /// executor.create_node(NodeOptions::new("röböt")).unwrap_err(), /// RclrsError::RclError { code: RclReturnCode::NodeInvalidName, .. } /// )); /// # Ok::<(), RclrsError>(()) @@ -90,10 +85,9 @@ impl NodeBuilder { /// /// [1]: crate::Node#naming /// [2]: https://docs.ros2.org/latest/api/rmw/validate__node__name_8h.html#a5690a285aed9735f89ef11950b6e39e3 - /// [3]: NodeBuilder::build - pub fn new(context: &Context, name: &str) -> NodeBuilder { - NodeBuilder { - context: Arc::clone(&context.handle), + /// [3]: crate::Executor::create_node + pub fn new(name: impl ToString) -> NodeOptions { + NodeOptions { name: name.to_string(), namespace: "/".to_string(), use_global_arguments: true, @@ -122,29 +116,29 @@ impl NodeBuilder { /// - Must not contain two or more `/` characters in a row /// - Must not have a `/` character at the end, except if `/` is the full namespace /// - /// Note that namespace validation is delayed until [`NodeBuilder::build()`][4]. + /// Note that namespace validation is delayed until [`Executor::create_node`][4]. /// /// # Example /// ``` - /// # use rclrs::{Context, Node, RclrsError, RclReturnCode}; - /// let context = Context::new([])?; + /// # use rclrs::{Context, Node, NodeOptions, RclrsError, RclReturnCode}; + /// let executor = Context::default().create_basic_executor(); /// // This is a valid namespace - /// let builder_ok_ns = Node::builder(&context, "my_node").namespace("/some/nested/namespace"); - /// assert!(builder_ok_ns.build().is_ok()); + /// let options_ok_ns = NodeOptions::new("my_node").namespace("/some/nested/namespace"); + /// assert!(executor.create_node(options_ok_ns).is_ok()); /// // This is an invalid namespace /// assert!(matches!( - /// Node::builder(&context, "my_node") + /// executor.create_node( + /// NodeOptions::new("my_node") /// .namespace("/10_percent_luck/20_percent_skill") - /// .build() - /// .unwrap_err(), + /// ).unwrap_err(), /// RclrsError::RclError { code: RclReturnCode::NodeInvalidNamespace, .. } /// )); /// // A missing forward slash at the beginning is automatically added /// assert_eq!( - /// Node::builder(&context, "my_node") + /// executor.create_node( + /// NodeOptions::new("my_node") /// .namespace("foo") - /// .build()? - /// .namespace(), + /// )?.namespace(), /// "/foo" /// ); /// # Ok::<(), RclrsError>(()) @@ -153,8 +147,8 @@ impl NodeBuilder { /// [1]: crate::Node#naming /// [2]: http://design.ros2.org/articles/topic_and_service_names.html /// [3]: https://docs.ros2.org/latest/api/rmw/validate__namespace_8h.html#a043f17d240cf13df01321b19a469ee49 - /// [4]: NodeBuilder::build - pub fn namespace(mut self, namespace: &str) -> Self { + /// [4]: crate::Executor::create_node + pub fn namespace(mut self, namespace: impl ToString) -> Self { self.namespace = namespace.to_string(); self } @@ -165,21 +159,21 @@ impl NodeBuilder { /// /// # Example /// ``` - /// # use rclrs::{Context, Node, NodeBuilder, RclrsError}; + /// # use rclrs::{Context, InitOptions, Node, NodeOptions, RclrsError}; /// let context_args = ["--ros-args", "--remap", "__node:=your_node"] /// .map(String::from); - /// let context = Context::new(context_args)?; + /// let executor = Context::new(context_args, InitOptions::default())?.create_basic_executor(); /// // Ignore the global arguments: - /// let node_without_global_args = - /// rclrs::create_node_builder(&context, "my_node") - /// .use_global_arguments(false) - /// .build()?; + /// let node_without_global_args = executor.create_node( + /// NodeOptions::new("my_node") + /// .use_global_arguments(false) + /// )?; /// assert_eq!(node_without_global_args.name(), "my_node"); /// // Do not ignore the global arguments: - /// let node_with_global_args = - /// rclrs::create_node_builder(&context, "my_other_node") - /// .use_global_arguments(true) - /// .build()?; + /// let node_with_global_args = executor.create_node( + /// NodeOptions::new("my_other_node") + /// .use_global_arguments(true) + /// )?; /// assert_eq!(node_with_global_args.name(), "your_node"); /// # Ok::<(), RclrsError>(()) /// ``` @@ -200,26 +194,29 @@ impl NodeBuilder { /// /// # Example /// ``` - /// # use rclrs::{Context, Node, NodeBuilder, RclrsError}; + /// # use rclrs::{Context, InitOptions, Node, NodeOptions, RclrsError}; /// // Usually, this would change the name of "my_node" to "context_args_node": /// let context_args = ["--ros-args", "--remap", "my_node:__node:=context_args_node"] /// .map(String::from); - /// let context = Context::new(context_args)?; + /// let executor = Context::new(context_args, InitOptions::default())?.create_basic_executor(); /// // But the node arguments will change it to "node_args_node": /// let node_args = ["--ros-args", "--remap", "my_node:__node:=node_args_node"] /// .map(String::from); - /// let node = - /// rclrs::create_node_builder(&context, "my_node") - /// .arguments(node_args) - /// .build()?; + /// let node = executor.create_node( + /// NodeOptions::new("my_node") + /// .arguments(node_args) + /// )?; /// assert_eq!(node.name(), "node_args_node"); /// # Ok::<(), RclrsError>(()) /// ``` /// /// [1]: crate::Context::new /// [2]: https://design.ros2.org/articles/ros_command_line_arguments.html - pub fn arguments(mut self, arguments: impl IntoIterator) -> Self { - self.arguments = arguments.into_iter().collect(); + pub fn arguments(mut self, arguments: Args) -> Self + where + Args::Item: ToString, + { + self.arguments = arguments.into_iter().map(|item| item.to_string()).collect(); self } @@ -257,12 +254,9 @@ impl NodeBuilder { /// Builds the node instance. /// - /// Node name and namespace validation is performed in this method. - /// - /// For example usage, see the [`NodeBuilder`][1] docs. - /// - /// [1]: crate::NodeBuilder - pub fn build(&self) -> Result, RclrsError> { + /// Only used internally. Downstream users should call + /// [`Executor::create_node`]. + pub(crate) fn build(self, context: &Arc) -> Result { let node_name = CString::new(self.name.as_str()).map_err(|err| RclrsError::StringContainsNul { err, @@ -274,7 +268,7 @@ impl NodeBuilder { s: self.namespace.clone(), })?; let rcl_node_options = self.create_rcl_node_options()?; - let rcl_context = &mut *self.context.rcl_context.lock().unwrap(); + let rcl_context = &mut *context.rcl_context.lock().unwrap(); // SAFETY: Getting a zero-initialized value is always safe. let mut rcl_node = unsafe { rcl_get_zero_initialized_node() }; @@ -298,7 +292,7 @@ impl NodeBuilder { let handle = Arc::new(NodeHandle { rcl_node: Mutex::new(rcl_node), - context_handle: Arc::clone(&self.context), + context_handle: Arc::clone(context), }); let parameter = { let rcl_node = handle.rcl_node.lock().unwrap(); @@ -308,21 +302,24 @@ impl NodeBuilder { &rcl_context.global_arguments, )? }; - let node = Arc::new(Node { - handle, - clients_mtx: Mutex::new(vec![]), - guard_conditions_mtx: Mutex::new(vec![]), - services_mtx: Mutex::new(vec![]), - subscriptions_mtx: Mutex::new(vec![]), + + let node = Arc::new(NodeState { + clients_mtx: Mutex::default(), + guard_conditions_mtx: Mutex::default(), + services_mtx: Mutex::default(), + subscriptions_mtx: Mutex::default(), time_source: TimeSource::builder(self.clock_type) .clock_qos(self.clock_qos) .build(), parameter, + handle, }); node.time_source.attach_node(&node); + if self.start_parameter_services { node.parameter.create_services(&node)?; } + Ok(node) } @@ -367,6 +364,12 @@ impl NodeBuilder { } } +impl From for NodeOptions { + fn from(name: T) -> Self { + NodeOptions::new(name) + } +} + impl Drop for rcl_node_options_t { fn drop(&mut self) { // SAFETY: Do not finish this struct except here. diff --git a/rclrs/src/node/primitive_options.rs b/rclrs/src/node/primitive_options.rs new file mode 100644 index 000000000..7a8aa3eb8 --- /dev/null +++ b/rclrs/src/node/primitive_options.rs @@ -0,0 +1,244 @@ +use crate::{ + QoSDurabilityPolicy, QoSDuration, QoSHistoryPolicy, QoSLivelinessPolicy, QoSProfile, + QoSReliabilityPolicy, +}; + +use std::{borrow::Borrow, time::Duration}; + +/// `PrimitiveOptions` are the subset of options that are relevant across all +/// primitives (e.g. [`Subscription`][1], [`Publisher`][2], [`Client`][3], and +/// [`Service`][4]). +/// +/// Each different primitive type may have its own defaults for the overall +/// quality of service settings, and we cannot know what the default will be +/// until the `PrimitiveOptions` gets converted into the more specific set of +/// options. Therefore we store each quality of service field separately so that +/// we will only override the settings that the user explicitly asked for, and +/// the rest will be determined by the default settings for each primitive. +/// +/// [1]: crate::Subscription +/// [2]: crate::Publisher +/// [3]: crate::Client +/// [4]: crate::Service +#[derive(Debug, Clone, Copy)] +#[non_exhaustive] +pub struct PrimitiveOptions<'a> { + /// The name that will be used for the primitive + pub name: &'a str, + /// Override the default [`QoSProfile::history`] for the primitive. + pub history: Option, + /// Override the default [`QoSProfile::reliability`] for the primitive. + pub reliability: Option, + /// Override the default [`QoSProfile::durability`] for the primitive. + pub durability: Option, + /// Override the default [`QoSProfile::deadline`] for the primitive. + pub deadline: Option, + /// Override the default [`QoSProfile::lifespan`] for the primitive. + pub lifespan: Option, + /// Override the default [`QoSProfile::liveliness`] for the primitive. + pub liveliness: Option, + /// Override the default [`QoSProfile::liveliness_lease_duration`] for the primitive. + pub liveliness_lease: Option, + /// Override the default [`QoSProfile::avoid_ros_namespace_conventions`] for the primitive. + pub avoid_ros_namespace_conventions: Option, +} + +/// Trait to implicitly convert a compatible object into [`PrimitiveOptions`]. +pub trait IntoPrimitiveOptions<'a>: Sized { + /// Convert the object into [`PrimitiveOptions`] with default settings. + fn into_primitive_options(self) -> PrimitiveOptions<'a>; + + /// Override all the quality of service settings for the primitive. + fn qos(self, profile: QoSProfile) -> PrimitiveOptions<'a> { + self.into_primitive_options().history(profile.history) + } + + /// Use the default topics quality of service profile. + fn topics_qos(self) -> PrimitiveOptions<'a> { + self.qos(QoSProfile::topics_default()) + } + + /// Use the default sensor data quality of service profile. + fn sensor_data_qos(self) -> PrimitiveOptions<'a> { + self.qos(QoSProfile::sensor_data_default()) + } + + /// Use the default services quality of service profile. + fn services_qos(self) -> PrimitiveOptions<'a> { + self.qos(QoSProfile::services_default()) + } + + /// Use the system-defined default quality of service profile. This profile + /// is determined by the underlying RMW implementation, so you cannot rely + /// on this profile being consistent or appropriate for your needs. + fn system_qos(self) -> PrimitiveOptions<'a> { + self.qos(QoSProfile::system_default()) + } + + /// Override the default [`QoSProfile::history`] for the primitive. + fn history(self, history: QoSHistoryPolicy) -> PrimitiveOptions<'a> { + let mut options = self.into_primitive_options(); + options.history = Some(history); + options + } + + /// Keep the last `depth` messages for the primitive. + fn keep_last(self, depth: u32) -> PrimitiveOptions<'a> { + self.history(QoSHistoryPolicy::KeepLast { depth }) + } + + /// Keep all messages for the primitive. + fn keep_all(self) -> PrimitiveOptions<'a> { + self.history(QoSHistoryPolicy::KeepAll) + } + + /// Override the default [`QoSProfile::reliability`] for the primitive. + fn reliability(self, reliability: QoSReliabilityPolicy) -> PrimitiveOptions<'a> { + let mut options = self.into_primitive_options(); + options.reliability = Some(reliability); + options + } + + /// Set the primitive to have [reliable][QoSReliabilityPolicy::Reliable] communication. + fn reliable(self) -> PrimitiveOptions<'a> { + self.reliability(QoSReliabilityPolicy::Reliable) + } + + /// Set the primitive to have [best-effort][QoSReliabilityPolicy::BestEffort] communication. + fn best_effort(self) -> PrimitiveOptions<'a> { + self.reliability(QoSReliabilityPolicy::BestEffort) + } + + /// Override the default [`QoSProfile::durability`] for the primitive. + fn durability(self, durability: QoSDurabilityPolicy) -> PrimitiveOptions<'a> { + let mut options = self.into_primitive_options(); + options.durability = Some(durability); + options + } + + /// Set the primitive to have [volatile][QoSDurabilityPolicy::Volatile] durability. + fn volatile(self) -> PrimitiveOptions<'a> { + self.durability(QoSDurabilityPolicy::Volatile) + } + + /// Set the primitive to have [transient local][QoSDurabilityPolicy::TransientLocal] durability. + fn transient_local(self) -> PrimitiveOptions<'a> { + self.durability(QoSDurabilityPolicy::TransientLocal) + } + + /// Override the default [`QoSProfile::lifespan`] for the primitive. + fn lifespan(self, lifespan: QoSDuration) -> PrimitiveOptions<'a> { + let mut options = self.into_primitive_options(); + options.lifespan = Some(lifespan); + options + } + + /// Set a custom duration for the [lifespan][QoSProfile::lifespan] of the primitive. + fn lifespan_duration(self, duration: Duration) -> PrimitiveOptions<'a> { + self.lifespan(QoSDuration::Custom(duration)) + } + + /// Make the [lifespan][QoSProfile::lifespan] of the primitive infinite. + fn infinite_lifespan(self) -> PrimitiveOptions<'a> { + self.lifespan(QoSDuration::Infinite) + } + + /// Override the default [`QoSProfile::deadline`] for the primitive. + fn deadline(self, deadline: QoSDuration) -> PrimitiveOptions<'a> { + let mut options = self.into_primitive_options(); + options.deadline = Some(deadline); + options + } + + /// Set the [`QoSProfile::deadline`] to a custom finite value. + fn deadline_duration(self, duration: Duration) -> PrimitiveOptions<'a> { + self.deadline(QoSDuration::Custom(duration)) + } + + /// Do not use a deadline for liveliness for this primitive. + fn no_deadline(self) -> PrimitiveOptions<'a> { + self.deadline(QoSDuration::Infinite) + } + + /// Override the default [`QoSProfile::liveliness_lease`] for the primitive. + fn liveliness_lease(self, lease: QoSDuration) -> PrimitiveOptions<'a> { + let mut options = self.into_primitive_options(); + options.liveliness_lease = Some(lease); + options + } + + /// Set a custom duration for the [liveliness lease][QoSProfile::liveliness_lease]. + fn liveliness_lease_duration(self, duration: Duration) -> PrimitiveOptions<'a> { + self.liveliness_lease(QoSDuration::Custom(duration)) + } +} + +impl<'a> IntoPrimitiveOptions<'a> for PrimitiveOptions<'a> { + fn into_primitive_options(self) -> PrimitiveOptions<'a> { + self + } +} + +impl<'a> IntoPrimitiveOptions<'a> for &'a str { + fn into_primitive_options(self) -> PrimitiveOptions<'a> { + PrimitiveOptions::new(self) + } +} + +impl<'a, T: Borrow> IntoPrimitiveOptions<'a> for &'a T { + fn into_primitive_options(self) -> PrimitiveOptions<'a> { + self.borrow().into_primitive_options() + } +} + +impl<'a> PrimitiveOptions<'a> { + /// Begin building a new set of `PrimitiveOptions` with only the name set. + pub fn new(name: &'a str) -> Self { + Self { + name, + history: None, + reliability: None, + durability: None, + deadline: None, + lifespan: None, + liveliness: None, + liveliness_lease: None, + avoid_ros_namespace_conventions: None, + } + } + + /// Apply the user-specified options to a pre-initialized [`QoSProfile`]. + pub fn apply(&self, qos: &mut QoSProfile) { + if let Some(history) = self.history { + qos.history = history; + } + + if let Some(reliability) = self.reliability { + qos.reliability = reliability; + } + + if let Some(durability) = self.durability { + qos.durability = durability; + } + + if let Some(deadline) = self.deadline { + qos.deadline = deadline; + } + + if let Some(lifespan) = self.lifespan { + qos.lifespan = lifespan; + } + + if let Some(liveliness) = self.liveliness { + qos.liveliness = liveliness; + } + + if let Some(liveliness_lease) = self.liveliness_lease { + qos.liveliness_lease = liveliness_lease; + } + + if let Some(convention) = self.avoid_ros_namespace_conventions { + qos.avoid_ros_namespace_conventions = convention; + } + } +} diff --git a/rclrs/src/parameter.rs b/rclrs/src/parameter.rs index c7153ce01..8a01af36a 100644 --- a/rclrs/src/parameter.rs +++ b/rclrs/src/parameter.rs @@ -82,7 +82,9 @@ enum DeclaredValue { } /// Builder used to declare a parameter. Obtain this by calling -/// [`crate::Node::declare_parameter`]. +/// [`Node::declare_parameter`][1]. +/// +/// [1]: crate::NodeState::declare_parameter #[must_use] pub struct ParameterBuilder<'a, T: ParameterVariant> { name: Arc, @@ -871,18 +873,23 @@ impl ParameterInterface { #[cfg(test)] mod tests { use super::*; - use crate::{create_node, Context}; + use crate::{Context, InitOptions}; #[test] fn test_parameter_override_errors() { // Create a new node with a few parameter overrides - let ctx = Context::new([ - String::from("--ros-args"), - String::from("-p"), - String::from("declared_int:=10"), - ]) - .unwrap(); - let node = create_node(&ctx, "param_test_node").unwrap(); + let executor = Context::new( + [ + String::from("--ros-args"), + String::from("-p"), + String::from("declared_int:=10"), + ], + InitOptions::default(), + ) + .unwrap() + .create_basic_executor(); + + let node = executor.create_node("param_test_node").unwrap(); // Declaring a parameter with a different type than what was overridden should return an // error @@ -928,19 +935,24 @@ mod tests { #[test] fn test_parameter_setting_declaring() { // Create a new node with a few parameter overrides - let ctx = Context::new([ - String::from("--ros-args"), - String::from("-p"), - String::from("declared_int:=10"), - String::from("-p"), - String::from("double_array:=[1.0, 2.0]"), - String::from("-p"), - String::from("optional_bool:=true"), - String::from("-p"), - String::from("non_declared_string:='param'"), - ]) - .unwrap(); - let node = create_node(&ctx, "param_test_node").unwrap(); + let executor = Context::new( + [ + String::from("--ros-args"), + String::from("-p"), + String::from("declared_int:=10"), + String::from("-p"), + String::from("double_array:=[1.0, 2.0]"), + String::from("-p"), + String::from("optional_bool:=true"), + String::from("-p"), + String::from("non_declared_string:='param'"), + ], + InitOptions::default(), + ) + .unwrap() + .create_basic_executor(); + + let node = executor.create_node("param_test_node").unwrap(); let overridden_int = node .declare_parameter("declared_int") @@ -1084,13 +1096,18 @@ mod tests { #[test] fn test_override_undeclared_set_priority() { - let ctx = Context::new([ - String::from("--ros-args"), - String::from("-p"), - String::from("declared_int:=10"), - ]) - .unwrap(); - let node = create_node(&ctx, "param_test_node").unwrap(); + let executor = Context::new( + [ + String::from("--ros-args"), + String::from("-p"), + String::from("declared_int:=10"), + ], + InitOptions::default(), + ) + .unwrap() + .create_basic_executor(); + + let node = executor.create_node("param_test_node").unwrap(); // If a parameter was set as an override and as an undeclared parameter, the undeclared // value should get priority node.use_undeclared_parameters() @@ -1106,13 +1123,18 @@ mod tests { #[test] fn test_parameter_scope_redeclaring() { - let ctx = Context::new([ - String::from("--ros-args"), - String::from("-p"), - String::from("declared_int:=10"), - ]) - .unwrap(); - let node = create_node(&ctx, "param_test_node").unwrap(); + let executor = Context::new( + [ + String::from("--ros-args"), + String::from("-p"), + String::from("declared_int:=10"), + ], + InitOptions::default(), + ) + .unwrap() + .create_basic_executor(); + + let node = executor.create_node("param_test_node").unwrap(); { // Setting a parameter with an override let param = node @@ -1157,8 +1179,10 @@ mod tests { #[test] fn test_parameter_ranges() { - let ctx = Context::new([]).unwrap(); - let node = create_node(&ctx, "param_test_node").unwrap(); + let node = Context::default() + .create_basic_executor() + .create_node("param_test_node") + .unwrap(); // Setting invalid ranges should fail let range = ParameterRange { lower: Some(10), @@ -1285,8 +1309,10 @@ mod tests { #[test] fn test_readonly_parameters() { - let ctx = Context::new([]).unwrap(); - let node = create_node(&ctx, "param_test_node").unwrap(); + let node = Context::default() + .create_basic_executor() + .create_node("param_test_node") + .unwrap(); let param = node .declare_parameter("int_param") .default(100) @@ -1312,8 +1338,10 @@ mod tests { #[test] fn test_preexisting_value_error() { - let ctx = Context::new([]).unwrap(); - let node = create_node(&ctx, "param_test_node").unwrap(); + let node = Context::default() + .create_basic_executor() + .create_node("param_test_node") + .unwrap(); node.use_undeclared_parameters() .set("int_param", 100) .unwrap(); @@ -1365,8 +1393,10 @@ mod tests { #[test] fn test_optional_parameter_apis() { - let ctx = Context::new([]).unwrap(); - let node = create_node(&ctx, "param_test_node").unwrap(); + let node = Context::default() + .create_basic_executor() + .create_node("param_test_node") + .unwrap(); node.declare_parameter::("int_param") .optional() .unwrap(); diff --git a/rclrs/src/parameter/service.rs b/rclrs/src/parameter/service.rs index 7c8ffe62d..a880328e5 100644 --- a/rclrs/src/parameter/service.rs +++ b/rclrs/src/parameter/service.rs @@ -9,24 +9,24 @@ use rosidl_runtime_rs::Sequence; use super::ParameterMap; use crate::{ parameter::{DeclaredValue, ParameterKind, ParameterStorage}, - rmw_request_id_t, Node, RclrsError, Service, + rmw_request_id_t, IntoPrimitiveOptions, Node, QoSProfile, RclrsError, Service, }; // The variables only exist to keep a strong reference to the services and are technically unused. // What is used is the Weak that is stored in the node, and is upgraded when spinning. pub struct ParameterService { #[allow(dead_code)] - describe_parameters_service: Arc>, + describe_parameters_service: Service, #[allow(dead_code)] - get_parameter_types_service: Arc>, + get_parameter_types_service: Service, #[allow(dead_code)] - get_parameters_service: Arc>, + get_parameters_service: Service, #[allow(dead_code)] - list_parameters_service: Arc>, + list_parameters_service: Service, #[allow(dead_code)] - set_parameters_service: Arc>, + set_parameters_service: Service, #[allow(dead_code)] - set_parameters_atomically_service: Arc>, + set_parameters_atomically_service: Service, } fn describe_parameters( @@ -247,7 +247,7 @@ impl ParameterService { // destruction is made for the parameter map. let map = parameter_map.clone(); let describe_parameters_service = node.create_service( - &(fqn.clone() + "/describe_parameters"), + (fqn.clone() + "/describe_parameters").qos(QoSProfile::parameter_services_default()), move |_req_id: &rmw_request_id_t, req: DescribeParameters_Request| { let map = map.lock().unwrap(); describe_parameters(req, &map) @@ -255,7 +255,7 @@ impl ParameterService { )?; let map = parameter_map.clone(); let get_parameter_types_service = node.create_service( - &(fqn.clone() + "/get_parameter_types"), + (fqn.clone() + "/get_parameter_types").qos(QoSProfile::parameter_services_default()), move |_req_id: &rmw_request_id_t, req: GetParameterTypes_Request| { let map = map.lock().unwrap(); get_parameter_types(req, &map) @@ -263,7 +263,7 @@ impl ParameterService { )?; let map = parameter_map.clone(); let get_parameters_service = node.create_service( - &(fqn.clone() + "/get_parameters"), + (fqn.clone() + "/get_parameters").qos(QoSProfile::parameter_services_default()), move |_req_id: &rmw_request_id_t, req: GetParameters_Request| { let map = map.lock().unwrap(); get_parameters(req, &map) @@ -271,7 +271,7 @@ impl ParameterService { )?; let map = parameter_map.clone(); let list_parameters_service = node.create_service( - &(fqn.clone() + "/list_parameters"), + (fqn.clone() + "/list_parameters").qos(QoSProfile::parameter_services_default()), move |_req_id: &rmw_request_id_t, req: ListParameters_Request| { let map = map.lock().unwrap(); list_parameters(req, &map) @@ -279,14 +279,15 @@ impl ParameterService { )?; let map = parameter_map.clone(); let set_parameters_service = node.create_service( - &(fqn.clone() + "/set_parameters"), + (fqn.clone() + "/set_parameters").qos(QoSProfile::parameter_services_default()), move |_req_id: &rmw_request_id_t, req: SetParameters_Request| { let mut map = map.lock().unwrap(); set_parameters(req, &mut map) }, )?; let set_parameters_atomically_service = node.create_service( - &(fqn.clone() + "/set_parameters_atomically"), + (fqn.clone() + "/set_parameters_atomically") + .qos(QoSProfile::parameter_services_default()), move |_req_id: &rmw_request_id_t, req: SetParametersAtomically_Request| { let mut map = parameter_map.lock().unwrap(); set_parameters_atomically(req, &mut map) @@ -312,23 +313,26 @@ mod tests { }, srv::rmw::*, }, - Context, MandatoryParameter, Node, NodeBuilder, ParameterRange, ParameterValue, RclrsError, - ReadOnlyParameter, + Context, Executor, MandatoryParameter, Node, NodeOptions, ParameterRange, ParameterValue, + RclrsError, RclrsErrorFilter, ReadOnlyParameter, SpinOptions, }; use rosidl_runtime_rs::{seq, Sequence}; - use std::sync::{Arc, RwLock}; + use std::{ + sync::{Arc, RwLock}, + time::Duration, + }; struct TestNode { - node: Arc, + node: Node, bool_param: MandatoryParameter, _ns_param: MandatoryParameter, _read_only_param: ReadOnlyParameter, dynamic_param: MandatoryParameter, } - async fn try_until_timeout(f: F) -> Result<(), ()> + async fn try_until_timeout(mut f: F) -> Result<(), ()> where - F: FnOnce() -> bool + Copy, + F: FnMut() -> bool, { let mut retry_count = 0; while !f() { @@ -341,10 +345,10 @@ mod tests { Ok(()) } - fn construct_test_nodes(context: &Context, ns: &str) -> (TestNode, Arc) { - let node = NodeBuilder::new(context, "node") - .namespace(ns) - .build() + fn construct_test_nodes(ns: &str) -> (Executor, TestNode, Node) { + let executor = Context::default().create_basic_executor(); + let node = executor + .create_node(NodeOptions::new("node").namespace(ns)) .unwrap(); let range = ParameterRange { lower: Some(0), @@ -375,12 +379,12 @@ mod tests { .mandatory() .unwrap(); - let client = NodeBuilder::new(context, "client") - .namespace(ns) - .build() + let client = executor + .create_node(NodeOptions::new("client").namespace(ns)) .unwrap(); ( + executor, TestNode { node, bool_param, @@ -394,8 +398,7 @@ mod tests { #[test] fn test_parameter_services_names_and_types() -> Result<(), RclrsError> { - let context = Context::new([]).unwrap(); - let (node, _client) = construct_test_nodes(&context, "names_types"); + let (_, node, _client) = construct_test_nodes("names_types"); std::thread::sleep(std::time::Duration::from_millis(100)); @@ -429,8 +432,7 @@ mod tests { #[tokio::test] async fn test_list_parameters_service() -> Result<(), RclrsError> { - let context = Context::new([]).unwrap(); - let (node, client) = construct_test_nodes(&context, "list"); + let (mut executor, _test, client) = construct_test_nodes("list"); let list_client = client.create_client::("/list/node/list_parameters")?; try_until_timeout(|| list_client.service_is_ready().unwrap()) @@ -441,9 +443,12 @@ mod tests { let inner_done = done.clone(); let rclrs_spin = tokio::task::spawn(async move { - try_until_timeout(|| { - crate::spin_once(node.node.clone(), Some(std::time::Duration::ZERO)).ok(); - crate::spin_once(client.clone(), Some(std::time::Duration::ZERO)).ok(); + try_until_timeout(move || { + executor + .spin(SpinOptions::spin_once().timeout(Duration::ZERO)) + .timeout_ok() + .unwrap(); + *inner_done.read().unwrap() }) .await @@ -542,7 +547,6 @@ mod tests { move |response: ListParameters_Response| { *call_done.write().unwrap() = true; let names = response.result.names; - dbg!(&names); assert_eq!(names.len(), 2); assert_eq!(names[0].to_string(), "bool"); assert_eq!(names[1].to_string(), "use_sim_time"); @@ -564,14 +568,14 @@ mod tests { #[tokio::test] async fn test_get_set_parameters_service() -> Result<(), RclrsError> { - let context = Context::new([]).unwrap(); - let (node, client) = construct_test_nodes(&context, "get_set"); + let (mut executor, test, client) = construct_test_nodes("get_set"); let get_client = client.create_client::("/get_set/node/get_parameters")?; let set_client = client.create_client::("/get_set/node/set_parameters")?; let set_atomically_client = client .create_client::("/get_set/node/set_parameters_atomically")?; try_until_timeout(|| { + println!(" >> testing services"); get_client.service_is_ready().unwrap() && set_client.service_is_ready().unwrap() && set_atomically_client.service_is_ready().unwrap() @@ -581,18 +585,24 @@ mod tests { let done = Arc::new(RwLock::new(false)); - let inner_node = node.node.clone(); let inner_done = done.clone(); let rclrs_spin = tokio::task::spawn(async move { - try_until_timeout(|| { - crate::spin_once(inner_node.clone(), Some(std::time::Duration::ZERO)).ok(); - crate::spin_once(client.clone(), Some(std::time::Duration::ZERO)).ok(); + try_until_timeout(move || { + println!(" -- spin"); + executor + .spin(SpinOptions::spin_once().timeout(Duration::ZERO)) + .timeout_ok() + .unwrap(); + *inner_done.read().unwrap() }) .await .unwrap(); }); + let _hold_node = test.node.clone(); + let _hold_client = client.clone(); + let res = tokio::task::spawn(async move { // Get an existing parameter let request = GetParameters_Request { @@ -636,9 +646,12 @@ mod tests { }, ) .unwrap(); - try_until_timeout(|| *client_finished.read().unwrap()) - .await - .unwrap(); + try_until_timeout(|| { + println!("checking client"); + *client_finished.read().unwrap() + }) + .await + .unwrap(); // Set a mix of existing, non existing, dynamic and out of range parameters let bool_parameter = RmwParameter { @@ -711,7 +724,7 @@ mod tests { let client_finished = Arc::new(RwLock::new(false)); let call_done = client_finished.clone(); // Parameter is assigned a default of true at declaration time - assert!(node.bool_param.get()); + assert!(test.bool_param.get()); set_client .async_send_request_with_callback( &request, @@ -721,14 +734,14 @@ mod tests { // Setting a bool value set for a bool parameter assert!(response.results[0].successful); // Value was set to false, node parameter get should reflect this - assert!(!node.bool_param.get()); + assert!(!test.bool_param.get()); // Setting a parameter to the wrong type assert!(!response.results[1].successful); // Setting a read only parameter assert!(!response.results[2].successful); // Setting a dynamic parameter to a new type assert!(response.results[3].successful); - assert_eq!(node.dynamic_param.get(), ParameterValue::Bool(true)); + assert_eq!(test.dynamic_param.get(), ParameterValue::Bool(true)); // Setting a value out of range assert!(!response.results[4].successful); // Setting an invalid type @@ -743,7 +756,7 @@ mod tests { .unwrap(); // Set the node to use undeclared parameters and try to set one - node.node.use_undeclared_parameters(); + test.node.use_undeclared_parameters(); let request = SetParameters_Request { parameters: seq![undeclared_bool], }; @@ -758,7 +771,7 @@ mod tests { // Setting the undeclared parameter is now allowed assert!(response.results[0].successful); assert_eq!( - node.node.use_undeclared_parameters().get("undeclared_bool"), + test.node.use_undeclared_parameters().get("undeclared_bool"), Some(ParameterValue::Bool(true)) ); }, @@ -783,9 +796,12 @@ mod tests { }, ) .unwrap(); - try_until_timeout(|| *client_finished.read().unwrap()) - .await - .unwrap(); + try_until_timeout(|| { + println!("checking client finished"); + *client_finished.read().unwrap() + }) + .await + .unwrap(); *done.write().unwrap() = true; }); @@ -797,8 +813,7 @@ mod tests { #[tokio::test] async fn test_describe_get_types_parameters_service() -> Result<(), RclrsError> { - let context = Context::new([]).unwrap(); - let (node, client) = construct_test_nodes(&context, "describe"); + let (mut executor, _test, client) = construct_test_nodes("describe"); let describe_client = client.create_client::("/describe/node/describe_parameters")?; let get_types_client = @@ -814,11 +829,13 @@ mod tests { let done = Arc::new(RwLock::new(false)); let inner_done = done.clone(); - let inner_node = node.node.clone(); let rclrs_spin = tokio::task::spawn(async move { - try_until_timeout(|| { - crate::spin_once(inner_node.clone(), Some(std::time::Duration::ZERO)).ok(); - crate::spin_once(client.clone(), Some(std::time::Duration::ZERO)).ok(); + try_until_timeout(move || { + executor + .spin(SpinOptions::spin_once().timeout(Duration::ZERO)) + .timeout_ok() + .unwrap(); + *inner_done.read().unwrap() }) .await diff --git a/rclrs/src/parameter/value.rs b/rclrs/src/parameter/value.rs index 82fe31ebb..ff0c86c46 100644 --- a/rclrs/src/parameter/value.rs +++ b/rclrs/src/parameter/value.rs @@ -537,7 +537,7 @@ impl ParameterValue { #[cfg(test)] mod tests { use super::*; - use crate::{Context, RclrsError, ToResult}; + use crate::{Context, InitOptions, RclrsError, ToResult}; // TODO(luca) tests for all from / to ParameterVariant functions @@ -565,11 +565,14 @@ mod tests { ), ]; for pair in input_output_pairs { - let ctx = Context::new([ - String::from("--ros-args"), - String::from("-p"), - format!("foo:={}", pair.0), - ])?; + let ctx = Context::new( + [ + String::from("--ros-args"), + String::from("-p"), + format!("foo:={}", pair.0), + ], + InitOptions::default(), + )?; let mut rcl_params = std::ptr::null_mut(); unsafe { rcl_arguments_get_param_overrides( diff --git a/rclrs/src/publisher.rs b/rclrs/src/publisher.rs index 2935ca322..d27c15cba 100644 --- a/rclrs/src/publisher.rs +++ b/rclrs/src/publisher.rs @@ -11,7 +11,7 @@ use crate::{ error::{RclrsError, ToResult}, qos::QoSProfile, rcl_bindings::*, - NodeHandle, ENTITY_LIFECYCLE_MUTEX, + IntoPrimitiveOptions, NodeHandle, ENTITY_LIFECYCLE_MUTEX, }; mod loaned_message; @@ -45,15 +45,32 @@ impl Drop for PublisherHandle { /// Struct for sending messages of type `T`. /// +/// Create a publisher using [`Node::create_publisher`][1]. +/// /// Multiple publishers can be created for the same topic, in different nodes or the same node. +/// A clone of a `Publisher` will refer to the same publisher instance as the original. +/// The underlying instance is tied to [`PublisherState`] which implements the [`Publisher`] API. /// /// The underlying RMW will decide on the concrete delivery mechanism (network stack, shared /// memory, or intraprocess). /// -/// Sending messages does not require calling [`spin`][1] on the publisher's node. +/// Sending messages does not require the node's executor to [spin][2]. +/// +/// [1]: crate::NodeState::create_publisher +/// [2]: crate::Executor::spin +pub type Publisher = Arc>; + +/// The inner state of a [`Publisher`]. +/// +/// This is public so that you can choose to create a [`Weak`][1] reference to it +/// if you want to be able to refer to a [`Publisher`] in a non-owning way. It is +/// generally recommended to manage the `PublisherState` inside of an [`Arc`], +/// and [`Publisher`] is provided as a convenience alias for that. /// -/// [1]: crate::spin -pub struct Publisher +/// The public API of the [`Publisher`] type is implemented via `PublisherState`. +/// +/// [1]: std::sync::Weak +pub struct PublisherState where T: Message, { @@ -66,26 +83,26 @@ where // SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread // they are running in. Therefore, this type can be safely sent to another thread. -unsafe impl Send for Publisher where T: Message {} +unsafe impl Send for PublisherState where T: Message {} // SAFETY: The type_support_ptr prevents the default Sync impl. // rosidl_message_type_support_t is a read-only type without interior mutability. -unsafe impl Sync for Publisher where T: Message {} +unsafe impl Sync for PublisherState where T: Message {} -impl Publisher +impl PublisherState where T: Message, { /// Creates a new `Publisher`. /// /// Node and namespace changes are always applied _before_ topic remapping. - pub(crate) fn new( + pub(crate) fn new<'a>( node_handle: Arc, - topic: &str, - qos: QoSProfile, + options: impl Into>, ) -> Result where T: Message, { + let PublisherOptions { topic, qos } = options.into(); // SAFETY: Getting a zero-initialized value is always safe. let mut rcl_publisher = unsafe { rcl_get_zero_initialized_publisher() }; let type_support_ptr = @@ -179,7 +196,7 @@ where } } -impl Publisher +impl PublisherState where T: RmwMessage, { @@ -231,7 +248,39 @@ where } } -/// Convenience trait for [`Publisher::publish`]. +/// `PublisherOptions` are used by [`Node::create_publisher`][1] to initialize +/// a [`Publisher`]. +/// +/// [1]: crate::NodeState::create_publisher +#[derive(Debug, Clone)] +#[non_exhaustive] +pub struct PublisherOptions<'a> { + /// The topic name for the publisher. + pub topic: &'a str, + /// The quality of service settings for the publisher. + pub qos: QoSProfile, +} + +impl<'a> PublisherOptions<'a> { + /// Initialize a new [`PublisherOptions`] with default settings. + pub fn new(topic: &'a str) -> Self { + Self { + topic, + qos: QoSProfile::topics_default(), + } + } +} + +impl<'a, T: IntoPrimitiveOptions<'a>> From for PublisherOptions<'a> { + fn from(value: T) -> Self { + let primitive = value.into_primitive_options(); + let mut options = Self::new(primitive.name); + primitive.apply(&mut options.qos); + options + } +} + +/// Convenience trait for [`PublisherState::publish`]. pub trait MessageCow<'a, T: Message> { /// Wrap the owned or borrowed message in a `Cow`. fn into_cow(self) -> Cow<'a, T>; @@ -262,7 +311,7 @@ mod tests { #[test] fn test_publishers() -> Result<(), RclrsError> { - use crate::{TopicEndpointInfo, QOS_PROFILE_SYSTEM_DEFAULT}; + use crate::TopicEndpointInfo; use test_msgs::msg; let namespace = "/test_publishers_graph"; @@ -270,16 +319,15 @@ mod tests { let node_1_empty_publisher = graph .node1 - .create_publisher::("graph_test_topic_1", QOS_PROFILE_SYSTEM_DEFAULT)?; + .create_publisher::("graph_test_topic_1")?; let topic1 = node_1_empty_publisher.topic_name(); - let node_1_basic_types_publisher = graph.node1.create_publisher::( - "graph_test_topic_2", - QOS_PROFILE_SYSTEM_DEFAULT, - )?; + let node_1_basic_types_publisher = graph + .node1 + .create_publisher::("graph_test_topic_2")?; let topic2 = node_1_basic_types_publisher.topic_name(); let node_2_default_publisher = graph .node2 - .create_publisher::("graph_test_topic_3", QOS_PROFILE_SYSTEM_DEFAULT)?; + .create_publisher::("graph_test_topic_3")?; let topic3 = node_2_default_publisher.topic_name(); std::thread::sleep(std::time::Duration::from_millis(100)); diff --git a/rclrs/src/publisher/loaned_message.rs b/rclrs/src/publisher/loaned_message.rs index c03fc300f..66ad1046a 100644 --- a/rclrs/src/publisher/loaned_message.rs +++ b/rclrs/src/publisher/loaned_message.rs @@ -2,13 +2,13 @@ use std::ops::{Deref, DerefMut}; use rosidl_runtime_rs::RmwMessage; -use crate::{rcl_bindings::*, Publisher, RclrsError, ToResult}; +use crate::{rcl_bindings::*, PublisherState, RclrsError, ToResult}; /// A message that is owned by the middleware, loaned for publishing. /// /// It dereferences to a `&mut T`. /// -/// This type is returned by [`Publisher::borrow_loaned_message()`], see the documentation of +/// This type is returned by [`PublisherState::borrow_loaned_message()`], see the documentation of /// that function for more information. /// /// The loan is returned by dropping the message or [publishing it][1]. @@ -19,7 +19,7 @@ where T: RmwMessage, { pub(super) msg_ptr: *mut T, - pub(super) publisher: &'a Publisher, + pub(super) publisher: &'a PublisherState, } impl<'a, T> Deref for LoanedMessage<'a, T> diff --git a/rclrs/src/qos.rs b/rclrs/src/qos.rs index b26f01ef8..83eb797d7 100644 --- a/rclrs/src/qos.rs +++ b/rclrs/src/qos.rs @@ -166,7 +166,7 @@ pub struct QoSProfile { /// The time within which the RMW publisher must show that it is alive. /// /// If this is `Infinite`, liveliness is not enforced. - pub liveliness_lease_duration: QoSDuration, + pub liveliness_lease: QoSDuration, /// If true, any ROS specific namespacing conventions will be circumvented. /// /// In the case of DDS and topics, for example, this means the typical @@ -200,7 +200,7 @@ impl From for rmw_qos_profile_t { deadline: qos.deadline.into(), lifespan: qos.lifespan.into(), liveliness: qos.liveliness.into(), - liveliness_lease_duration: qos.liveliness_lease_duration.into(), + liveliness_lease_duration: qos.liveliness_lease.into(), avoid_ros_namespace_conventions: qos.avoid_ros_namespace_conventions, } } @@ -251,7 +251,7 @@ impl QoSProfile { /// Sets the QoS profile liveliness lease duration to the specified `Duration`. pub fn liveliness_lease_duration(mut self, lease_duration: Duration) -> Self { - self.liveliness_lease_duration = QoSDuration::Custom(lease_duration); + self.liveliness_lease = QoSDuration::Custom(lease_duration); self } @@ -260,6 +260,38 @@ impl QoSProfile { self.lifespan = QoSDuration::Custom(lifespan); self } + + /// Get the default QoS profile for ordinary topics. + pub fn topics_default() -> Self { + QOS_PROFILE_DEFAULT + } + + /// Get the default QoS profile for topics that transmit sensor data. + pub fn sensor_data_default() -> Self { + QOS_PROFILE_SENSOR_DATA + } + + /// Get the default QoS profile for services. + pub fn services_default() -> Self { + QOS_PROFILE_SERVICES_DEFAULT + } + + /// Get the default QoS profile for parameter services. + pub fn parameter_services_default() -> Self { + QOS_PROFILE_PARAMETERS + } + + /// Get the default QoS profile for parameter event topics. + pub fn parameter_events_default() -> Self { + QOS_PROFILE_PARAMETER_EVENTS + } + + /// Get the system-defined default quality of service profile. This profile + /// is determined by the underlying RMW implementation, so you cannot rely + /// on this profile being consistent or appropriate for your needs. + pub fn system_default() -> Self { + QOS_PROFILE_SYSTEM_DEFAULT + } } impl From for rmw_qos_history_policy_t { @@ -355,7 +387,7 @@ pub const QOS_PROFILE_SENSOR_DATA: QoSProfile = QoSProfile { deadline: QoSDuration::SystemDefault, lifespan: QoSDuration::SystemDefault, liveliness: QoSLivelinessPolicy::SystemDefault, - liveliness_lease_duration: QoSDuration::SystemDefault, + liveliness_lease: QoSDuration::SystemDefault, avoid_ros_namespace_conventions: false, }; @@ -370,7 +402,7 @@ pub const QOS_PROFILE_CLOCK: QoSProfile = QoSProfile { deadline: QoSDuration::SystemDefault, lifespan: QoSDuration::SystemDefault, liveliness: QoSLivelinessPolicy::SystemDefault, - liveliness_lease_duration: QoSDuration::SystemDefault, + liveliness_lease: QoSDuration::SystemDefault, avoid_ros_namespace_conventions: false, }; @@ -384,7 +416,7 @@ pub const QOS_PROFILE_PARAMETERS: QoSProfile = QoSProfile { deadline: QoSDuration::SystemDefault, lifespan: QoSDuration::SystemDefault, liveliness: QoSLivelinessPolicy::SystemDefault, - liveliness_lease_duration: QoSDuration::SystemDefault, + liveliness_lease: QoSDuration::SystemDefault, avoid_ros_namespace_conventions: false, }; @@ -398,7 +430,7 @@ pub const QOS_PROFILE_DEFAULT: QoSProfile = QoSProfile { deadline: QoSDuration::SystemDefault, lifespan: QoSDuration::SystemDefault, liveliness: QoSLivelinessPolicy::SystemDefault, - liveliness_lease_duration: QoSDuration::SystemDefault, + liveliness_lease: QoSDuration::SystemDefault, avoid_ros_namespace_conventions: false, }; @@ -412,7 +444,7 @@ pub const QOS_PROFILE_SERVICES_DEFAULT: QoSProfile = QoSProfile { deadline: QoSDuration::SystemDefault, lifespan: QoSDuration::SystemDefault, liveliness: QoSLivelinessPolicy::SystemDefault, - liveliness_lease_duration: QoSDuration::SystemDefault, + liveliness_lease: QoSDuration::SystemDefault, avoid_ros_namespace_conventions: false, }; @@ -426,7 +458,7 @@ pub const QOS_PROFILE_PARAMETER_EVENTS: QoSProfile = QoSProfile { deadline: QoSDuration::SystemDefault, lifespan: QoSDuration::SystemDefault, liveliness: QoSLivelinessPolicy::SystemDefault, - liveliness_lease_duration: QoSDuration::SystemDefault, + liveliness_lease: QoSDuration::SystemDefault, avoid_ros_namespace_conventions: false, }; @@ -440,6 +472,6 @@ pub const QOS_PROFILE_SYSTEM_DEFAULT: QoSProfile = QoSProfile { deadline: QoSDuration::SystemDefault, lifespan: QoSDuration::SystemDefault, liveliness: QoSLivelinessPolicy::SystemDefault, - liveliness_lease_duration: QoSDuration::SystemDefault, + liveliness_lease: QoSDuration::SystemDefault, avoid_ros_namespace_conventions: false, }; diff --git a/rclrs/src/service.rs b/rclrs/src/service.rs index ac43e51a8..688d71c7e 100644 --- a/rclrs/src/service.rs +++ b/rclrs/src/service.rs @@ -9,7 +9,7 @@ use rosidl_runtime_rs::Message; use crate::{ error::{RclReturnCode, ToResult}, rcl_bindings::*, - MessageCow, NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX, + IntoPrimitiveOptions, MessageCow, NodeHandle, QoSProfile, RclrsError, ENTITY_LIFECYCLE_MUTEX, }; // SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread @@ -59,14 +59,33 @@ pub trait ServiceBase: Send + Sync { type ServiceCallback = Box Response + 'static + Send>; -/// Main class responsible for responding to requests sent by ROS clients. +/// Provide a service that can respond to requests sent by ROS service clients. /// -/// The only available way to instantiate services is via [`Node::create_service()`][1], this is to -/// ensure that [`Node`][2]s can track all the services that have been created. +/// Create a service using [`Node::create_service`][1]. /// -/// [1]: crate::Node::create_service -/// [2]: crate::Node -pub struct Service +/// ROS only supports having one service provider for any given fully-qualified +/// service name. "Fully-qualified" means the namespace is also taken into account +/// for uniqueness. A clone of a `Service` will refer to the same service provider +/// instance as the original. The underlying instance is tied to [`ServiceState`] +/// which implements the [`Service`] API. +/// +/// Responding to requests requires the node's executor to [spin][2]. +/// +/// [1]: crate::NodeState::create_service +/// [2]: crate::Executor::spin +pub type Service = Arc>; + +/// The inner state of a [`Service`]. +/// +/// This is public so that you can choose to create a [`Weak`][1] reference to it +/// if you want to be able to refer to a [`Service`] in a non-owning way. It is +/// generally recommended to manage the `ServiceState` inside of an [`Arc`], +/// and [`Service`] is provided as a convenience alias for that. +/// +/// The public API of the [`Service`] type is implemented via `ServiceState`. +/// +/// [1]: std::sync::Weak +pub struct ServiceState where T: rosidl_runtime_rs::Service, { @@ -75,14 +94,14 @@ where pub callback: Mutex>, } -impl Service +impl ServiceState where T: rosidl_runtime_rs::Service, { /// Creates a new service. - pub(crate) fn new( + pub(crate) fn new<'a, F>( node_handle: Arc, - topic: &str, + options: impl Into>, callback: F, ) -> Result // This uses pub(crate) visibility to avoid instantiating this struct outside @@ -91,17 +110,19 @@ where T: rosidl_runtime_rs::Service, F: Fn(&rmw_request_id_t, T::Request) -> T::Response + 'static + Send, { + let ServiceOptions { name, qos } = options.into(); // SAFETY: Getting a zero-initialized value is always safe. let mut rcl_service = unsafe { rcl_get_zero_initialized_service() }; let type_support = ::get_type_support() as *const rosidl_service_type_support_t; - let topic_c_string = CString::new(topic).map_err(|err| RclrsError::StringContainsNul { + let topic_c_string = CString::new(name).map_err(|err| RclrsError::StringContainsNul { err, - s: topic.into(), + s: name.into(), })?; // SAFETY: No preconditions for this function. - let service_options = unsafe { rcl_service_get_default_options() }; + let mut service_options = unsafe { rcl_service_get_default_options() }; + service_options.qos = qos.into(); { let rcl_node = node_handle.rcl_node.lock().unwrap(); @@ -181,7 +202,39 @@ where } } -impl ServiceBase for Service +/// `ServiceOptions are used by [`Node::create_service`][1] to initialize a +/// [`Service`] provider. +/// +/// [1]: crate::NodeState::create_service +#[derive(Debug, Clone)] +#[non_exhaustive] +pub struct ServiceOptions<'a> { + /// The name for the service + pub name: &'a str, + /// The quality of service profile for the service. + pub qos: QoSProfile, +} + +impl<'a> ServiceOptions<'a> { + /// Initialize a new [`ServiceOptions`] with default settings. + pub fn new(name: &'a str) -> Self { + Self { + name, + qos: QoSProfile::services_default(), + } + } +} + +impl<'a, T: IntoPrimitiveOptions<'a>> From for ServiceOptions<'a> { + fn from(value: T) -> Self { + let primitive = value.into_primitive_options(); + let mut options = Self::new(primitive.name); + primitive.apply(&mut options.qos); + options + } +} + +impl ServiceBase for ServiceState where T: rosidl_runtime_rs::Service, { diff --git a/rclrs/src/subscription.rs b/rclrs/src/subscription.rs index fbd518c21..bb861f950 100644 --- a/rclrs/src/subscription.rs +++ b/rclrs/src/subscription.rs @@ -10,7 +10,7 @@ use crate::{ error::{RclReturnCode, ToResult}, qos::QoSProfile, rcl_bindings::*, - NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX, + IntoPrimitiveOptions, NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX, }; mod callback; @@ -64,21 +64,32 @@ pub trait SubscriptionBase: Send + Sync { /// Struct for receiving messages of type `T`. /// +/// Create a subscription using [`Node::create_subscription`][1]. +/// /// There can be multiple subscriptions for the same topic, in different nodes or the same node. +/// A clone of a `Subscription` will refer to the same subscription instance as the original. +/// The underlying instance is tied to [`SubscriptionState`] which implements the [`Subscription`] API. /// -/// Receiving messages requires calling [`spin_once`][1] or [`spin`][2] on the subscription's node. +/// Receiving messages requires the node's executor to [spin][2]. /// /// When a subscription is created, it may take some time to get "matched" with a corresponding /// publisher. /// -/// The only available way to instantiate subscriptions is via [`Node::create_subscription()`][3], this -/// is to ensure that [`Node`][4]s can track all the subscriptions that have been created. +/// [1]: crate::NodeState::create_subscription +/// [2]: crate::Executor::spin +pub type Subscription = Arc>; + +/// The inner state of a [`Subscription`]. +/// +/// This is public so that you can choose to create a [`Weak`][1] reference to it +/// if you want to be able to refer to a [`Subscription`] in a non-owning way. It is +/// generally recommended to manage the `SubscriptionState` inside of an [`Arc`], +/// and [`Subscription`] is provided as a convenience alias for that. +/// +/// The public API of the [`Subscription`] type is implemented via `SubscriptionState`. /// -/// [1]: crate::spin_once -/// [2]: crate::spin -/// [3]: crate::Node::create_subscription -/// [4]: crate::Node -pub struct Subscription +/// [1]: std::sync::Weak +pub struct SubscriptionState where T: Message, { @@ -88,15 +99,14 @@ where message: PhantomData, } -impl Subscription +impl SubscriptionState where T: Message, { /// Creates a new subscription. - pub(crate) fn new( + pub(crate) fn new<'a, Args>( node_handle: Arc, - topic: &str, - qos: QoSProfile, + options: impl Into>, callback: impl SubscriptionCallback, ) -> Result // This uses pub(crate) visibility to avoid instantiating this struct outside @@ -104,6 +114,7 @@ where where T: Message, { + let SubscriptionOptions { topic, qos } = options.into(); // SAFETY: Getting a zero-initialized value is always safe. let mut rcl_subscription = unsafe { rcl_get_zero_initialized_subscription() }; let type_support = @@ -114,8 +125,8 @@ where })?; // SAFETY: No preconditions for this function. - let mut subscription_options = unsafe { rcl_subscription_get_default_options() }; - subscription_options.qos = qos.into(); + let mut rcl_subscription_options = unsafe { rcl_subscription_get_default_options() }; + rcl_subscription_options.qos = qos.into(); { let rcl_node = node_handle.rcl_node.lock().unwrap(); @@ -132,7 +143,7 @@ where &*rcl_node, type_support, topic_c_string.as_ptr(), - &subscription_options, + &rcl_subscription_options, ) .ok()?; } @@ -237,7 +248,7 @@ where /// for more information. /// /// [1]: crate::RclrsError - /// [2]: crate::Publisher::borrow_loaned_message + /// [2]: crate::PublisherState::borrow_loaned_message pub fn take_loaned(&self) -> Result<(ReadOnlyLoanedMessage<'_, T>, MessageInfo), RclrsError> { let mut msg_ptr = std::ptr::null_mut(); let mut message_info = unsafe { rmw_get_zero_initialized_message_info() }; @@ -263,7 +274,39 @@ where } } -impl SubscriptionBase for Subscription +/// `SubscriptionOptions` are used by [`Node::create_subscription`][1] +/// to initialize a [`Subscription`]. +/// +/// [1]: crate::NodeState::create_subscription +#[derive(Debug, Clone)] +#[non_exhaustive] +pub struct SubscriptionOptions<'a> { + /// The topic name for the subscription. + pub topic: &'a str, + /// The quality of service settings for the subscription. + pub qos: QoSProfile, +} + +impl<'a> SubscriptionOptions<'a> { + /// Initialize a new [`SubscriptionOptions`] with default settings. + pub fn new(topic: &'a str) -> Self { + Self { + topic, + qos: QoSProfile::topics_default(), + } + } +} + +impl<'a, T: IntoPrimitiveOptions<'a>> From for SubscriptionOptions<'a> { + fn from(value: T) -> Self { + let primitive = value.into_primitive_options(); + let mut options = Self::new(primitive.name); + primitive.apply(&mut options.qos); + options + } +} + +impl SubscriptionBase for SubscriptionState where T: Message, { @@ -332,27 +375,23 @@ mod tests { #[test] fn test_subscriptions() -> Result<(), RclrsError> { - use crate::{TopicEndpointInfo, QOS_PROFILE_SYSTEM_DEFAULT}; + use crate::TopicEndpointInfo; let namespace = "/test_subscriptions_graph"; let graph = construct_test_graph(namespace)?; - let node_2_empty_subscription = graph.node2.create_subscription::( - "graph_test_topic_1", - QOS_PROFILE_SYSTEM_DEFAULT, - |_msg: msg::Empty| {}, - )?; + let node_2_empty_subscription = graph + .node2 + .create_subscription::("graph_test_topic_1", |_msg: msg::Empty| {})?; let topic1 = node_2_empty_subscription.topic_name(); let node_2_basic_types_subscription = graph.node2.create_subscription::( "graph_test_topic_2", - QOS_PROFILE_SYSTEM_DEFAULT, |_msg: msg::BasicTypes| {}, )?; let topic2 = node_2_basic_types_subscription.topic_name(); let node_1_defaults_subscription = graph.node1.create_subscription::( "graph_test_topic_3", - QOS_PROFILE_SYSTEM_DEFAULT, |_msg: msg::Defaults| {}, )?; let topic3 = node_1_defaults_subscription.topic_name(); diff --git a/rclrs/src/subscription/readonly_loaned_message.rs b/rclrs/src/subscription/readonly_loaned_message.rs index df4ad6a5b..d287cc1b8 100644 --- a/rclrs/src/subscription/readonly_loaned_message.rs +++ b/rclrs/src/subscription/readonly_loaned_message.rs @@ -2,7 +2,7 @@ use std::ops::Deref; use rosidl_runtime_rs::Message; -use crate::{rcl_bindings::*, Subscription, ToResult}; +use crate::{rcl_bindings::*, SubscriptionState, ToResult}; /// A message that is owned by the middleware, loaned out for reading. /// @@ -10,7 +10,7 @@ use crate::{rcl_bindings::*, Subscription, ToResult}; /// message, it's the same as `&T`, and otherwise it's the corresponding RMW-native /// message. /// -/// This type is returned by [`Subscription::take_loaned()`] and may be used in +/// This type is returned by [`SubscriptionState::take_loaned()`] and may be used in /// subscription callbacks. /// /// The loan is returned by dropping the `ReadOnlyLoanedMessage`. @@ -19,7 +19,7 @@ where T: Message, { pub(super) msg_ptr: *const T::RmwMsg, - pub(super) subscription: &'a Subscription, + pub(super) subscription: &'a SubscriptionState, } impl<'a, T> Deref for ReadOnlyLoanedMessage<'a, T> diff --git a/rclrs/src/test_helpers/graph_helpers.rs b/rclrs/src/test_helpers/graph_helpers.rs index 1e9b581ae..8596cebd1 100644 --- a/rclrs/src/test_helpers/graph_helpers.rs +++ b/rclrs/src/test_helpers/graph_helpers.rs @@ -1,19 +1,14 @@ -use crate::{Context, Node, NodeBuilder, RclrsError}; -use std::sync::Arc; +use crate::{Context, Node, NodeOptions, RclrsError}; pub(crate) struct TestGraph { - pub node1: Arc, - pub node2: Arc, + pub node1: Node, + pub node2: Node, } pub(crate) fn construct_test_graph(namespace: &str) -> Result { - let context = Context::new([])?; + let executor = Context::default().create_basic_executor(); Ok(TestGraph { - node1: NodeBuilder::new(&context, "graph_test_node_1") - .namespace(namespace) - .build()?, - node2: NodeBuilder::new(&context, "graph_test_node_2") - .namespace(namespace) - .build()?, + node1: executor.create_node(NodeOptions::new("graph_test_node_1").namespace(namespace))?, + node2: executor.create_node(NodeOptions::new("graph_test_node_2").namespace(namespace))?, }) } diff --git a/rclrs/src/time_source.rs b/rclrs/src/time_source.rs index a6b600800..c8e6341b5 100644 --- a/rclrs/src/time_source.rs +++ b/rclrs/src/time_source.rs @@ -1,20 +1,19 @@ use crate::{ clock::{Clock, ClockSource, ClockType}, vendor::rosgraph_msgs::msg::Clock as ClockMsg, - Node, QoSProfile, ReadOnlyParameter, Subscription, QOS_PROFILE_CLOCK, + IntoPrimitiveOptions, Node, QoSProfile, ReadOnlyParameter, Subscription, QOS_PROFILE_CLOCK, }; -use std::sync::{Arc, Mutex, RwLock, Weak}; +use std::sync::{Arc, Mutex, RwLock}; /// Time source for a node that drives the attached clock. /// If the node's `use_sim_time` parameter is set to `true`, the `TimeSource` will subscribe /// to the `/clock` topic and drive the attached clock pub(crate) struct TimeSource { - node: Mutex>, clock: RwLock, clock_source: Arc>>, requested_clock_type: ClockType, clock_qos: QoSProfile, - clock_subscription: Mutex>>>, + clock_subscription: Mutex>>, last_received_time: Arc>>, // TODO(luca) Make this parameter editable when we have parameter callbacks implemented and can // safely change clock type at runtime @@ -60,7 +59,6 @@ impl TimeSourceBuilder { ClockType::SteadyTime => Clock::steady(), }; TimeSource { - node: Mutex::new(Weak::new()), clock: RwLock::new(clock), clock_source: Arc::new(Mutex::new(None)), requested_clock_type: self.clock_type, @@ -85,7 +83,7 @@ impl TimeSource { /// Attaches the given node to to the `TimeSource`, using its interface to read the /// `use_sim_time` parameter and create the clock subscription. - pub(crate) fn attach_node(&self, node: &Arc) { + pub(crate) fn attach_node(&self, node: &Node) { // TODO(luca) Make this parameter editable and register a parameter callback // that calls set_ros_time(bool) once parameter callbacks are implemented. let param = node @@ -93,12 +91,11 @@ impl TimeSource { .default(false) .read_only() .unwrap(); - *self.node.lock().unwrap() = Arc::downgrade(node); - self.set_ros_time_enable(param.get()); + self.set_ros_time_enable(node, param.get()); *self.use_sim_time.lock().unwrap() = Some(param); } - fn set_ros_time_enable(&self, enable: bool) { + fn set_ros_time_enable(&self, node: &Node, enable: bool) { if matches!(self.requested_clock_type, ClockType::RosTime) { let mut clock = self.clock.write().unwrap(); if enable && matches!(clock.clock_type(), ClockType::SystemTime) { @@ -108,7 +105,7 @@ impl TimeSource { } *clock = new_clock; *self.clock_source.lock().unwrap() = Some(clock_source); - *self.clock_subscription.lock().unwrap() = Some(self.create_clock_sub()); + *self.clock_subscription.lock().unwrap() = Some(self.create_clock_sub(node)); } if !enable && matches!(clock.clock_type(), ClockType::RosTime) { *clock = Clock::system(); @@ -122,47 +119,53 @@ impl TimeSource { clock.set_ros_time_override(nanoseconds); } - fn create_clock_sub(&self) -> Arc> { + fn create_clock_sub(&self, node: &Node) -> Subscription { let clock = self.clock_source.clone(); let last_received_time = self.last_received_time.clone(); // Safe to unwrap since the function will only fail if invalid arguments are provided - self.node - .lock() - .unwrap() - .upgrade() - .unwrap() - .create_subscription::("/clock", self.clock_qos, move |msg: ClockMsg| { + node.create_subscription::( + "/clock".qos(self.clock_qos), + move |msg: ClockMsg| { let nanoseconds: i64 = (msg.clock.sec as i64 * 1_000_000_000) + msg.clock.nanosec as i64; *last_received_time.lock().unwrap() = Some(nanoseconds); if let Some(clock) = clock.lock().unwrap().as_mut() { Self::update_clock(clock, nanoseconds); } - }) - .unwrap() + }, + ) + .unwrap() } } #[cfg(test)] mod tests { - use crate::{create_node, Context}; + use crate::{Context, InitOptions}; #[test] fn time_source_default_clock() { - let node = create_node(&Context::new([]).unwrap(), "test_node").unwrap(); + let node = Context::default() + .create_basic_executor() + .create_node("test_node") + .unwrap(); // Default clock should be above 0 (use_sim_time is default false) assert!(node.get_clock().now().nsec > 0); } #[test] fn time_source_sim_time() { - let ctx = Context::new([ - String::from("--ros-args"), - String::from("-p"), - String::from("use_sim_time:=true"), - ]) - .unwrap(); - let node = create_node(&ctx, "test_node").unwrap(); + let executor = Context::new( + [ + String::from("--ros-args"), + String::from("-p"), + String::from("use_sim_time:=true"), + ], + InitOptions::default(), + ) + .unwrap() + .create_basic_executor(); + + let node = executor.create_node("test_node").unwrap(); // Default sim time value should be 0 (no message received) assert_eq!(node.get_clock().now().nsec, 0); } diff --git a/rclrs/src/wait.rs b/rclrs/src/wait.rs index 243c9d857..9eed11101 100644 --- a/rclrs/src/wait.rs +++ b/rclrs/src/wait.rs @@ -20,7 +20,7 @@ use std::{sync::Arc, time::Duration, vec::Vec}; use crate::{ error::{to_rclrs_result, RclReturnCode, RclrsError, ToResult}, rcl_bindings::*, - ClientBase, Context, ContextHandle, Node, ServiceBase, SubscriptionBase, + ClientBase, Context, ContextHandle, NodeState, ServiceBase, SubscriptionBase, }; mod exclusivity_guard; @@ -133,7 +133,7 @@ impl WaitSet { /// Creates a new wait set and adds all waitable entities in the node to it. /// /// The wait set is sized to fit the node exactly, so there is no capacity for adding other entities. - pub fn new_for_node(node: &Node) -> Result { + pub fn new_for_node(node: &NodeState) -> Result { let live_subscriptions = node.live_subscriptions(); let live_clients = node.live_clients(); let live_guard_conditions = node.live_guard_conditions(); @@ -427,7 +427,7 @@ mod tests { #[test] fn guard_condition_in_wait_set_readies() -> Result<(), RclrsError> { - let context = Context::new([])?; + let context = Context::default(); let guard_condition = Arc::new(GuardCondition::new(&context)); diff --git a/rclrs/src/wait/guard_condition.rs b/rclrs/src/wait/guard_condition.rs index 92a6acd00..d9d8b62d9 100644 --- a/rclrs/src/wait/guard_condition.rs +++ b/rclrs/src/wait/guard_condition.rs @@ -16,7 +16,7 @@ use crate::{rcl_bindings::*, Context, ContextHandle, RclrsError, ToResult}; /// # use rclrs::{Context, GuardCondition, WaitSet, RclrsError}; /// # use std::sync::{Arc, atomic::Ordering}; /// -/// let context = Context::new([])?; +/// let context = Context::default(); /// /// let atomic_bool = Arc::new(std::sync::atomic::AtomicBool::new(false)); /// let atomic_bool_for_closure = Arc::clone(&atomic_bool); @@ -162,7 +162,7 @@ mod tests { #[test] fn test_guard_condition() -> Result<(), RclrsError> { - let context = Context::new([])?; + let context = Context::default(); let atomic_bool = Arc::new(std::sync::atomic::AtomicBool::new(false)); let atomic_bool_for_closure = Arc::clone(&atomic_bool); @@ -180,7 +180,7 @@ mod tests { #[test] fn test_guard_condition_wait() -> Result<(), RclrsError> { - let context = Context::new([])?; + let context = Context::default(); let atomic_bool = Arc::new(std::sync::atomic::AtomicBool::new(false)); let atomic_bool_for_closure = Arc::clone(&atomic_bool);