Skip to content

Commit

Permalink
Seemed to actually have fixed the error (at least for one test) by sw…
Browse files Browse the repository at this point in the history
…itching away from dashmap
  • Loading branch information
carter committed Oct 22, 2023
1 parent 40a7015 commit 48fdc3a
Showing 1 changed file with 31 additions and 18 deletions.
49 changes: 31 additions & 18 deletions roslibrust/src/ros1/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ pub struct Node {
// Receiver for requests to the Node actor
node_msg_rx: mpsc::UnboundedReceiver<NodeMsg>,
// Map of topic names to the publishing channels associated with the topic
publishers: DashMap<String, Publication>,
publishers: tokio::sync::RwLock<std::collections::HashMap<String, Publication>>,
// Record of subscriptions this node has
subscriptions: DashMap<String, Subscription>,
// Record of what services this node is serving
Expand Down Expand Up @@ -283,7 +283,7 @@ impl Node {
client: rosmaster_client,
_xmlrpc_server: xmlrpc_server,
node_msg_rx: node_receiver,
publishers: DashMap::new(),
publishers: tokio::sync::RwLock::new(std::collections::HashMap::new()),
subscriptions: DashMap::new(),
services: DashMap::new(),
host_addr: addr,
Expand Down Expand Up @@ -337,8 +337,10 @@ impl Node {
NodeMsg::GetPublications { reply } => {
let _ = reply.send(
self.publishers
.read()
.await
.iter()
.map(|entry| (entry.key().clone(), entry.topic_type().to_owned()))
.map(|(key, entry)| (key.clone(), entry.topic_type().to_owned()))
.collect(),
);
}
Expand Down Expand Up @@ -408,10 +410,12 @@ impl Node {
.find(|proto| proto.as_str() == "TCPROS")
.is_some()
{
if let Some(publishing_channel) = self
if let Some((_key, publishing_channel)) = self
.publishers
.read()
.await
.iter()
.find(|publisher| publisher.key() == &topic)
.find(|(key, _pub)| *key == &topic)
{
let protocol_params = ProtocolParams {
hostname: self.hostname.clone(),
Expand Down Expand Up @@ -483,20 +487,26 @@ impl Node {
md5sum: String,
) -> Result<mpsc::Sender<Vec<u8>>, Box<dyn std::error::Error>> {
log::trace!("Registering publisher for: {topic:?}");
if let Some(handle) = self.publishers.iter().find_map(|entry| {
log::trace!("Found existing entry for: {topic:?}");
if entry.key().as_str() == &topic {
if entry.topic_type() == topic_type {
Some(Ok(entry.get_sender()))

let existing_entry = {
let read_lock = self.publishers.read().await;
read_lock.iter().find_map(|(key, value)| {
log::trace!("Found existing entry for: {topic:?}");
if key.as_str() == &topic {
if value.topic_type() == topic_type {
Some(Ok(value.get_sender()))
} else {
Some(Err(Box::new(std::io::Error::from(
std::io::ErrorKind::AddrInUse,
))))
}
} else {
Some(Err(Box::new(std::io::Error::from(
std::io::ErrorKind::AddrInUse,
))))
None
}
} else {
None
}
}) {
})
};

if let Some(handle) = existing_entry {
Ok(handle?)
} else {
log::trace!("Creating new entry for {topic:?}");
Expand All @@ -517,7 +527,10 @@ impl Node {
})?;
log::trace!("Created new publication for {topic:?}");
let handle = channel.get_sender();
self.publishers.insert(topic.clone(), channel);
{
let mut write_lock = self.publishers.write().await;
write_lock.insert(topic.clone(), channel);
}
log::trace!("Inserted new publsiher into dashmap");
let _current_subscribers = self.client.register_publisher(&topic, topic_type).await?;
log::trace!("Registered new publication for {topic:?}");
Expand Down

0 comments on commit 48fdc3a

Please sign in to comment.