Skip to content

Basic Timer implementation #440

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 27 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
6f9543c
Basic Timer type compiling :).
agalbachicar Nov 28, 2024
7b42c02
Evaluates the Timer::new() againts different clock types.
agalbachicar Nov 29, 2024
a60d9f3
Implement time_until_next_call
agalbachicar Nov 29, 2024
d233b47
Added cancel behavior
JesusSilvaUtrera Nov 29, 2024
3de2bb9
Merge pull request #1 from JesusSilvaUtrera/jsilva/add_cancel
agalbachicar Nov 29, 2024
189606f
Implement rcl_timer_reset
agalbachicar Nov 29, 2024
59ed7e2
Adds Timer::call().
agalbachicar Nov 29, 2024
9af9dd9
Added timer_period_ns (#2)
JesusSilvaUtrera Nov 29, 2024
e46224f
Adds Timer::is_ready().
agalbachicar Nov 29, 2024
501439d
WIP Timer callback implementation.
agalbachicar Nov 29, 2024
132c9db
Preliminary callback.
agalbachicar Nov 29, 2024
1095351
Added comments to avoid warnings (#3)
JesusSilvaUtrera Nov 29, 2024
965ca22
Integrated the Timer into the WaitSet.
agalbachicar Nov 29, 2024
f503c84
Add create_timer to node (WIP) (#4)
JesusSilvaUtrera Nov 29, 2024
ed78b35
Makes it work with the integration demo.
agalbachicar Nov 29, 2024
214a991
Working E2E timer with node.
agalbachicar Nov 29, 2024
1eb1acc
Format fix.
agalbachicar Nov 29, 2024
91756ca
Fix a TODO for peace of mind.
agalbachicar Nov 29, 2024
fbb8629
Adds an example.
agalbachicar Dec 1, 2024
ab3d63a
Fix format for the example.
agalbachicar Dec 1, 2024
4515e9a
Adds tests, documentation and removes dead code in node.rs.
agalbachicar Dec 1, 2024
85930a3
Fix documentation style in clock.rs.
agalbachicar Dec 1, 2024
1563895
Removes duplicated test in node.rs
agalbachicar Dec 1, 2024
08acef5
Fix warnings while running tests in node.rs.
agalbachicar Dec 1, 2024
a4c1a97
Fix missing documentation in wait.rs.
agalbachicar Dec 1, 2024
655185d
Improvements to timer.
agalbachicar Dec 1, 2024
30a6717
Makes rustdoc pass in the examples.
agalbachicar Dec 1, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions examples/rclrs_timer_demo/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[package]
name = "rclrs_timer_demo"
version = "0.1.0"
edition = "2021"

[[bin]]
name="rclrs_timer_demo"
path="src/rclrs_timer_demo.rs"


[dependencies]
rclrs = "*"
13 changes: 13 additions & 0 deletions examples/rclrs_timer_demo/package.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<package format="3">
<name>rclrs_timer_demo</name>
<version>0.1.0</version>
<description>Shows how to implement a timer within a Node using rclrs.</description>
<maintainer email="[email protected]">user</maintainer>
<license>TODO: License declaration.</license>

<depend>rclrs</depend>

<export>
<build_type>ament_cargo</build_type>
</export>
</package>
48 changes: 48 additions & 0 deletions examples/rclrs_timer_demo/src/rclrs_timer_demo.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/// Creates a SimpleTimerNode, initializes a node and the timer with a callback
/// that prints the timer callback execution iteration. The callback is executed
/// thanks to the spin, which is in charge of executing the timer's events among
/// other entities' events.
use rclrs::{create_node, Context, Node, RclrsError, Timer};
use std::{
env,
sync::{Arc, Mutex},
};

/// Contains both the node and timer.
struct SimpleTimerNode {
node: Arc<Node>,
timer: Arc<Timer>,
}

impl SimpleTimerNode {
/// Creates a node and a timer with a callback.
///
/// The callback will simply print to stdout:
/// "Drinking 🧉 for the xth time every p nanoseconds."
/// where x is the iteration callback counter and p is the period of the timer.
fn new(context: &Context, timer_period_ns: i64) -> Result<Self, RclrsError> {
let node = create_node(context, "simple_timer_node")?;
let count: Arc<Mutex<i32>> = Arc::new(Mutex::new(0));
let timer = node.create_timer(
timer_period_ns,
context,
Some(Box::new(move |_| {
let x = *count.lock().unwrap();
println!(
"Drinking 🧉 for the {}th time every {} nanoseconds.",
x, timer_period_ns
);
*count.lock().unwrap() = x + 1;
})),
None,
)?;
Ok(Self { node, timer })
}
}

fn main() -> Result<(), RclrsError> {
let timer_period: i64 = 1e9 as i64; // 1 seconds.
let context = Context::new(env::args()).unwrap();
let simple_timer_node = Arc::new(SimpleTimerNode::new(&context, timer_period).unwrap());
rclrs::spin(simple_timer_node.node.clone())
}
5 changes: 5 additions & 0 deletions rclrs/src/clock.rs
Original file line number Diff line number Diff line change
@@ -83,6 +83,11 @@ impl Clock {
}
}

/// Returns the clock's `rcl_clock_t`.
pub(crate) fn get_rcl_clock(&self) -> &Arc<Mutex<rcl_clock_t>> {
&self.rcl_clock
}

/// Returns the clock's `ClockType`.
pub fn clock_type(&self) -> ClockType {
self.kind
6 changes: 5 additions & 1 deletion rclrs/src/executor.rs
Original file line number Diff line number Diff line change
@@ -48,7 +48,11 @@ impl SingleThreadedExecutor {
})
{
let wait_set = WaitSet::new_for_node(&node)?;
let ready_entities = wait_set.wait(timeout)?;
let mut ready_entities = wait_set.wait(timeout)?;

for ready_timer in ready_entities.timers.iter_mut() {
ready_timer.execute()?;
}

for ready_subscription in ready_entities.subscriptions {
ready_subscription.execute()?;
2 changes: 2 additions & 0 deletions rclrs/src/lib.rs
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@ mod service;
mod subscription;
mod time;
mod time_source;
mod timer;
mod vendor;
mod wait;

@@ -49,6 +50,7 @@ pub use service::*;
pub use subscription::*;
pub use time::*;
use time_source::*;
pub use timer::*;
pub use wait::*;

/// Polls the node for new messages and executes the corresponding callbacks.
49 changes: 48 additions & 1 deletion rclrs/src/node.rs
Original file line number Diff line number Diff line change
@@ -16,7 +16,7 @@ use crate::{
rcl_bindings::*, Client, ClientBase, Clock, Context, ContextHandle, GuardCondition, LogParams,
Logger, ParameterBuilder, ParameterInterface, ParameterVariant, Parameters, Publisher,
QoSProfile, RclrsError, Service, ServiceBase, Subscription, SubscriptionBase,
SubscriptionCallback, TimeSource, ToLogParams, ENTITY_LIFECYCLE_MUTEX,
SubscriptionCallback, TimeSource, Timer, TimerCallback, ToLogParams, ENTITY_LIFECYCLE_MUTEX,
};

// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
@@ -63,6 +63,7 @@ pub struct Node {
pub(crate) guard_conditions_mtx: Mutex<Vec<Weak<GuardCondition>>>,
pub(crate) services_mtx: Mutex<Vec<Weak<dyn ServiceBase>>>,
pub(crate) subscriptions_mtx: Mutex<Vec<Weak<dyn SubscriptionBase>>>,
pub(crate) timers_mtx: Mutex<Vec<Weak<Timer>>>,
time_source: TimeSource,
parameter: ParameterInterface,
pub(crate) handle: Arc<NodeHandle>,
@@ -340,6 +341,30 @@ impl Node {
Ok(subscription)
}

/// Creates a [`Timer`][1].
///
/// [1]: crate::Timer
/// TODO: make timer's lifetime depend on node's lifetime.
pub fn create_timer(
&self,
period_ns: i64,
context: &Context,
callback: Option<TimerCallback>,
clock: Option<Clock>,
) -> Result<Arc<Timer>, RclrsError> {
let clock_used = match clock {
Some(value) => value,
None => self.get_clock(),
};
let timer = Timer::new(&clock_used, &context, period_ns, callback)?;
let timer = Arc::new(timer);
self.timers_mtx
.lock()
.unwrap()
.push(Arc::downgrade(&timer) as Weak<Timer>);
Ok(timer)
}

/// Returns the subscriptions that have not been dropped yet.
pub(crate) fn live_subscriptions(&self) -> Vec<Arc<dyn SubscriptionBase>> {
{ self.subscriptions_mtx.lock().unwrap() }
@@ -369,6 +394,13 @@ impl Node {
.collect()
}

pub(crate) fn live_timers(&self) -> Vec<Arc<Timer>> {
{ self.timers_mtx.lock().unwrap() }
.iter()
.filter_map(Weak::upgrade)
.collect()
}

/// Returns the ROS domain ID that the node is using.
///
/// The domain ID controls which nodes can send messages to each other, see the [ROS 2 concept article][1].
@@ -551,6 +583,21 @@ mod tests {
Ok(())
}

#[test]
fn test_create_timer_without_clock_source() -> Result<(), RclrsError> {
let timer_period_ns: i64 = 1e6 as i64; // 1 millisecond.
let context = Context::new([])?;
let dut = NodeBuilder::new(&context, "node_with_timer")
.namespace("test_create_timer")
.build()?;

let _timer =
dut.create_timer(timer_period_ns, &context, Some(Box::new(move |_| {})), None)?;
assert_eq!(dut.live_timers().len(), 1);

Ok(())
}

#[test]
fn test_logger_name() -> Result<(), RclrsError> {
// Use helper to create 2 nodes for us
1 change: 1 addition & 0 deletions rclrs/src/node/builder.rs
Original file line number Diff line number Diff line change
@@ -340,6 +340,7 @@ impl NodeBuilder {
guard_conditions_mtx: Mutex::new(vec![]),
services_mtx: Mutex::new(vec![]),
subscriptions_mtx: Mutex::new(vec![]),
timers_mtx: Mutex::new(vec![]),
time_source: TimeSource::builder(self.clock_type)
.clock_qos(self.clock_qos)
.build(),
4 changes: 4 additions & 0 deletions rclrs/src/rcl_bindings.rs
Original file line number Diff line number Diff line change
@@ -89,6 +89,10 @@ cfg_if::cfg_if! {
#[derive(Debug)]
pub struct rcl_wait_set_t;

#[repr(C)]
#[derive(Debug)]
pub struct rcl_timer_t;

#[repr(C)]
#[derive(Debug)]
pub struct rcutils_string_array_t;
398 changes: 398 additions & 0 deletions rclrs/src/timer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,398 @@
use crate::{clock::Clock, context::Context, error::RclrsError, rcl_bindings::*, to_rclrs_result};
// TODO: fix me when the callback type is properly defined.
// use std::fmt::Debug;
use std::sync::{atomic::AtomicBool, Arc, Mutex};

/// Type alias for the `Timer` callback.
pub type TimerCallback = Box<dyn Fn(i64) + Send + Sync>;

/// Struct for executing periodic events.
///
/// The execution of the callbacks is tied to [`spin_once`][1] or [`spin`][2] on the timers's node.
///
/// Timer can be created via [`Node::create_timer()`][3], this is to ensure that [`Node`][4]s can
/// track all the timers that have been created. However, a user of a `Timer` can also
/// use it standalone.
///
/// [1]: crate::spin_once
/// [2]: crate::spin
/// [3]: crate::Node::create_timer
/// [4]: crate::Node
// TODO: callback type prevents us from making the Timer implement the Debug trait.
// #[derive(Debug)]
pub struct Timer {
pub(crate) rcl_timer: Arc<Mutex<rcl_timer_t>>,
/// The callback function that runs when the timer is due.
callback: Option<TimerCallback>,
pub(crate) in_use_by_wait_set: Arc<AtomicBool>,
}

impl Timer {
/// Creates a new timer.
pub fn new(
clock: &Clock,
context: &Context,
period: i64,
callback: Option<TimerCallback>,
) -> Result<Timer, RclrsError> {
let mut rcl_timer;
let timer_init_result = unsafe {
// SAFETY: Getting a default value is always safe.
rcl_timer = rcl_get_zero_initialized_timer();
let mut rcl_clock = clock.get_rcl_clock().lock().unwrap();
let allocator = rcutils_get_default_allocator();
let mut rcl_context = context.handle.rcl_context.lock().unwrap();
// Callbacks will be handled in the WaitSet.
let rcl_timer_callback: rcl_timer_callback_t = None;
// Function will return Err(_) only if there isn't enough memory to allocate a clock
// object.
rcl_timer_init(
&mut rcl_timer,
&mut *rcl_clock,
&mut *rcl_context,
period,
rcl_timer_callback,
allocator,
)
};
to_rclrs_result(timer_init_result).map(|_| Timer {
rcl_timer: Arc::new(Mutex::new(rcl_timer)),
callback,
in_use_by_wait_set: Arc::new(AtomicBool::new(false)),
})
}

/// Gets the period of the timer in nanoseconds
pub fn get_timer_period_ns(&self) -> Result<i64, RclrsError> {
let mut timer_period_ns = 0;
let get_period_result = unsafe {
let rcl_timer = self.rcl_timer.lock().unwrap();
rcl_timer_get_period(&*rcl_timer, &mut timer_period_ns)
};
to_rclrs_result(get_period_result).map(|_| timer_period_ns)
}

/// Cancels the timer, stopping the execution of the callback
pub fn cancel(&self) -> Result<(), RclrsError> {
let mut rcl_timer = self.rcl_timer.lock().unwrap();
let cancel_result = unsafe { rcl_timer_cancel(&mut *rcl_timer) };
to_rclrs_result(cancel_result)
}

/// Checks whether the timer is canceled or not
pub fn is_canceled(&self) -> Result<bool, RclrsError> {
let mut is_canceled = false;
let is_canceled_result = unsafe {
let rcl_timer = self.rcl_timer.lock().unwrap();
rcl_timer_is_canceled(&*rcl_timer, &mut is_canceled)
};
to_rclrs_result(is_canceled_result).map(|_| is_canceled)
}

/// Retrieves the time since the last call to the callback
pub fn time_since_last_call(&self) -> Result<i64, RclrsError> {
let mut time_value_ns: i64 = 0;
let time_since_last_call_result = unsafe {
let rcl_timer = self.rcl_timer.lock().unwrap();
rcl_timer_get_time_since_last_call(&*rcl_timer, &mut time_value_ns)
};
to_rclrs_result(time_since_last_call_result).map(|_| time_value_ns)
}

/// Retrieves the time until the next call of the callback
pub fn time_until_next_call(&self) -> Result<i64, RclrsError> {
let mut time_value_ns: i64 = 0;
let time_until_next_call_result = unsafe {
let rcl_timer = self.rcl_timer.lock().unwrap();
rcl_timer_get_time_until_next_call(&*rcl_timer, &mut time_value_ns)
};
to_rclrs_result(time_until_next_call_result).map(|_| time_value_ns)
}

/// Resets the timer.
pub fn reset(&self) -> Result<(), RclrsError> {
let mut rcl_timer = self.rcl_timer.lock().unwrap();
to_rclrs_result(unsafe { rcl_timer_reset(&mut *rcl_timer) })
}

/// Executes the callback of the timer (this is triggered by the executor or the node directly)
pub fn call(&self) -> Result<(), RclrsError> {
let mut rcl_timer = self.rcl_timer.lock().unwrap();
to_rclrs_result(unsafe { rcl_timer_call(&mut *rcl_timer) })
}

/// Checks if the timer is ready (not canceled)
pub fn is_ready(&self) -> Result<bool, RclrsError> {
let (is_ready, is_ready_result) = unsafe {
let mut is_ready: bool = false;
let rcl_timer = self.rcl_timer.lock().unwrap();
let is_ready_result = rcl_timer_is_ready(&*rcl_timer, &mut is_ready);
(is_ready, is_ready_result)
};
to_rclrs_result(is_ready_result).map(|_| is_ready)
}

pub(crate) fn execute(&self) -> Result<(), RclrsError> {
if self.is_ready()? {
let time_since_last_call = self.time_since_last_call()?;
self.call()?;
if let Some(ref callback) = self.callback {
callback(time_since_last_call);
}
}
Ok(())
}
}

/// 'Drop' trait implementation to be able to release the resources
impl Drop for rcl_timer_t {
fn drop(&mut self) {
// SAFETY: No preconditions for this function
let rc = unsafe { rcl_timer_fini(&mut *self) };
if let Err(e) = to_rclrs_result(rc) {
panic!("Unable to release Timer. {:?}", e)
}
}
}

impl PartialEq for Timer {
fn eq(&self, other: &Self) -> bool {
Arc::ptr_eq(&self.rcl_timer, &other.rcl_timer)
}
}

// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
// they are running in. Therefore, this type can be safely sent to another thread.
unsafe impl Send for rcl_timer_t {}

#[cfg(test)]
mod tests {
use super::*;
use std::{thread, time};

fn create_dummy_callback() -> Option<TimerCallback> {
Some(Box::new(move |_| {}))
}

#[test]
fn traits() {
use crate::test_helpers::*;

assert_send::<Timer>();
assert_sync::<Timer>();
}

#[test]
fn test_new_with_system_clock() {
let clock = Clock::system();
let context = Context::new(vec![]).unwrap();
let period: i64 = 1e6 as i64; // 1 milliseconds.
let dut = Timer::new(&clock, &context, period, create_dummy_callback());
assert!(dut.is_ok());
}

#[test]
fn test_new_with_steady_clock() {
let clock = Clock::steady();
let context = Context::new(vec![]).unwrap();
let period: i64 = 1e6 as i64; // 1 milliseconds.
let dut = Timer::new(&clock, &context, period, create_dummy_callback());
assert!(dut.is_ok());
}

#[ignore = "SIGSEGV when creating the timer with Clock::with_source()."]
#[test]
fn test_new_with_source_clock() {
let (clock, source) = Clock::with_source();
// No manual time set, it should default to 0
assert!(clock.now().nsec == 0);
let set_time = 1234i64;
source.set_ros_time_override(set_time);
// Ros time is set, should return the value that was set
assert_eq!(clock.now().nsec, set_time);
let context = Context::new(vec![]).unwrap();
let period: i64 = 1e6 as i64; // 1 milliseconds..
let dut = Timer::new(&clock, &context, period, create_dummy_callback());
assert!(dut.is_ok());
}

#[test]
fn test_get_period() {
let clock = Clock::steady();
let context = Context::new(vec![]).unwrap();
let period: i64 = 1e6 as i64; // 1 milliseconds.
let dut = Timer::new(&clock, &context, period, create_dummy_callback());
assert!(dut.is_ok());
let dut = dut.unwrap();
let period_result = dut.get_timer_period_ns();
assert!(period_result.is_ok());
let period_result = period_result.unwrap();
assert_eq!(period_result, 1e6 as i64);
}

#[test]
fn test_cancel() {
let clock = Clock::steady();
let context = Context::new(vec![]).unwrap();
let period: i64 = 1e6 as i64; // 1 milliseconds.
let dut = Timer::new(&clock, &context, period, create_dummy_callback());
assert!(dut.is_ok());
let dut = dut.unwrap();
assert!(dut.is_canceled().is_ok());
assert!(!dut.is_canceled().unwrap());
let cancel_result = dut.cancel();
assert!(cancel_result.is_ok());
assert!(dut.is_canceled().is_ok());
assert!(dut.is_canceled().unwrap());
}

#[test]
fn test_time_since_last_call_before_first_event() {
let clock = Clock::steady();
let context = Context::new(vec![]).unwrap();
let period_ns: i64 = 2e6 as i64; // 2 milliseconds.
let sleep_period_ms = time::Duration::from_millis(1);
let dut = Timer::new(&clock, &context, period_ns, create_dummy_callback());
assert!(dut.is_ok());
let dut = dut.unwrap();
thread::sleep(sleep_period_ms);
let time_since_last_call = dut.time_since_last_call();
assert!(time_since_last_call.is_ok());
let time_since_last_call = time_since_last_call.unwrap();
assert!(
time_since_last_call > 9e5 as i64,
"time_since_last_call: {}",
time_since_last_call
);
}

#[test]
fn test_time_until_next_call_before_first_event() {
let clock = Clock::steady();
let context = Context::new(vec![]).unwrap();
let period_ns: i64 = 2e6 as i64; // 2 milliseconds.
let dut = Timer::new(&clock, &context, period_ns, create_dummy_callback());
assert!(dut.is_ok());
let dut = dut.unwrap();
let time_until_next_call = dut.time_until_next_call();
assert!(time_until_next_call.is_ok());
let time_until_next_call = time_until_next_call.unwrap();
assert!(
time_until_next_call < period_ns,
"time_until_next_call: {}",
time_until_next_call
);
}

#[test]
fn test_reset() {
let tolerance = 20e4 as i64;
let clock = Clock::steady();
let context = Context::new(vec![]).unwrap();
let period_ns: i64 = 2e6 as i64; // 2 milliseconds.
let dut = Timer::new(&clock, &context, period_ns, create_dummy_callback()).unwrap();
let elapsed = period_ns - dut.time_until_next_call().unwrap();
assert!(elapsed < tolerance, "elapsed before reset: {}", elapsed);
thread::sleep(time::Duration::from_millis(1));
assert!(dut.reset().is_ok());
let elapsed = period_ns - dut.time_until_next_call().unwrap();
assert!(elapsed < tolerance, "elapsed after reset: {}", elapsed);
}

#[test]
fn test_call() {
let tolerance = 20e4 as i64;
let clock = Clock::steady();
let context = Context::new(vec![]).unwrap();
let period_ns: i64 = 1e6 as i64; // 1 millisecond.
let dut = Timer::new(&clock, &context, period_ns, create_dummy_callback()).unwrap();
let elapsed = period_ns - dut.time_until_next_call().unwrap();
assert!(elapsed < tolerance, "elapsed before reset: {}", elapsed);
thread::sleep(time::Duration::from_micros(1500));
let elapsed = period_ns - dut.time_until_next_call().unwrap();
assert!(
elapsed > 1500000i64,
"time_until_next_call before call: {}",
elapsed
);
assert!(dut.call().is_ok());
let elapsed = dut.time_until_next_call().unwrap();
assert!(
elapsed < 500000i64,
"time_until_next_call after call: {}",
elapsed
);
}

#[test]
fn test_is_ready() {
let clock = Clock::steady();
let context = Context::new(vec![]).unwrap();
let period_ns: i64 = 1e6 as i64; // 1 millisecond.
let dut = Timer::new(&clock, &context, period_ns, create_dummy_callback()).unwrap();
let is_ready = dut.is_ready();
assert!(is_ready.is_ok());
assert!(!is_ready.unwrap());
thread::sleep(time::Duration::from_micros(1100));
let is_ready = dut.is_ready();
assert!(is_ready.is_ok());
assert!(is_ready.unwrap());
}

#[test]
fn test_callback() {
let clock = Clock::steady();
let context = Context::new(vec![]).unwrap();
let period_ns: i64 = 1e6 as i64; // 1 millisecond.
let foo = Arc::new(Mutex::new(0i64));
let foo_callback = foo.clone();
let dut = Timer::new(
&clock,
&context,
period_ns,
Some(Box::new(move |x| *foo_callback.lock().unwrap() = x)),
)
.unwrap();
dut.callback.unwrap()(123);
assert_eq!(*foo.lock().unwrap(), 123);
}

#[test]
fn test_execute_when_is_not_ready() {
let clock = Clock::steady();
let context = Context::new(vec![]).unwrap();
let period_ns: i64 = 1e6 as i64; // 1 millisecond.
let foo = Arc::new(Mutex::new(0i64));
let foo_callback = foo.clone();
let dut = Timer::new(
&clock,
&context,
period_ns,
Some(Box::new(move |x| *foo_callback.lock().unwrap() = x)),
)
.unwrap();
assert!(dut.execute().is_ok());
assert_eq!(*foo.lock().unwrap(), 0i64);
}

#[test]
fn test_execute_when_is_ready() {
let clock = Clock::steady();
let context = Context::new(vec![]).unwrap();
let period_ns: i64 = 1e6 as i64; // 1 millisecond.
let foo = Arc::new(Mutex::new(0i64));
let foo_callback = foo.clone();
let dut = Timer::new(
&clock,
&context,
period_ns,
Some(Box::new(move |x| *foo_callback.lock().unwrap() = x)),
)
.unwrap();
thread::sleep(time::Duration::from_micros(1500));
assert!(dut.execute().is_ok());
let x = *foo.lock().unwrap();
assert!(x > 1500000i64);
assert!(x < 1600000i64);
}
}
89 changes: 87 additions & 2 deletions rclrs/src/wait.rs
Original file line number Diff line number Diff line change
@@ -20,7 +20,7 @@ use std::{sync::Arc, time::Duration, vec::Vec};
use crate::{
error::{to_rclrs_result, RclReturnCode, RclrsError, ToResult},
rcl_bindings::*,
ClientBase, Context, ContextHandle, Node, ServiceBase, SubscriptionBase,
ClientBase, Context, ContextHandle, Node, ServiceBase, SubscriptionBase, Timer,
};

mod exclusivity_guard;
@@ -51,6 +51,7 @@ pub struct WaitSet {
guard_conditions: Vec<ExclusivityGuard<Arc<GuardCondition>>>,
services: Vec<ExclusivityGuard<Arc<dyn ServiceBase>>>,
handle: WaitSetHandle,
timers: Vec<ExclusivityGuard<Arc<Timer>>>,
}

/// A list of entities that are ready, returned by [`WaitSet::wait`].
@@ -63,6 +64,8 @@ pub struct ReadyEntities {
pub guard_conditions: Vec<Arc<GuardCondition>>,
/// A list of services that have potentially received requests.
pub services: Vec<Arc<dyn ServiceBase>>,
/// A list of timers that are potentially due.
pub timers: Vec<Arc<Timer>>,
}

impl Drop for rcl_wait_set_t {
@@ -127,6 +130,7 @@ impl WaitSet {
rcl_wait_set,
context_handle: Arc::clone(&context.handle),
},
timers: Vec::new(),
})
}

@@ -138,13 +142,14 @@ impl WaitSet {
let live_clients = node.live_clients();
let live_guard_conditions = node.live_guard_conditions();
let live_services = node.live_services();
let live_timers = node.live_timers();
let ctx = Context {
handle: Arc::clone(&node.handle.context_handle),
};
let mut wait_set = WaitSet::new(
live_subscriptions.len(),
live_guard_conditions.len(),
0,
live_timers.len(),
live_clients.len(),
live_services.len(),
0,
@@ -166,6 +171,10 @@ impl WaitSet {
for live_service in &live_services {
wait_set.add_service(live_service.clone())?;
}

for live_timer in &live_timers {
wait_set.add_timer(live_timer.clone())?;
}
Ok(wait_set)
}

@@ -178,6 +187,7 @@ impl WaitSet {
self.guard_conditions.clear();
self.clients.clear();
self.services.clear();
self.timers.clear();
// This cannot fail – the rcl_wait_set_clear function only checks that the input handle is
// valid, which it always is in our case. Hence, only debug_assert instead of returning
// Result.
@@ -311,6 +321,34 @@ impl WaitSet {
Ok(())
}

/// Adds a timer to the wait set.
///
/// # Errors
/// - If the timer was already added to this wait set or another one,
/// [`AlreadyAddedToWaitSet`][1] will be returned
/// - If the number of timer in the wait set is larger than the
/// capacity set in [`WaitSet::new`], [`WaitSetFull`][2] will be returned
///
/// [1]: crate::RclrsError
/// [2]: crate::RclReturnCode
pub fn add_timer(&mut self, timer: Arc<Timer>) -> Result<(), RclrsError> {
let exclusive_timer =
ExclusivityGuard::new(Arc::clone(&timer), Arc::clone(&timer.in_use_by_wait_set))?;
unsafe {
// SAFETY: I'm not sure if it's required, but the timer pointer will remain valid
// for as long as the wait set exists, because it's stored in self.timers.
// Passing in a null pointer for the third argument is explicitly allowed.
rcl_wait_set_add_timer(
&mut self.handle.rcl_wait_set,
&*timer.rcl_timer.lock().unwrap() as *const _,
core::ptr::null_mut(),
)
}
.ok()?;
self.timers.push(exclusive_timer);
Ok(())
}

/// Blocks until the wait set is ready, or until the timeout has been exceeded.
///
/// If the timeout is `None` then this function will block indefinitely until
@@ -365,6 +403,7 @@ impl WaitSet {
clients: Vec::new(),
guard_conditions: Vec::new(),
services: Vec::new(),
timers: Vec::new(),
};
for (i, subscription) in self.subscriptions.iter().enumerate() {
// SAFETY: The `subscriptions` entry is an array of pointers, and this dereferencing is
@@ -409,13 +448,25 @@ impl WaitSet {
ready_entities.services.push(Arc::clone(&service.waitable));
}
}

for (i, timer) in self.timers.iter().enumerate() {
// SAFETY: The `timers` entry is an array of pointers, and this dereferencing is
// equivalent to
// https://github.com/ros2/rcl/blob/35a31b00a12f259d492bf53c0701003bd7f1745c/rcl/include/rcl/wait.h#L419
let wait_set_entry = unsafe { *self.handle.rcl_wait_set.timers.add(i) };
if !wait_set_entry.is_null() {
ready_entities.timers.push(Arc::clone(&timer.waitable));
}
}
Ok(ready_entities)
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::clock::Clock;
use crate::timer::TimerCallback;

#[test]
fn traits() {
@@ -440,4 +491,38 @@ mod tests {

Ok(())
}

#[test]
fn timer_in_wait_not_set_readies() -> Result<(), RclrsError> {
let context = Context::new([])?;
let clock = Clock::steady();
let period: i64 = 1e6 as i64; // 1 millisecond.
let callback: Option<TimerCallback> = Some(Box::new(move |_| {}));
let timer = Arc::new(Timer::new(&clock, &context, period, callback)?);

let mut wait_set = WaitSet::new(0, 0, 1, 0, 0, 0, &context)?;
wait_set.add_timer(timer.clone())?;

let readies = wait_set.wait(Some(std::time::Duration::from_micros(0)))?;
assert!(!readies.timers.contains(&timer));

Ok(())
}

#[test]
fn timer_in_wait_set_readies() -> Result<(), RclrsError> {
let context = Context::new([])?;
let clock = Clock::steady();
let period: i64 = 1e6 as i64; // 1 millisecond.
let callback: Option<TimerCallback> = Some(Box::new(move |_| {}));
let timer = Arc::new(Timer::new(&clock, &context, period, callback)?);

let mut wait_set = WaitSet::new(0, 0, 1, 0, 0, 0, &context)?;
wait_set.add_timer(timer.clone())?;

let readies = wait_set.wait(Some(std::time::Duration::from_micros(1500)))?;
assert!(readies.timers.contains(&timer));

Ok(())
}
}