Skip to content

Commit

Permalink
Add event dedublication to src/device.rs
Browse files Browse the repository at this point in the history
  • Loading branch information
Jauler committed Dec 12, 2024
1 parent a0962a1 commit a60d8bc
Showing 1 changed file with 40 additions and 13 deletions.
53 changes: 40 additions & 13 deletions src/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,11 @@ struct Runtime {
/// Some of the events are time based, so just poll the whole state from time to time
polling_interval: Interval,

/// Store most recent reported event to the apps per node
///
/// TODO: This is planned to be refactored into a bit better solution in https://github.com/NordSecurity/libtelio/pull/1021
last_transmitted_event: HashMap<PublicKey, Node>,

#[cfg(test)]
/// MockedAdapter (tests)
test_env: telio_wg::tests::Env,
Expand Down Expand Up @@ -1275,6 +1280,7 @@ impl Runtime {
derp_events_publisher: derp_events.tx,
},
polling_interval,
last_transmitted_event: Default::default(),
#[cfg(test)]
test_env: wg::tests::Env {
analytics: analytics_ch,
Expand Down Expand Up @@ -2302,6 +2308,21 @@ impl Runtime {
}
}
}

fn is_dublicated_event(&mut self, node: &Node) -> bool {
match self.last_transmitted_event.get(&node.public_key) {
None => node.state == PeerState::Disconnected,
Some(last_node) => *node == *last_node,
}
}

fn remember_last_transmitted_node_event(&mut self, node: Node) {
if node.state == PeerState::Disconnected {
self.last_transmitted_event.remove(&node.public_key);
} else {
self.last_transmitted_event.insert(node.public_key, node);
}
}
}

#[async_trait]
Expand Down Expand Up @@ -2364,11 +2385,11 @@ impl TaskRuntime for Runtime {

if let Some(node) = node {
// Publish WG event to app
let event = Event::builder::<Node>().set(node).build();
if let Some(event) = event {
if !self.is_dublicated_event(&node) {
let _ = self.event_publishers.libtelio_event_publisher.send(
Box::new(event)
Box::new(Event::Node {body: node.clone()})
);
self.remember_last_transmitted_node_event(node);
}
}

Expand Down Expand Up @@ -2418,26 +2439,32 @@ impl TaskRuntime for Runtime {

match (&pq_event, &self.requested_state.exit_node, &self.requested_state.last_exit_node) {
(telio_pq::Event::Connecting(pubkey), Some(exit), _) if exit.public_key == *pubkey => {
let body = Node {
let node = Node {
state: PeerState::Connecting,
link_state: Some(LinkState::Down),
link_state: if self.features.link_detection.is_some() {Some(LinkState::Down)} else {None},
..node_from_exit_node(exit)
};

let _ = self.event_publishers.libtelio_event_publisher.send(
Box::new(Event::Node { body })
);
if !self.is_dublicated_event(&node) {
let _ = self.event_publishers.libtelio_event_publisher.send(
Box::new(Event::Node {body: node.clone()})
);
self.remember_last_transmitted_node_event(node);
}
},
(telio_pq::Event::Disconnected(pubkey), _, Some(last_exit)) if last_exit.public_key == *pubkey => {
let body = Node {
let node = Node {
state: PeerState::Disconnected,
link_state: Some(LinkState::Down),
link_state: if self.features.link_detection.is_some() {Some(LinkState::Down)} else {None},
..node_from_exit_node(last_exit)
};

let _ = self.event_publishers.libtelio_event_publisher.send(
Box::new(Event::Node { body })
);
if !self.is_dublicated_event(&node) {
let _ = self.event_publishers.libtelio_event_publisher.send(
Box::new(Event::Node {body: node.clone()})
);
self.remember_last_transmitted_node_event(node);
}
},
_ => (),
}
Expand Down

0 comments on commit a60d8bc

Please sign in to comment.