Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Execution structure (spin-off of #427) #428

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
24 changes: 12 additions & 12 deletions examples/message_demo/src/message_demo.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use std::{convert::TryInto, env, sync::Arc};
use std::convert::TryInto;

use anyhow::{Error, Result};
use rosidl_runtime_rs::{seq, BoundedSequence, Message, Sequence};

use rclrs::*;

fn check_default_values() {
let msg = rclrs_example_msgs::msg::rmw::VariousTypes::default();
assert!(msg.bool_member);
Expand Down Expand Up @@ -138,38 +140,36 @@ fn demonstrate_sequences() {
fn demonstrate_pubsub() -> Result<(), Error> {
println!("================== Interoperability demo ==================");
// Demonstrate interoperability between idiomatic and RMW-native message types
let context = rclrs::Context::new(env::args())?;
let node = rclrs::create_node(&context, "message_demo")?;
let mut executor = Context::default_from_env()?.create_basic_executor();
let node = executor.create_node("message_demo")?;

let idiomatic_publisher = node.create_publisher::<rclrs_example_msgs::msg::VariousTypes>(
"topic",
rclrs::QOS_PROFILE_DEFAULT,
)?;
let idiomatic_publisher = node
.create_publisher::<rclrs_example_msgs::msg::VariousTypes>("topic", QOS_PROFILE_DEFAULT)?;
let direct_publisher = node.create_publisher::<rclrs_example_msgs::msg::rmw::VariousTypes>(
"topic",
rclrs::QOS_PROFILE_DEFAULT,
QOS_PROFILE_DEFAULT,
)?;

let _idiomatic_subscription = node
.create_subscription::<rclrs_example_msgs::msg::VariousTypes, _>(
"topic",
rclrs::QOS_PROFILE_DEFAULT,
QOS_PROFILE_DEFAULT,
move |_msg: rclrs_example_msgs::msg::VariousTypes| println!("Got idiomatic message!"),
)?;
let _direct_subscription = node
.create_subscription::<rclrs_example_msgs::msg::rmw::VariousTypes, _>(
"topic",
rclrs::QOS_PROFILE_DEFAULT,
QOS_PROFILE_DEFAULT,
move |_msg: rclrs_example_msgs::msg::rmw::VariousTypes| {
println!("Got RMW-native message!")
},
)?;
println!("Sending idiomatic message.");
idiomatic_publisher.publish(rclrs_example_msgs::msg::VariousTypes::default())?;
rclrs::spin_once(Arc::clone(&node), None)?;
executor.spin(SpinOptions::spin_once()).first_error()?;
println!("Sending RMW-native message.");
direct_publisher.publish(rclrs_example_msgs::msg::rmw::VariousTypes::default())?;
rclrs::spin_once(Arc::clone(&node), None)?;
executor.spin(SpinOptions::spin_once()).first_error()?;

Ok(())
}
Expand Down
12 changes: 7 additions & 5 deletions examples/minimal_client_service/src/minimal_client.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::env;

use anyhow::{Error, Result};
use rclrs::*;

fn main() -> Result<(), Error> {
let context = rclrs::Context::new(env::args())?;
let mut executor = Context::default_from_env()?.create_basic_executor();

let node = rclrs::create_node(&context, "minimal_client")?;
let node = executor.create_node("minimal_client")?;

let client = node.create_client::<example_interfaces::srv::AddTwoInts>("add_two_ints")?;

Expand All @@ -30,5 +29,8 @@ fn main() -> Result<(), Error> {
std::thread::sleep(std::time::Duration::from_millis(500));

println!("Waiting for response");
rclrs::spin(node).map_err(|err| err.into())
executor
.spin(SpinOptions::default())
.first_error()
.map_err(|err| err.into())
}
9 changes: 4 additions & 5 deletions examples/minimal_client_service/src/minimal_client_async.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use std::env;

use anyhow::{Error, Result};
use rclrs::*;

#[tokio::main]
async fn main() -> Result<(), Error> {
let context = rclrs::Context::new(env::args())?;
let mut executor = Context::default_from_env()?.create_basic_executor();

let node = rclrs::create_node(&context, "minimal_client")?;
let node = executor.create_node("minimal_client")?;

let client = node.create_client::<example_interfaces::srv::AddTwoInts>("add_two_ints")?;

Expand All @@ -22,7 +21,7 @@ async fn main() -> Result<(), Error> {

println!("Waiting for response");

let rclrs_spin = tokio::task::spawn_blocking(move || rclrs::spin(node));
let rclrs_spin = tokio::task::spawn_blocking(move || executor.spin(SpinOptions::default()));

let response = future.await?;
println!(
Expand Down
12 changes: 7 additions & 5 deletions examples/minimal_client_service/src/minimal_service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::env;

use anyhow::{Error, Result};
use rclrs::*;

fn handle_service(
_request_header: &rclrs::rmw_request_id_t,
Expand All @@ -13,13 +12,16 @@ fn handle_service(
}

fn main() -> Result<(), Error> {
let context = rclrs::Context::new(env::args())?;
let mut executor = Context::default_from_env()?.create_basic_executor();

let node = rclrs::create_node(&context, "minimal_service")?;
let node = executor.create_node("minimal_service")?;

let _server = node
.create_service::<example_interfaces::srv::AddTwoInts, _>("add_two_ints", handle_service)?;

println!("Starting server");
rclrs::spin(node).map_err(|err| err.into())
executor
.spin(SpinOptions::default())
.first_error()
.map_err(|err| err.into())
}
11 changes: 5 additions & 6 deletions examples/minimal_pub_sub/src/minimal_publisher.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
use std::env;

use anyhow::{Error, Result};
use rclrs::*;

fn main() -> Result<(), Error> {
let context = rclrs::Context::new(env::args())?;
let context = Context::default_from_env()?;
let executor = context.create_basic_executor();

let node = rclrs::create_node(&context, "minimal_publisher")?;
let node = executor.create_node("minimal_publisher")?;

let publisher =
node.create_publisher::<std_msgs::msg::String>("topic", rclrs::QOS_PROFILE_DEFAULT)?;
let publisher = node.create_publisher::<std_msgs::msg::String>("topic", QOS_PROFILE_DEFAULT)?;

let mut message = std_msgs::msg::String::default();

Expand Down
15 changes: 9 additions & 6 deletions examples/minimal_pub_sub/src/minimal_subscriber.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,26 @@
use std::env;

use anyhow::{Error, Result};
use rclrs::*;

fn main() -> Result<(), Error> {
let context = rclrs::Context::new(env::args())?;
let context = Context::default_from_env()?;
let mut executor = context.create_basic_executor();

let node = rclrs::create_node(&context, "minimal_subscriber")?;
let node = executor.create_node("minimal_subscriber")?;

let mut num_messages: usize = 0;

let _subscription = node.create_subscription::<std_msgs::msg::String, _>(
"topic",
rclrs::QOS_PROFILE_DEFAULT,
QOS_PROFILE_DEFAULT,
move |msg: std_msgs::msg::String| {
num_messages += 1;
println!("I heard: '{}'", msg.data);
println!("(Got {} messages so far)", num_messages);
},
)?;

rclrs::spin(node).map_err(|err| err.into())
executor
.spin(SpinOptions::default())
.first_error()
.map_err(|err| err.into())
}
48 changes: 22 additions & 26 deletions examples/minimal_pub_sub/src/minimal_two_nodes.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,20 @@
use std::{
env,
sync::{
atomic::{AtomicU32, Ordering},
Arc, Mutex,
},
use rclrs::*;
use std::sync::{
atomic::{AtomicU32, Ordering},
Arc, Mutex,
};

use anyhow::{Error, Result};

struct MinimalSubscriber {
num_messages: AtomicU32,
node: Arc<rclrs::Node>,
subscription: Mutex<Option<Arc<rclrs::Subscription<std_msgs::msg::String>>>>,
node: Arc<Node>,
subscription: Mutex<Option<Arc<Subscription<std_msgs::msg::String>>>>,
}

impl MinimalSubscriber {
pub fn new(name: &str, topic: &str) -> Result<Arc<Self>, rclrs::RclrsError> {
let context = rclrs::Context::new(env::args())?;
let node = rclrs::create_node(&context, name)?;
pub fn new(executor: &Executor, name: &str, topic: &str) -> Result<Arc<Self>, RclrsError> {
let node = executor.create_node(name)?;
let minimal_subscriber = Arc::new(MinimalSubscriber {
num_messages: 0.into(),
node,
Expand All @@ -29,7 +26,7 @@ impl MinimalSubscriber {
.node
.create_subscription::<std_msgs::msg::String, _>(
topic,
rclrs::QOS_PROFILE_DEFAULT,
QOS_PROFILE_DEFAULT,
move |msg: std_msgs::msg::String| {
minimal_subscriber_aux.callback(msg);
},
Expand All @@ -50,16 +47,18 @@ impl MinimalSubscriber {
}

fn main() -> Result<(), Error> {
let publisher_context = rclrs::Context::new(env::args())?;
let publisher_node = rclrs::create_node(&publisher_context, "minimal_publisher")?;
let mut executor = Context::default_from_env()?.create_basic_executor();
let publisher_node = executor.create_node("minimal_publisher")?;

let subscriber_node_one = MinimalSubscriber::new("minimal_subscriber_one", "topic")?;
let subscriber_node_two = MinimalSubscriber::new("minimal_subscriber_two", "topic")?;
let _subscriber_node_one =
MinimalSubscriber::new(&executor, "minimal_subscriber_one", "topic")?;
let _subscriber_node_two =
MinimalSubscriber::new(&executor, "minimal_subscriber_two", "topic")?;

let publisher = publisher_node
.create_publisher::<std_msgs::msg::String>("topic", rclrs::QOS_PROFILE_DEFAULT)?;
let publisher =
publisher_node.create_publisher::<std_msgs::msg::String>("topic", QOS_PROFILE_DEFAULT)?;

std::thread::spawn(move || -> Result<(), rclrs::RclrsError> {
std::thread::spawn(move || -> Result<(), RclrsError> {
let mut message = std_msgs::msg::String::default();
let mut publish_count: u32 = 1;
loop {
Expand All @@ -71,11 +70,8 @@ fn main() -> Result<(), Error> {
}
});

let executor = rclrs::SingleThreadedExecutor::new();

executor.add_node(&publisher_node)?;
executor.add_node(&subscriber_node_one.node)?;
executor.add_node(&subscriber_node_two.node)?;

executor.spin().map_err(|err| err.into())
executor
.spin(SpinOptions::default())
.first_error()
.map_err(|err| err.into())
}
10 changes: 5 additions & 5 deletions examples/minimal_pub_sub/src/zero_copy_publisher.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use std::env;

use anyhow::{Error, Result};
use rclrs::*;

fn main() -> Result<(), Error> {
let context = rclrs::Context::new(env::args())?;
let context = Context::default_from_env()?;
let executor = context.create_basic_executor();

let node = rclrs::create_node(&context, "minimal_publisher")?;
let node = executor.create_node("minimal_publisher")?;

let publisher =
node.create_publisher::<std_msgs::msg::rmw::UInt32>("topic", rclrs::QOS_PROFILE_DEFAULT)?;
node.create_publisher::<std_msgs::msg::rmw::UInt32>("topic", QOS_PROFILE_DEFAULT)?;

let mut publish_count: u32 = 1;

Expand Down
16 changes: 9 additions & 7 deletions examples/minimal_pub_sub/src/zero_copy_subscriber.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
use std::env;

use anyhow::{Error, Result};
use rclrs::*;

fn main() -> Result<(), Error> {
let context = rclrs::Context::new(env::args())?;
let mut executor = Context::default_from_env()?.create_basic_executor();

let node = rclrs::create_node(&context, "minimal_subscriber")?;
let node = executor.create_node("minimal_subscriber")?;

let mut num_messages: usize = 0;

let _subscription = node.create_subscription::<std_msgs::msg::UInt32, _>(
"topic",
rclrs::QOS_PROFILE_DEFAULT,
move |msg: rclrs::ReadOnlyLoanedMessage<'_, std_msgs::msg::UInt32>| {
QOS_PROFILE_DEFAULT,
move |msg: ReadOnlyLoanedMessage<'_, std_msgs::msg::UInt32>| {
num_messages += 1;
println!("I heard: '{}'", msg.data);
println!("(Got {} messages so far)", num_messages);
},
)?;

rclrs::spin(node).map_err(|err| err.into())
executor
.spin(SpinOptions::default())
.first_error()
.map_err(|err| err.into())
}
27 changes: 14 additions & 13 deletions examples/rust_pubsub/src/simple_publisher.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,37 @@
use rclrs::{create_node, Context, Node, Publisher, RclrsError, QOS_PROFILE_DEFAULT};
use rclrs::*;
use std::{sync::Arc, thread, time::Duration};
use std_msgs::msg::String as StringMsg;
struct SimplePublisherNode {
node: Arc<Node>,
_publisher: Arc<Publisher<StringMsg>>,

struct SimplePublisher {
publisher: Arc<Publisher<StringMsg>>,
}
impl SimplePublisherNode {
fn new(context: &Context) -> Result<Self, RclrsError> {
let node = create_node(context, "simple_publisher").unwrap();
let _publisher = node

impl SimplePublisher {
fn new(executor: &Executor) -> Result<Self, RclrsError> {
let node = executor.create_node("simple_publisher").unwrap();
let publisher = node
.create_publisher("publish_hello", QOS_PROFILE_DEFAULT)
.unwrap();
Ok(Self { node, _publisher })
Ok(Self { publisher })
}

fn publish_data(&self, increment: i32) -> Result<i32, RclrsError> {
let msg: StringMsg = StringMsg {
data: format!("Hello World {}", increment),
};
self._publisher.publish(msg).unwrap();
self.publisher.publish(msg).unwrap();
Ok(increment + 1_i32)
}
}

fn main() -> Result<(), RclrsError> {
let context = Context::new(std::env::args()).unwrap();
let publisher = Arc::new(SimplePublisherNode::new(&context).unwrap());
let mut executor = Context::default_from_env().unwrap().create_basic_executor();
let publisher = Arc::new(SimplePublisher::new(&executor).unwrap());
let publisher_other_thread = Arc::clone(&publisher);
let mut count: i32 = 0;
thread::spawn(move || loop {
thread::sleep(Duration::from_millis(1000));
count = publisher_other_thread.publish_data(count).unwrap();
});
rclrs::spin(publisher.node.clone())
executor.spin(SpinOptions::default()).first_error()
}
Loading
Loading