diff --git a/examples/message_demo/src/message_demo.rs b/examples/message_demo/src/message_demo.rs index 31b791ae..e66d6ef0 100644 --- a/examples/message_demo/src/message_demo.rs +++ b/examples/message_demo/src/message_demo.rs @@ -143,23 +143,19 @@ fn demonstrate_pubsub() -> Result<(), Error> { let mut executor = Context::default_from_env()?.create_basic_executor(); let node = executor.create_node("message_demo")?; - let idiomatic_publisher = node - .create_publisher::("topic", QOS_PROFILE_DEFAULT)?; - let direct_publisher = node.create_publisher::( - "topic", - QOS_PROFILE_DEFAULT, - )?; + let idiomatic_publisher = + node.create_publisher::("topic")?; + let direct_publisher = + node.create_publisher::("topic")?; let _idiomatic_subscription = node .create_subscription::( "topic", - QOS_PROFILE_DEFAULT, move |_msg: rclrs_example_msgs::msg::VariousTypes| println!("Got idiomatic message!"), )?; let _direct_subscription = node .create_subscription::( "topic", - QOS_PROFILE_DEFAULT, move |_msg: rclrs_example_msgs::msg::rmw::VariousTypes| { println!("Got RMW-native message!") }, diff --git a/examples/minimal_pub_sub/src/minimal_publisher.rs b/examples/minimal_pub_sub/src/minimal_publisher.rs index e74f4e05..f6f5d54d 100644 --- a/examples/minimal_pub_sub/src/minimal_publisher.rs +++ b/examples/minimal_pub_sub/src/minimal_publisher.rs @@ -7,7 +7,7 @@ fn main() -> Result<(), Error> { let node = executor.create_node("minimal_publisher")?; - let publisher = node.create_publisher::("topic", QOS_PROFILE_DEFAULT)?; + let publisher = node.create_publisher::("topic")?; let mut message = std_msgs::msg::String::default(); diff --git a/examples/minimal_pub_sub/src/minimal_subscriber.rs b/examples/minimal_pub_sub/src/minimal_subscriber.rs index f835bb2b..a29b23e5 100644 --- a/examples/minimal_pub_sub/src/minimal_subscriber.rs +++ b/examples/minimal_pub_sub/src/minimal_subscriber.rs @@ -11,7 +11,6 @@ fn main() -> Result<(), Error> { let _subscription = node.create_subscription::( "topic", - QOS_PROFILE_DEFAULT, move |msg: std_msgs::msg::String| { num_messages += 1; println!("I heard: '{}'", msg.data); diff --git a/examples/minimal_pub_sub/src/minimal_two_nodes.rs b/examples/minimal_pub_sub/src/minimal_two_nodes.rs index 72eb5512..7c25f313 100644 --- a/examples/minimal_pub_sub/src/minimal_two_nodes.rs +++ b/examples/minimal_pub_sub/src/minimal_two_nodes.rs @@ -26,7 +26,6 @@ impl MinimalSubscriber { .node .create_subscription::( topic, - QOS_PROFILE_DEFAULT, move |msg: std_msgs::msg::String| { minimal_subscriber_aux.callback(msg); }, @@ -55,8 +54,7 @@ fn main() -> Result<(), Error> { let _subscriber_node_two = MinimalSubscriber::new(&executor, "minimal_subscriber_two", "topic")?; - let publisher = - publisher_node.create_publisher::("topic", QOS_PROFILE_DEFAULT)?; + let publisher = publisher_node.create_publisher::("topic")?; std::thread::spawn(move || -> Result<(), RclrsError> { let mut message = std_msgs::msg::String::default(); diff --git a/examples/minimal_pub_sub/src/zero_copy_publisher.rs b/examples/minimal_pub_sub/src/zero_copy_publisher.rs index e950ab8d..0de7dcb5 100644 --- a/examples/minimal_pub_sub/src/zero_copy_publisher.rs +++ b/examples/minimal_pub_sub/src/zero_copy_publisher.rs @@ -7,8 +7,7 @@ fn main() -> Result<(), Error> { let node = executor.create_node("minimal_publisher")?; - let publisher = - node.create_publisher::("topic", QOS_PROFILE_DEFAULT)?; + let publisher = node.create_publisher::("topic")?; let mut publish_count: u32 = 1; diff --git a/examples/minimal_pub_sub/src/zero_copy_subscriber.rs b/examples/minimal_pub_sub/src/zero_copy_subscriber.rs index 47d87f11..b3fa72e7 100644 --- a/examples/minimal_pub_sub/src/zero_copy_subscriber.rs +++ b/examples/minimal_pub_sub/src/zero_copy_subscriber.rs @@ -10,8 +10,7 @@ fn main() -> Result<(), Error> { let _subscription = node.create_subscription::( "topic", - QOS_PROFILE_DEFAULT, - move |msg: ReadOnlyLoanedMessage<'_, std_msgs::msg::UInt32>| { + move |msg: rclrs::ReadOnlyLoanedMessage<'_, std_msgs::msg::UInt32>| { num_messages += 1; println!("I heard: '{}'", msg.data); println!("(Got {} messages so far)", num_messages); diff --git a/examples/rust_pubsub/src/simple_publisher.rs b/examples/rust_pubsub/src/simple_publisher.rs index ae0739e1..ae599c0b 100644 --- a/examples/rust_pubsub/src/simple_publisher.rs +++ b/examples/rust_pubsub/src/simple_publisher.rs @@ -9,9 +9,7 @@ struct SimplePublisher { impl SimplePublisher { fn new(executor: &Executor) -> Result { let node = executor.create_node("simple_publisher").unwrap(); - let publisher = node - .create_publisher("publish_hello", QOS_PROFILE_DEFAULT) - .unwrap(); + let publisher = node.create_publisher("publish_hello").unwrap(); Ok(Self { publisher }) } diff --git a/examples/rust_pubsub/src/simple_subscriber.rs b/examples/rust_pubsub/src/simple_subscriber.rs index b0d26dea..acda1aea 100644 --- a/examples/rust_pubsub/src/simple_subscriber.rs +++ b/examples/rust_pubsub/src/simple_subscriber.rs @@ -17,13 +17,9 @@ impl SimpleSubscriptionNode { let data: Arc>> = Arc::new(Mutex::new(None)); let data_mut: Arc>> = Arc::clone(&data); let _subscriber = node - .create_subscription::( - "publish_hello", - QOS_PROFILE_DEFAULT, - move |msg: StringMsg| { - *data_mut.lock().unwrap() = Some(msg); - }, - ) + .create_subscription::("publish_hello", move |msg: StringMsg| { + *data_mut.lock().unwrap() = Some(msg); + }) .unwrap(); Ok(Self { _subscriber, data }) } diff --git a/rclrs/src/client.rs b/rclrs/src/client.rs index 5b27fc66..f02e44e7 100644 --- a/rclrs/src/client.rs +++ b/rclrs/src/client.rs @@ -11,7 +11,8 @@ use rosidl_runtime_rs::Message; use crate::{ error::{RclReturnCode, ToResult}, rcl_bindings::*, - MessageCow, Node, NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX, + IntoPrimitiveOptions, MessageCow, Node, NodeHandle, QoSProfile, RclrsError, + ENTITY_LIFECYCLE_MUTEX, }; // SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread @@ -87,23 +88,29 @@ where T: rosidl_runtime_rs::Service, { /// Creates a new client. - pub(crate) fn new(node: &Arc, topic: &str) -> Result + pub(crate) fn new<'a>( + node: &Arc, + options: impl Into>, + ) -> Result // This uses pub(crate) visibility to avoid instantiating this struct outside // [`Node::create_client`], see the struct's documentation for the rationale where T: rosidl_runtime_rs::Service, { + let ClientOptions { service_name, qos } = options.into(); // SAFETY: Getting a zero-initialized value is always safe. let mut rcl_client = unsafe { rcl_get_zero_initialized_client() }; let type_support = ::get_type_support() as *const rosidl_service_type_support_t; - let topic_c_string = CString::new(topic).map_err(|err| RclrsError::StringContainsNul { - err, - s: topic.into(), - })?; + let topic_c_string = + CString::new(service_name).map_err(|err| RclrsError::StringContainsNul { + err, + s: service_name.into(), + })?; // SAFETY: No preconditions for this function. - let client_options = unsafe { rcl_client_get_default_options() }; + let mut client_options = unsafe { rcl_client_get_default_options() }; + client_options.qos = qos.into(); { let rcl_node = node.handle.rcl_node.lock().unwrap(); @@ -280,6 +287,38 @@ where } } +/// `ClientOptions` are used by [`Node::create_client`][1] to initialize a +/// [`Client`] for a service. +/// +/// [1]: crate::Node::create_client +#[derive(Debug, Clone)] +#[non_exhaustive] +pub struct ClientOptions<'a> { + /// The name of the service that this client will send requests to + pub service_name: &'a str, + /// The quality of the service profile for this client + pub qos: QoSProfile, +} + +impl<'a> ClientOptions<'a> { + /// Initialize a new [`ClientOptions`] with default settings. + pub fn new(service_name: &'a str) -> Self { + Self { + service_name, + qos: QoSProfile::services_default(), + } + } +} + +impl<'a, T: IntoPrimitiveOptions<'a>> From for ClientOptions<'a> { + fn from(value: T) -> Self { + let primitive = value.into_primitive_options(); + let mut options = Self::new(primitive.name); + primitive.apply_to(&mut options.qos); + options + } +} + impl ClientBase for Client where T: rosidl_runtime_rs::Service, diff --git a/rclrs/src/node.rs b/rclrs/src/node.rs index 21532c8e..6e7dc597 100644 --- a/rclrs/src/node.rs +++ b/rclrs/src/node.rs @@ -1,5 +1,12 @@ mod graph; +pub use graph::*; + mod node_options; +pub use node_options::*; + +mod primitive_options; +pub use primitive_options::*; + use std::{ cmp::PartialEq, ffi::CStr, @@ -11,12 +18,12 @@ use std::{ use rosidl_runtime_rs::Message; -pub use self::{graph::*, node_options::*}; use crate::{ - rcl_bindings::*, Client, ClientBase, Clock, ContextHandle, GuardCondition, LogParams, Logger, - ParameterBuilder, ParameterInterface, ParameterVariant, Parameters, Publisher, QoSProfile, - RclrsError, Service, ServiceBase, Subscription, SubscriptionBase, SubscriptionCallback, - TimeSource, ToLogParams, ENTITY_LIFECYCLE_MUTEX, + rcl_bindings::*, Client, ClientBase, ClientOptions, Clock, ContextHandle, GuardCondition, + LogParams, Logger, ParameterBuilder, ParameterInterface, ParameterVariant, Parameters, + Publisher, PublisherOptions, RclrsError, Service, ServiceBase, ServiceOptions, Subscription, + SubscriptionBase, SubscriptionCallback, SubscriptionOptions, TimeSource, ToLogParams, + ENTITY_LIFECYCLE_MUTEX, }; // SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread @@ -221,15 +228,49 @@ impl Node { unsafe { call_string_getter_with_rcl_node(&rcl_node, getter) } } - /// Creates a [`Client`][1]. + /// Creates a [`Client`]. + /// + /// Pass in only the service name for the `options` argument to use all default client options: + /// ``` + /// # use rclrs::*; + /// # let executor = Context::default().create_basic_executor(); + /// # let node = executor.create_node("my_node").unwrap(); + /// let client = node.create_client::( + /// "my_service" + /// ) + /// .unwrap(); + /// ``` + /// + /// Take advantage of the [`IntoPrimitiveOptions`] API to easily build up the + /// client options: + /// + /// ``` + /// # use rclrs::*; + /// # let executor = Context::default().create_basic_executor(); + /// # let node = executor.create_node("my_node").unwrap(); + /// let client = node.create_client::( + /// "my_service" + /// .keep_all() + /// .transient_local() + /// ) + /// .unwrap(); + /// ``` /// - /// [1]: crate::Client - // TODO: make client's lifetime depend on node's lifetime - pub fn create_client(self: &Arc, topic: &str) -> Result>, RclrsError> + /// Any quality of service options that you explicitly specify will override + /// the default service options. Any that you do not explicitly specify will + /// remain the default service options. Note that clients are generally + /// expected to use [reliable][1], so it's best not to change the reliability + /// setting unless you know what you are doing. + /// + /// [1]: crate::QoSReliabilityPolicy::Reliable + pub fn create_client<'a, T>( + self: &Arc, + options: impl Into>, + ) -> Result>, RclrsError> where T: rosidl_runtime_rs::Service, { - let client = Arc::new(Client::::new(self, topic)?); + let client = Arc::new(Client::::new(self, options)?); { self.clients_mtx.lock().unwrap() }.push(Arc::downgrade(&client) as Weak); Ok(client) } @@ -273,52 +314,156 @@ impl Node { guard_condition } - /// Creates a [`Publisher`][1]. + /// Pass in only the topic name for the `options` argument to use all default publisher options: + /// ``` + /// # use rclrs::*; + /// # let executor = Context::default().create_basic_executor(); + /// # let node = executor.create_node("my_node").unwrap(); + /// let publisher = node.create_publisher::( + /// "my_topic" + /// ) + /// .unwrap(); + /// ``` + /// + /// Take advantage of the [`IntoPrimitiveOptions`] API to easily build up the + /// publisher options: + /// + /// ``` + /// # use rclrs::*; + /// # let executor = Context::default().create_basic_executor(); + /// # let node = executor.create_node("my_node").unwrap(); + /// let publisher = node.create_publisher::( + /// "my_topic" + /// .keep_last(100) + /// .transient_local() + /// ) + /// .unwrap(); + /// + /// let reliable_publisher = node.create_publisher::( + /// "my_topic" + /// .reliable() + /// ) + /// .unwrap(); + /// ``` /// - /// [1]: crate::Publisher - pub fn create_publisher( + pub fn create_publisher<'a, T>( &self, - topic: &str, - qos: QoSProfile, + options: impl Into>, ) -> Result>, RclrsError> where T: Message, { - let publisher = Arc::new(Publisher::::new(Arc::clone(&self.handle), topic, qos)?); + let publisher = Arc::new(Publisher::::new(Arc::clone(&self.handle), options)?); Ok(publisher) } /// Creates a [`Service`][1]. /// + /// Pass in only the service name for the `options` argument to use all default service options: + /// ``` + /// # use rclrs::*; + /// # let executor = Context::default().create_basic_executor(); + /// # let node = executor.create_node("my_node").unwrap(); + /// let service = node.create_service::( + /// "my_service", + /// |_info, _request| { + /// println!("Received request!"); + /// test_msgs::srv::Empty_Response::default() + /// }, + /// ); + /// ``` + /// + /// Take advantage of the [`IntoPrimitiveOptions`] API to easily build up the + /// service options: + /// + /// ``` + /// # use rclrs::*; + /// # let executor = Context::default().create_basic_executor(); + /// # let node = executor.create_node("my_node").unwrap(); + /// let service = node.create_service::( + /// "my_service" + /// .keep_all() + /// .transient_local(), + /// |_info, _request| { + /// println!("Received request!"); + /// test_msgs::srv::Empty_Response::default() + /// }, + /// ); + /// ``` + /// + /// Any quality of service options that you explicitly specify will override + /// the default service options. Any that you do not explicitly specify will + /// remain the default service options. Note that services are generally + /// expected to use [reliable][2], so it's best not to change the reliability + /// setting unless you know what you are doing. + /// /// [1]: crate::Service - pub fn create_service( + /// [2]: crate::QoSReliabilityPolicy::Reliable + pub fn create_service<'a, T, F>( self: &Arc, - topic: &str, + options: impl Into>, callback: F, ) -> Result>, RclrsError> where T: rosidl_runtime_rs::Service, F: Fn(&rmw_request_id_t, T::Request) -> T::Response + 'static + Send, { - let service = Arc::new(Service::::new(self, topic, callback)?); + let service = Arc::new(Service::::new(self, options, callback)?); { self.services_mtx.lock().unwrap() } .push(Arc::downgrade(&service) as Weak); Ok(service) } - /// Creates a [`Subscription`][1]. + /// Creates a [`Subscription`]. + /// /// - /// [1]: crate::Subscription - pub fn create_subscription( + /// Pass in only the topic name for the `options` argument to use all default subscription options: + /// ``` + /// # use rclrs::*; + /// # let executor = Context::default().create_basic_executor(); + /// # let node = executor.create_node("my_node").unwrap(); + /// let subscription = node.create_subscription( + /// "my_topic", + /// |_msg: test_msgs::msg::Empty| { + /// println!("Received message!"); + /// }, + /// ); + /// ``` + /// + /// Take advantage of the [`IntoPrimitiveOptions`] API to easily build up the + /// subscription options: + /// + /// ``` + /// # use rclrs::*; + /// # let executor = Context::default().create_basic_executor(); + /// # let node = executor.create_node("my_node").unwrap(); + /// let subscription = node.create_subscription( + /// "my_topic" + /// .keep_last(100) + /// .transient_local(), + /// |_msg: test_msgs::msg::Empty| { + /// println!("Received message!"); + /// }, + /// ); + /// + /// let reliable_subscription = node.create_subscription( + /// "my_reliable_topic" + /// .reliable(), + /// |_msg: test_msgs::msg::Empty| { + /// println!("Received message!"); + /// }, + /// ); + /// ``` + /// + pub fn create_subscription<'a, T, Args>( self: &Arc, - topic: &str, - qos: QoSProfile, + options: impl Into>, callback: impl SubscriptionCallback, ) -> Result>, RclrsError> where T: Message, { - let subscription = Arc::new(Subscription::::new(self, topic, qos, callback)?); + let subscription = Arc::new(Subscription::::new(self, options, callback)?); { self.subscriptions_mtx.lock() } .unwrap() .push(Arc::downgrade(&subscription) as Weak); @@ -463,8 +608,7 @@ pub(crate) unsafe fn call_string_getter_with_rcl_node( #[cfg(test)] mod tests { - use super::*; - use crate::test_helpers::*; + use crate::{test_helpers::*, *}; #[test] fn traits() { @@ -474,25 +618,20 @@ mod tests { #[test] fn test_topic_names_and_types() -> Result<(), RclrsError> { - use crate::QOS_PROFILE_SYSTEM_DEFAULT; use test_msgs::msg; let graph = construct_test_graph("test_topics_graph")?; let _node_1_defaults_subscription = graph.node1.create_subscription::( "graph_test_topic_3", - QOS_PROFILE_SYSTEM_DEFAULT, |_msg: msg::Defaults| {}, )?; - let _node_2_empty_subscription = graph.node2.create_subscription::( - "graph_test_topic_1", - QOS_PROFILE_SYSTEM_DEFAULT, - |_msg: msg::Empty| {}, - )?; + let _node_2_empty_subscription = graph + .node2 + .create_subscription::("graph_test_topic_1", |_msg: msg::Empty| {})?; let _node_2_basic_types_subscription = graph.node2.create_subscription::( "graph_test_topic_2", - QOS_PROFILE_SYSTEM_DEFAULT, |_msg: msg::BasicTypes| {}, )?; diff --git a/rclrs/src/node/primitive_options.rs b/rclrs/src/node/primitive_options.rs new file mode 100644 index 00000000..60973570 --- /dev/null +++ b/rclrs/src/node/primitive_options.rs @@ -0,0 +1,287 @@ +use crate::{ + QoSDurabilityPolicy, QoSDuration, QoSHistoryPolicy, QoSLivelinessPolicy, QoSProfile, + QoSReliabilityPolicy, +}; + +use std::{borrow::Borrow, time::Duration}; + +/// `PrimitiveOptions` are the subset of options that are relevant across all +/// primitives (e.g. [`Subscription`][1], [`Publisher`][2], [`Client`][3], and +/// [`Service`][4]). +/// +/// Each different primitive type may have its own defaults for the overall +/// quality of service settings, and we cannot know what the default will be +/// until the `PrimitiveOptions` gets converted into the more specific set of +/// options. Therefore we store each quality of service field separately so that +/// we will only override the settings that the user explicitly asked for, and +/// the rest will be determined by the default settings for each primitive. +/// +/// [1]: crate::Subscription +/// [2]: crate::Publisher +/// [3]: crate::Client +/// [4]: crate::Service +#[derive(Debug, Clone, Copy)] +#[non_exhaustive] +pub struct PrimitiveOptions<'a> { + /// The name that will be used for the primitive + pub name: &'a str, + /// Override the default [`QoSProfile::history`] for the primitive. + pub history: Option, + /// Override the default [`QoSProfile::reliability`] for the primitive. + pub reliability: Option, + /// Override the default [`QoSProfile::durability`] for the primitive. + pub durability: Option, + /// Override the default [`QoSProfile::deadline`] for the primitive. + pub deadline: Option, + /// Override the default [`QoSProfile::lifespan`] for the primitive. + pub lifespan: Option, + /// Override the default [`QoSProfile::liveliness`] for the primitive. + pub liveliness: Option, + /// Override the default [`QoSProfile::liveliness_lease`] for the primitive. + pub liveliness_lease: Option, + /// Override the default [`QoSProfile::avoid_ros_namespace_conventions`] for the primitive. + pub avoid_ros_namespace_conventions: Option, +} + +/// Trait to implicitly convert a compatible object into [`PrimitiveOptions`]. +pub trait IntoPrimitiveOptions<'a>: Sized { + /// Convert the object into [`PrimitiveOptions`] with default settings. + fn into_primitive_options(self) -> PrimitiveOptions<'a>; + + /// Override all the quality of service settings for the primitive. + fn qos(self, profile: QoSProfile) -> PrimitiveOptions<'a> { + let mut options = self + .into_primitive_options() + .history(profile.history) + .reliability(profile.reliability) + .durability(profile.durability) + .deadline(profile.deadline) + .lifespan(profile.lifespan) + .liveliness(profile.liveliness) + .liveliness_lease(profile.liveliness_lease); + + if profile.avoid_ros_namespace_conventions { + options.avoid_ros_namespace_conventions = Some(true); + } + + options + } + + /// Use the default topics quality of service profile. + fn topics_qos(self) -> PrimitiveOptions<'a> { + self.qos(QoSProfile::topics_default()) + } + + /// Use the default sensor data quality of service profile. + fn sensor_data_qos(self) -> PrimitiveOptions<'a> { + self.qos(QoSProfile::sensor_data_default()) + } + + /// Use the default services quality of service profile. + fn services_qos(self) -> PrimitiveOptions<'a> { + self.qos(QoSProfile::services_default()) + } + + /// Use the system-defined default quality of service profile. Topics and + /// services created with this default do not use the recommended ROS + /// defaults; they will instead use the default as defined by the underlying + /// RMW implementation (rmw_fastrtps, rmw_connextdds, etc). These defaults + /// may not always be appropriate for every use-case, and may be different + /// depending on which RMW implementation you are using, so use caution! + fn system_qos(self) -> PrimitiveOptions<'a> { + self.qos(QoSProfile::system_default()) + } + + /// Override the default [`QoSProfile::history`] for the primitive. + fn history(self, history: QoSHistoryPolicy) -> PrimitiveOptions<'a> { + let mut options = self.into_primitive_options(); + options.history = Some(history); + options + } + + /// Keep the last `depth` messages for the primitive. + fn keep_last(self, depth: u32) -> PrimitiveOptions<'a> { + self.history(QoSHistoryPolicy::KeepLast { depth }) + } + + /// Keep all messages for the primitive. + fn keep_all(self) -> PrimitiveOptions<'a> { + self.history(QoSHistoryPolicy::KeepAll) + } + + /// Override the default [`QoSProfile::reliability`] for the primitive. + fn reliability(self, reliability: QoSReliabilityPolicy) -> PrimitiveOptions<'a> { + let mut options = self.into_primitive_options(); + options.reliability = Some(reliability); + options + } + + /// Set the primitive to have [reliable][QoSReliabilityPolicy::Reliable] communication. + fn reliable(self) -> PrimitiveOptions<'a> { + self.reliability(QoSReliabilityPolicy::Reliable) + } + + /// Set the primitive to have [best-effort][QoSReliabilityPolicy::BestEffort] communication. + fn best_effort(self) -> PrimitiveOptions<'a> { + self.reliability(QoSReliabilityPolicy::BestEffort) + } + + /// Override the default [`QoSProfile::durability`] for the primitive. + fn durability(self, durability: QoSDurabilityPolicy) -> PrimitiveOptions<'a> { + let mut options = self.into_primitive_options(); + options.durability = Some(durability); + options + } + + /// Set the primitive to have [volatile][QoSDurabilityPolicy::Volatile] durability. + fn volatile(self) -> PrimitiveOptions<'a> { + self.durability(QoSDurabilityPolicy::Volatile) + } + + /// Set the primitive to have [transient local][QoSDurabilityPolicy::TransientLocal] durability. + fn transient_local(self) -> PrimitiveOptions<'a> { + self.durability(QoSDurabilityPolicy::TransientLocal) + } + + /// Override the default [`QoSProfile::lifespan`] for the primitive. + fn lifespan(self, lifespan: QoSDuration) -> PrimitiveOptions<'a> { + let mut options = self.into_primitive_options(); + options.lifespan = Some(lifespan); + options + } + + /// Set a custom duration for the [lifespan][QoSProfile::lifespan] of the primitive. + fn lifespan_duration(self, duration: Duration) -> PrimitiveOptions<'a> { + self.lifespan(QoSDuration::Custom(duration)) + } + + /// Make the [lifespan][QoSProfile::lifespan] of the primitive infinite. + fn infinite_lifespan(self) -> PrimitiveOptions<'a> { + self.lifespan(QoSDuration::Infinite) + } + + /// Override the default [`QoSProfile::deadline`] for the primitive. + fn deadline(self, deadline: QoSDuration) -> PrimitiveOptions<'a> { + let mut options = self.into_primitive_options(); + options.deadline = Some(deadline); + options + } + + /// Set the [`QoSProfile::deadline`] to a custom finite value. + fn deadline_duration(self, duration: Duration) -> PrimitiveOptions<'a> { + self.deadline(QoSDuration::Custom(duration)) + } + + /// Do not use a deadline for liveliness for this primitive. + fn no_deadline(self) -> PrimitiveOptions<'a> { + self.deadline(QoSDuration::Infinite) + } + + /// Override the default [`QoSProfile::liveliness`] for the primitive. + fn liveliness(self, liveliness: QoSLivelinessPolicy) -> PrimitiveOptions<'a> { + let mut options = self.into_primitive_options(); + options.liveliness = Some(liveliness); + options + } + + /// Set liveliness to [`QoSLivelinessPolicy::Automatic`]. + fn liveliness_automatic(self) -> PrimitiveOptions<'a> { + self.liveliness(QoSLivelinessPolicy::Automatic) + } + + /// Set liveliness to [`QoSLivelinessPolicy::ManualByTopic`] + fn liveliness_manual(self) -> PrimitiveOptions<'a> { + self.liveliness(QoSLivelinessPolicy::ManualByTopic) + } + + /// Override the default [`QoSProfile::liveliness_lease`] for the primitive. + fn liveliness_lease(self, lease: QoSDuration) -> PrimitiveOptions<'a> { + let mut options = self.into_primitive_options(); + options.liveliness_lease = Some(lease); + options + } + + /// Set a custom duration for the [liveliness lease][QoSProfile::liveliness_lease]. + fn liveliness_lease_duration(self, duration: Duration) -> PrimitiveOptions<'a> { + self.liveliness_lease(QoSDuration::Custom(duration)) + } + + /// [Avoid the ROS namespace conventions][1] for the primitive. + /// + /// [1]: QoSProfile::avoid_ros_namespace_conventions + fn avoid_ros_namespace_conventions(self) -> PrimitiveOptions<'a> { + let mut options = self.into_primitive_options(); + options.avoid_ros_namespace_conventions = Some(true); + options + } +} + +impl<'a> IntoPrimitiveOptions<'a> for PrimitiveOptions<'a> { + fn into_primitive_options(self) -> PrimitiveOptions<'a> { + self + } +} + +impl<'a> IntoPrimitiveOptions<'a> for &'a str { + fn into_primitive_options(self) -> PrimitiveOptions<'a> { + PrimitiveOptions::new(self) + } +} + +impl<'a, T: Borrow> IntoPrimitiveOptions<'a> for &'a T { + fn into_primitive_options(self) -> PrimitiveOptions<'a> { + self.borrow().into_primitive_options() + } +} + +impl<'a> PrimitiveOptions<'a> { + /// Begin building a new set of `PrimitiveOptions` with only the name set. + pub fn new(name: &'a str) -> Self { + Self { + name, + history: None, + reliability: None, + durability: None, + deadline: None, + lifespan: None, + liveliness: None, + liveliness_lease: None, + avoid_ros_namespace_conventions: None, + } + } + + /// Apply the user-specified options to a pre-initialized [`QoSProfile`]. + pub fn apply_to(&self, qos: &mut QoSProfile) { + if let Some(history) = self.history { + qos.history = history; + } + + if let Some(reliability) = self.reliability { + qos.reliability = reliability; + } + + if let Some(durability) = self.durability { + qos.durability = durability; + } + + if let Some(deadline) = self.deadline { + qos.deadline = deadline; + } + + if let Some(lifespan) = self.lifespan { + qos.lifespan = lifespan; + } + + if let Some(liveliness) = self.liveliness { + qos.liveliness = liveliness; + } + + if let Some(liveliness_lease) = self.liveliness_lease { + qos.liveliness_lease = liveliness_lease; + } + + if let Some(convention) = self.avoid_ros_namespace_conventions { + qos.avoid_ros_namespace_conventions = convention; + } + } +} diff --git a/rclrs/src/parameter/service.rs b/rclrs/src/parameter/service.rs index 7d22cb44..32917054 100644 --- a/rclrs/src/parameter/service.rs +++ b/rclrs/src/parameter/service.rs @@ -9,7 +9,7 @@ use rosidl_runtime_rs::Sequence; use super::ParameterMap; use crate::{ parameter::{DeclaredValue, ParameterKind, ParameterStorage}, - rmw_request_id_t, Node, RclrsError, Service, + rmw_request_id_t, IntoPrimitiveOptions, Node, QoSProfile, RclrsError, Service, }; // The variables only exist to keep a strong reference to the services and are technically unused. @@ -247,7 +247,7 @@ impl ParameterService { // destruction is made for the parameter map. let map = parameter_map.clone(); let describe_parameters_service = node.create_service( - &(fqn.clone() + "/describe_parameters"), + (fqn.clone() + "/describe_parameters").qos(QoSProfile::parameter_services_default()), move |_req_id: &rmw_request_id_t, req: DescribeParameters_Request| { let map = map.lock().unwrap(); describe_parameters(req, &map) @@ -255,7 +255,7 @@ impl ParameterService { )?; let map = parameter_map.clone(); let get_parameter_types_service = node.create_service( - &(fqn.clone() + "/get_parameter_types"), + (fqn.clone() + "/get_parameter_types").qos(QoSProfile::parameter_services_default()), move |_req_id: &rmw_request_id_t, req: GetParameterTypes_Request| { let map = map.lock().unwrap(); get_parameter_types(req, &map) @@ -263,7 +263,7 @@ impl ParameterService { )?; let map = parameter_map.clone(); let get_parameters_service = node.create_service( - &(fqn.clone() + "/get_parameters"), + (fqn.clone() + "/get_parameters").qos(QoSProfile::parameter_services_default()), move |_req_id: &rmw_request_id_t, req: GetParameters_Request| { let map = map.lock().unwrap(); get_parameters(req, &map) @@ -271,7 +271,7 @@ impl ParameterService { )?; let map = parameter_map.clone(); let list_parameters_service = node.create_service( - &(fqn.clone() + "/list_parameters"), + (fqn.clone() + "/list_parameters").qos(QoSProfile::parameter_services_default()), move |_req_id: &rmw_request_id_t, req: ListParameters_Request| { let map = map.lock().unwrap(); list_parameters(req, &map) @@ -279,14 +279,15 @@ impl ParameterService { )?; let map = parameter_map.clone(); let set_parameters_service = node.create_service( - &(fqn.clone() + "/set_parameters"), + (fqn.clone() + "/set_parameters").qos(QoSProfile::parameter_services_default()), move |_req_id: &rmw_request_id_t, req: SetParameters_Request| { let mut map = map.lock().unwrap(); set_parameters(req, &mut map) }, )?; let set_parameters_atomically_service = node.create_service( - &(fqn.clone() + "/set_parameters_atomically"), + (fqn.clone() + "/set_parameters_atomically") + .qos(QoSProfile::parameter_services_default()), move |_req_id: &rmw_request_id_t, req: SetParametersAtomically_Request| { let mut map = parameter_map.lock().unwrap(); set_parameters_atomically(req, &mut map) diff --git a/rclrs/src/publisher.rs b/rclrs/src/publisher.rs index 01a60d29..d92184cd 100644 --- a/rclrs/src/publisher.rs +++ b/rclrs/src/publisher.rs @@ -11,7 +11,7 @@ use crate::{ error::{RclrsError, ToResult}, qos::QoSProfile, rcl_bindings::*, - NodeHandle, ENTITY_LIFECYCLE_MUTEX, + IntoPrimitiveOptions, NodeHandle, ENTITY_LIFECYCLE_MUTEX, }; mod loaned_message; @@ -78,14 +78,14 @@ where /// Creates a new `Publisher`. /// /// Node and namespace changes are always applied _before_ topic remapping. - pub(crate) fn new( + pub(crate) fn new<'a>( node_handle: Arc, - topic: &str, - qos: QoSProfile, + options: impl Into>, ) -> Result where T: Message, { + let PublisherOptions { topic, qos } = options.into(); // SAFETY: Getting a zero-initialized value is always safe. let mut rcl_publisher = unsafe { rcl_get_zero_initialized_publisher() }; let type_support_ptr = @@ -250,6 +250,38 @@ where } } +/// `PublisherOptions` are used by [`Node::create_publisher`][1] to initialize +/// a [`Publisher`]. +/// +/// [1]: crate::Node::create_publisher +#[derive(Debug, Clone)] +#[non_exhaustive] +pub struct PublisherOptions<'a> { + /// The topic name for the publisher. + pub topic: &'a str, + /// The quality of service settings for the publisher. + pub qos: QoSProfile, +} + +impl<'a> PublisherOptions<'a> { + /// Initialize a new [`PublisherOptions`] with default settings. + pub fn new(topic: &'a str) -> Self { + Self { + topic, + qos: QoSProfile::topics_default(), + } + } +} + +impl<'a, T: IntoPrimitiveOptions<'a>> From for PublisherOptions<'a> { + fn from(value: T) -> Self { + let primitive = value.into_primitive_options(); + let mut options = Self::new(primitive.name); + primitive.apply_to(&mut options.qos); + options + } +} + /// Convenience trait for [`Publisher::publish`]. pub trait MessageCow<'a, T: Message> { /// Wrap the owned or borrowed message in a `Cow`. @@ -281,7 +313,7 @@ mod tests { #[test] fn test_publishers() -> Result<(), RclrsError> { - use crate::{TopicEndpointInfo, QOS_PROFILE_SYSTEM_DEFAULT}; + use crate::TopicEndpointInfo; use test_msgs::msg; let namespace = "/test_publishers_graph"; @@ -289,16 +321,15 @@ mod tests { let node_1_empty_publisher = graph .node1 - .create_publisher::("graph_test_topic_1", QOS_PROFILE_SYSTEM_DEFAULT)?; + .create_publisher::("graph_test_topic_1")?; let topic1 = node_1_empty_publisher.topic_name(); - let node_1_basic_types_publisher = graph.node1.create_publisher::( - "graph_test_topic_2", - QOS_PROFILE_SYSTEM_DEFAULT, - )?; + let node_1_basic_types_publisher = graph + .node1 + .create_publisher::("graph_test_topic_2")?; let topic2 = node_1_basic_types_publisher.topic_name(); let node_2_default_publisher = graph .node2 - .create_publisher::("graph_test_topic_3", QOS_PROFILE_SYSTEM_DEFAULT)?; + .create_publisher::("graph_test_topic_3")?; let topic3 = node_2_default_publisher.topic_name(); std::thread::sleep(std::time::Duration::from_millis(100)); @@ -345,21 +376,15 @@ mod tests { assert_eq!(node_1_empty_publisher.get_subscription_count(), Ok(0)); assert_eq!(node_1_basic_types_publisher.get_subscription_count(), Ok(0)); assert_eq!(node_2_default_publisher.get_subscription_count(), Ok(0)); - let _node_1_empty_subscriber = graph.node1.create_subscription( - "graph_test_topic_1", - QOS_PROFILE_SYSTEM_DEFAULT, - |_msg: msg::Empty| {}, - ); - let _node_1_basic_types_subscriber = graph.node1.create_subscription( - "graph_test_topic_2", - QOS_PROFILE_SYSTEM_DEFAULT, - |_msg: msg::BasicTypes| {}, - ); - let _node_2_default_subscriber = graph.node2.create_subscription( - "graph_test_topic_3", - QOS_PROFILE_SYSTEM_DEFAULT, - |_msg: msg::Defaults| {}, - ); + let _node_1_empty_subscriber = graph + .node1 + .create_subscription("graph_test_topic_1", |_msg: msg::Empty| {}); + let _node_1_basic_types_subscriber = graph + .node1 + .create_subscription("graph_test_topic_2", |_msg: msg::BasicTypes| {}); + let _node_2_default_subscriber = graph + .node2 + .create_subscription("graph_test_topic_3", |_msg: msg::Defaults| {}); assert_eq!(node_1_empty_publisher.get_subscription_count(), Ok(1)); assert_eq!(node_1_basic_types_publisher.get_subscription_count(), Ok(1)); assert_eq!(node_2_default_publisher.get_subscription_count(), Ok(1)); diff --git a/rclrs/src/qos.rs b/rclrs/src/qos.rs index b26f01ef..dcde5902 100644 --- a/rclrs/src/qos.rs +++ b/rclrs/src/qos.rs @@ -166,7 +166,7 @@ pub struct QoSProfile { /// The time within which the RMW publisher must show that it is alive. /// /// If this is `Infinite`, liveliness is not enforced. - pub liveliness_lease_duration: QoSDuration, + pub liveliness_lease: QoSDuration, /// If true, any ROS specific namespacing conventions will be circumvented. /// /// In the case of DDS and topics, for example, this means the typical @@ -200,7 +200,7 @@ impl From for rmw_qos_profile_t { deadline: qos.deadline.into(), lifespan: qos.lifespan.into(), liveliness: qos.liveliness.into(), - liveliness_lease_duration: qos.liveliness_lease_duration.into(), + liveliness_lease_duration: qos.liveliness_lease.into(), avoid_ros_namespace_conventions: qos.avoid_ros_namespace_conventions, } } @@ -244,22 +244,57 @@ impl QoSProfile { } /// Sets the QoS profile deadline to the specified `Duration`. - pub fn deadline(mut self, deadline: Duration) -> Self { + pub fn deadline_duration(mut self, deadline: Duration) -> Self { self.deadline = QoSDuration::Custom(deadline); self } /// Sets the QoS profile liveliness lease duration to the specified `Duration`. pub fn liveliness_lease_duration(mut self, lease_duration: Duration) -> Self { - self.liveliness_lease_duration = QoSDuration::Custom(lease_duration); + self.liveliness_lease = QoSDuration::Custom(lease_duration); self } /// Sets the QoS profile lifespan to the specified `Duration`. - pub fn lifespan(mut self, lifespan: Duration) -> Self { + pub fn lifespan_duration(mut self, lifespan: Duration) -> Self { self.lifespan = QoSDuration::Custom(lifespan); self } + + /// Get the default QoS profile for ordinary topics. + pub fn topics_default() -> Self { + QOS_PROFILE_DEFAULT + } + + /// Get the default QoS profile for topics that transmit sensor data. + pub fn sensor_data_default() -> Self { + QOS_PROFILE_SENSOR_DATA + } + + /// Get the default QoS profile for services. + pub fn services_default() -> Self { + QOS_PROFILE_SERVICES_DEFAULT + } + + /// Get the default QoS profile for parameter services. + pub fn parameter_services_default() -> Self { + QOS_PROFILE_PARAMETERS + } + + /// Get the default QoS profile for parameter event topics. + pub fn parameter_events_default() -> Self { + QOS_PROFILE_PARAMETER_EVENTS + } + + /// Get the system-defined default quality of service profile. Topics and + /// services created with this default do not use the recommended ROS + /// defaults; they will instead use the default as defined by the underlying + /// RMW implementation (rmw_fastrtps, rmw_connextdds, etc). These defaults + /// may not always be appropriate for every use-case, and may be different + /// depending on which RMW implementation you are using, so use caution! + pub fn system_default() -> Self { + QOS_PROFILE_SYSTEM_DEFAULT + } } impl From for rmw_qos_history_policy_t { @@ -355,7 +390,7 @@ pub const QOS_PROFILE_SENSOR_DATA: QoSProfile = QoSProfile { deadline: QoSDuration::SystemDefault, lifespan: QoSDuration::SystemDefault, liveliness: QoSLivelinessPolicy::SystemDefault, - liveliness_lease_duration: QoSDuration::SystemDefault, + liveliness_lease: QoSDuration::SystemDefault, avoid_ros_namespace_conventions: false, }; @@ -370,7 +405,7 @@ pub const QOS_PROFILE_CLOCK: QoSProfile = QoSProfile { deadline: QoSDuration::SystemDefault, lifespan: QoSDuration::SystemDefault, liveliness: QoSLivelinessPolicy::SystemDefault, - liveliness_lease_duration: QoSDuration::SystemDefault, + liveliness_lease: QoSDuration::SystemDefault, avoid_ros_namespace_conventions: false, }; @@ -384,7 +419,7 @@ pub const QOS_PROFILE_PARAMETERS: QoSProfile = QoSProfile { deadline: QoSDuration::SystemDefault, lifespan: QoSDuration::SystemDefault, liveliness: QoSLivelinessPolicy::SystemDefault, - liveliness_lease_duration: QoSDuration::SystemDefault, + liveliness_lease: QoSDuration::SystemDefault, avoid_ros_namespace_conventions: false, }; @@ -398,7 +433,7 @@ pub const QOS_PROFILE_DEFAULT: QoSProfile = QoSProfile { deadline: QoSDuration::SystemDefault, lifespan: QoSDuration::SystemDefault, liveliness: QoSLivelinessPolicy::SystemDefault, - liveliness_lease_duration: QoSDuration::SystemDefault, + liveliness_lease: QoSDuration::SystemDefault, avoid_ros_namespace_conventions: false, }; @@ -412,7 +447,7 @@ pub const QOS_PROFILE_SERVICES_DEFAULT: QoSProfile = QoSProfile { deadline: QoSDuration::SystemDefault, lifespan: QoSDuration::SystemDefault, liveliness: QoSLivelinessPolicy::SystemDefault, - liveliness_lease_duration: QoSDuration::SystemDefault, + liveliness_lease: QoSDuration::SystemDefault, avoid_ros_namespace_conventions: false, }; @@ -426,7 +461,7 @@ pub const QOS_PROFILE_PARAMETER_EVENTS: QoSProfile = QoSProfile { deadline: QoSDuration::SystemDefault, lifespan: QoSDuration::SystemDefault, liveliness: QoSLivelinessPolicy::SystemDefault, - liveliness_lease_duration: QoSDuration::SystemDefault, + liveliness_lease: QoSDuration::SystemDefault, avoid_ros_namespace_conventions: false, }; @@ -440,6 +475,6 @@ pub const QOS_PROFILE_SYSTEM_DEFAULT: QoSProfile = QoSProfile { deadline: QoSDuration::SystemDefault, lifespan: QoSDuration::SystemDefault, liveliness: QoSLivelinessPolicy::SystemDefault, - liveliness_lease_duration: QoSDuration::SystemDefault, + liveliness_lease: QoSDuration::SystemDefault, avoid_ros_namespace_conventions: false, }; diff --git a/rclrs/src/service.rs b/rclrs/src/service.rs index f901f16a..56ffae04 100644 --- a/rclrs/src/service.rs +++ b/rclrs/src/service.rs @@ -9,7 +9,8 @@ use rosidl_runtime_rs::Message; use crate::{ error::{RclReturnCode, ToResult}, rcl_bindings::*, - MessageCow, Node, NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX, + IntoPrimitiveOptions, MessageCow, Node, NodeHandle, QoSProfile, RclrsError, + ENTITY_LIFECYCLE_MUTEX, }; // SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread @@ -84,24 +85,30 @@ where T: rosidl_runtime_rs::Service, { /// Creates a new service. - pub(crate) fn new(node: &Arc, topic: &str, callback: F) -> Result + pub(crate) fn new<'a, F>( + node: &Arc, + options: impl Into>, + callback: F, + ) -> Result // This uses pub(crate) visibility to avoid instantiating this struct outside // [`Node::create_service`], see the struct's documentation for the rationale where T: rosidl_runtime_rs::Service, F: Fn(&rmw_request_id_t, T::Request) -> T::Response + 'static + Send, { + let ServiceOptions { name, qos } = options.into(); // SAFETY: Getting a zero-initialized value is always safe. let mut rcl_service = unsafe { rcl_get_zero_initialized_service() }; let type_support = ::get_type_support() as *const rosidl_service_type_support_t; - let topic_c_string = CString::new(topic).map_err(|err| RclrsError::StringContainsNul { + let topic_c_string = CString::new(name).map_err(|err| RclrsError::StringContainsNul { err, - s: topic.into(), + s: name.into(), })?; // SAFETY: No preconditions for this function. - let service_options = unsafe { rcl_service_get_default_options() }; + let mut service_options = unsafe { rcl_service_get_default_options() }; + service_options.qos = qos.into(); { let rcl_node = node.handle.rcl_node.lock().unwrap(); @@ -182,6 +189,36 @@ where } } +/// `ServiceOptions are used by [`Node::create_service`][1] to initialize a +/// [`Service`] provider. +#[derive(Debug, Clone)] +#[non_exhaustive] +pub struct ServiceOptions<'a> { + /// The name for the service + pub name: &'a str, + /// The quality of service profile for the service. + pub qos: QoSProfile, +} + +impl<'a> ServiceOptions<'a> { + /// Initialize a new [`ServiceOptions`] with default settings. + pub fn new(name: &'a str) -> Self { + Self { + name, + qos: QoSProfile::services_default(), + } + } +} + +impl<'a, T: IntoPrimitiveOptions<'a>> From for ServiceOptions<'a> { + fn from(value: T) -> Self { + let primitive = value.into_primitive_options(); + let mut options = Self::new(primitive.name); + primitive.apply_to(&mut options.qos); + options + } +} + impl ServiceBase for Service where T: rosidl_runtime_rs::Service, diff --git a/rclrs/src/subscription.rs b/rclrs/src/subscription.rs index 7f0f24eb..d015aad4 100644 --- a/rclrs/src/subscription.rs +++ b/rclrs/src/subscription.rs @@ -10,7 +10,7 @@ use crate::{ error::{RclReturnCode, ToResult}, qos::QoSProfile, rcl_bindings::*, - Node, NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX, + IntoPrimitiveOptions, Node, NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX, }; mod callback; @@ -96,10 +96,9 @@ where T: Message, { /// Creates a new subscription. - pub(crate) fn new( + pub(crate) fn new<'a, Args>( node: &Arc, - topic: &str, - qos: QoSProfile, + options: impl Into>, callback: impl SubscriptionCallback, ) -> Result // This uses pub(crate) visibility to avoid instantiating this struct outside @@ -107,6 +106,7 @@ where where T: Message, { + let SubscriptionOptions { topic, qos } = options.into(); // SAFETY: Getting a zero-initialized value is always safe. let mut rcl_subscription = unsafe { rcl_get_zero_initialized_subscription() }; let type_support = @@ -117,8 +117,8 @@ where })?; // SAFETY: No preconditions for this function. - let mut subscription_options = unsafe { rcl_subscription_get_default_options() }; - subscription_options.qos = qos.into(); + let mut rcl_subscription_options = unsafe { rcl_subscription_get_default_options() }; + rcl_subscription_options.qos = qos.into(); { let rcl_node = node.handle.rcl_node.lock().unwrap(); @@ -135,7 +135,7 @@ where &*rcl_node, type_support, topic_c_string.as_ptr(), - &subscription_options, + &rcl_subscription_options, ) .ok()?; } @@ -267,6 +267,38 @@ where } } +/// `SubscriptionOptions` are used by [`Node::create_subscription`][1] to initialize +/// a [`Subscription`]. +/// +/// [1]: crate::Node::create_subscription +#[derive(Debug, Clone)] +#[non_exhaustive] +pub struct SubscriptionOptions<'a> { + /// The topic name for the subscription. + pub topic: &'a str, + /// The quality of service settings for the subscription. + pub qos: QoSProfile, +} + +impl<'a> SubscriptionOptions<'a> { + /// Initialize a new [`SubscriptionOptions`] with default settings. + pub fn new(topic: &'a str) -> Self { + Self { + topic, + qos: QoSProfile::topics_default(), + } + } +} + +impl<'a, T: IntoPrimitiveOptions<'a>> From for SubscriptionOptions<'a> { + fn from(value: T) -> Self { + let primitive = value.into_primitive_options(); + let mut options = Self::new(primitive.name); + primitive.apply_to(&mut options.qos); + options + } +} + impl SubscriptionBase for Subscription where T: Message, @@ -336,27 +368,23 @@ mod tests { #[test] fn test_subscriptions() -> Result<(), RclrsError> { - use crate::{TopicEndpointInfo, QOS_PROFILE_SYSTEM_DEFAULT}; + use crate::TopicEndpointInfo; let namespace = "/test_subscriptions_graph"; let graph = construct_test_graph(namespace)?; - let node_2_empty_subscription = graph.node2.create_subscription::( - "graph_test_topic_1", - QOS_PROFILE_SYSTEM_DEFAULT, - |_msg: msg::Empty| {}, - )?; + let node_2_empty_subscription = graph + .node2 + .create_subscription::("graph_test_topic_1", |_msg: msg::Empty| {})?; let topic1 = node_2_empty_subscription.topic_name(); let node_2_basic_types_subscription = graph.node2.create_subscription::( "graph_test_topic_2", - QOS_PROFILE_SYSTEM_DEFAULT, |_msg: msg::BasicTypes| {}, )?; let topic2 = node_2_basic_types_subscription.topic_name(); let node_1_defaults_subscription = graph.node1.create_subscription::( "graph_test_topic_3", - QOS_PROFILE_SYSTEM_DEFAULT, |_msg: msg::Defaults| {}, )?; let topic3 = node_1_defaults_subscription.topic_name(); @@ -422,10 +450,10 @@ mod tests { let qos = QoSProfile::default().keep_all().reliable(); let subscription = node - .create_subscription::("test_topic", qos, callback) + .create_subscription::("test_topic".qos(qos), callback) .unwrap(); let publisher = node - .create_publisher::("test_topic", qos) + .create_publisher::("test_topic".qos(qos)) .unwrap(); (subscription, publisher) diff --git a/rclrs/src/time_source.rs b/rclrs/src/time_source.rs index 1b5f1650..88476de7 100644 --- a/rclrs/src/time_source.rs +++ b/rclrs/src/time_source.rs @@ -1,7 +1,7 @@ use crate::{ clock::{Clock, ClockSource, ClockType}, vendor::rosgraph_msgs::msg::Clock as ClockMsg, - Node, QoSProfile, ReadOnlyParameter, Subscription, QOS_PROFILE_CLOCK, + IntoPrimitiveOptions, Node, QoSProfile, ReadOnlyParameter, Subscription, QOS_PROFILE_CLOCK, }; use std::sync::{Arc, Mutex, RwLock, Weak}; @@ -131,14 +131,17 @@ impl TimeSource { .unwrap() .upgrade() .unwrap() - .create_subscription::("/clock", self.clock_qos, move |msg: ClockMsg| { - let nanoseconds: i64 = - (msg.clock.sec as i64 * 1_000_000_000) + msg.clock.nanosec as i64; - *last_received_time.lock().unwrap() = Some(nanoseconds); - if let Some(clock) = clock.lock().unwrap().as_mut() { - Self::update_clock(clock, nanoseconds); - } - }) + .create_subscription::( + "/clock".qos(self.clock_qos), + move |msg: ClockMsg| { + let nanoseconds: i64 = + (msg.clock.sec as i64 * 1_000_000_000) + msg.clock.nanosec as i64; + *last_received_time.lock().unwrap() = Some(nanoseconds); + if let Some(clock) = clock.lock().unwrap().as_mut() { + Self::update_clock(clock, nanoseconds); + } + }, + ) .unwrap() } }