diff --git a/homestar-runtime/src/event_handler/notification.rs b/homestar-runtime/src/event_handler/notification.rs index 69611a0d..abe5867c 100644 --- a/homestar-runtime/src/event_handler/notification.rs +++ b/homestar-runtime/src/event_handler/notification.rs @@ -20,8 +20,8 @@ pub(crate) use network::{ NetworkNotification, NewListenAddr, OutgoingConnectionError, PeerRegisteredRendezvous, PublishedReceiptPubsub, PutReceiptDht, PutWorkflowInfoDht, ReceiptQuorumFailureDht, ReceiptQuorumSuccessDht, ReceivedReceiptPubsub, ReceivedWorkflowInfo, RegisteredRendezvous, - SentWorkflowInfo, WorkflowInfoQuorumFailureDht, WorkflowInfoQuorumSuccessDht, - WorkflowInfoSource, + SentWorkflowInfo, StatusChangedAutonat, WorkflowInfoQuorumFailureDht, + WorkflowInfoQuorumSuccessDht, WorkflowInfoSource, }; pub(crate) use receipt::ReceiptNotification; diff --git a/homestar-runtime/src/event_handler/notification/network.rs b/homestar-runtime/src/event_handler/notification/network.rs index 63e3eeae..1a932305 100644 --- a/homestar-runtime/src/event_handler/notification/network.rs +++ b/homestar-runtime/src/event_handler/notification/network.rs @@ -3,18 +3,19 @@ //! [swarm]: libp2p::swarm::Swarm use anyhow::anyhow; - use homestar_invocation::ipld::DagJson; use libipld::{serde::from_ipld, Ipld}; use schemars::JsonSchema; use std::{collections::BTreeMap, fmt}; +pub(crate) mod autonat; pub(crate) mod connection; pub(crate) mod dht; pub(crate) mod mdns; pub(crate) mod pubsub; pub(crate) mod rendezvous; pub(crate) mod req_resp; +pub(crate) use autonat::StatusChangedAutonat; pub(crate) use connection::{ ConnectionClosed, ConnectionEstablished, IncomingConnectionError, NewListenAddr, OutgoingConnectionError, @@ -43,12 +44,15 @@ pub enum NetworkNotification { /// Connection closed notification. #[schemars(rename = "connection_closed")] ConnnectionClosed(ConnectionClosed), - /// Outgoing conenction error notification. + /// Outgoing connection error notification. #[schemars(rename = "outgoing_connection_error")] OutgoingConnectionError(OutgoingConnectionError), - /// Incoming conenction error notification. + /// Incoming connection error notification. #[schemars(rename = "incoming_connection_error")] IncomingConnectionError(IncomingConnectionError), + /// Autonat status changed notification. + #[schemars(rename = "status_changed_autonat")] + StatusChangedAutonat(StatusChangedAutonat), /// mDNS discovered notification. #[schemars(rename = "discovered_mdns")] DiscoveredMdns(DiscoveredMdns), @@ -120,6 +124,7 @@ impl fmt::Display for NetworkNotification { NetworkNotification::IncomingConnectionError(_) => { write!(f, "incoming_connection_error") } + NetworkNotification::StatusChangedAutonat(_) => write!(f, "status_changed_autonat"), NetworkNotification::DiscoveredMdns(_) => write!(f, "discovered_mdns"), NetworkNotification::DiscoveredRendezvous(_) => write!(f, "discovered_rendezvous"), NetworkNotification::RegisteredRendezvous(_) => write!(f, "registered_rendezvous"), @@ -180,6 +185,10 @@ impl From for Ipld { "incoming_connection_error".into(), n.into(), )])), + NetworkNotification::StatusChangedAutonat(n) => Ipld::Map(BTreeMap::from([( + "status_changed_autonat".into(), + n.into(), + )])), NetworkNotification::DiscoveredMdns(n) => { Ipld::Map(BTreeMap::from([("discovered_mdns".into(), n.into())])) } @@ -267,6 +276,9 @@ impl TryFrom for NetworkNotification { "incoming_connection_error" => Ok(NetworkNotification::IncomingConnectionError( IncomingConnectionError::try_from(val.to_owned())?, )), + "status_changed_autonat" => Ok(NetworkNotification::StatusChangedAutonat( + StatusChangedAutonat::try_from(val.to_owned())?, + )), "discovered_mdns" => Ok(NetworkNotification::DiscoveredMdns( DiscoveredMdns::try_from(val.to_owned())?, )), @@ -333,10 +345,12 @@ impl TryFrom for NetworkNotification { #[cfg(test)] mod test { use super::*; + use crate::libp2p::nat_status::NatStatusExt; use faststr::FastStr; use homestar_invocation::test_utils::cid::generate_cid; use libipld::Cid; use libp2p::{ + autonat::NatStatus, swarm::{DialError, ListenError}, Multiaddr, PeerId, }; @@ -350,6 +364,7 @@ mod test { cid: Cid, connected_peer_count: usize, name: FastStr, + nat_status: NatStatus, num_tasks: u32, peer_id: PeerId, peers: Vec, @@ -371,6 +386,7 @@ mod test { cid: generate_cid(&mut thread_rng()), connected_peer_count: 1, name: FastStr::new("Strong Bad"), + nat_status: NatStatus::Public(Multiaddr::from_str("/ip4/127.0.0.1/tcp/7002").unwrap()), num_tasks: 1, peer_id: PeerId::random(), peers: vec![PeerId::random(), PeerId::random()], @@ -411,6 +427,7 @@ mod test { cid, connected_peer_count, name, + nat_status, num_tasks, peer_id, peers, @@ -428,6 +445,7 @@ mod test { let outgoing_connection_error = OutgoingConnectionError::new(Some(peer_id), DialError::NoAddresses); let incoming_connection_error = IncomingConnectionError::new(ListenError::Aborted); + let status_changed_autonat = StatusChangedAutonat::new(nat_status); let discovered_mdns = DiscoveredMdns::new(peers_map); let discovered_rendezvous = DiscoveredRendezvous::new(peer_id, peers_map_vec_addr); let registered_rendezvous = RegisteredRendezvous::new(peer_id); @@ -506,6 +524,10 @@ mod test { incoming_connection_error.timestamp().to_owned(), NetworkNotification::IncomingConnectionError(incoming_connection_error), ), + ( + status_changed_autonat.timestamp().to_owned(), + NetworkNotification::StatusChangedAutonat(status_changed_autonat), + ), ( discovered_mdns.timestamp().to_owned(), NetworkNotification::DiscoveredMdns(discovered_mdns), @@ -584,6 +606,7 @@ mod test { cid, connected_peer_count, name, + nat_status, num_tasks, peer_id, peers, @@ -623,6 +646,18 @@ mod test { assert_eq!(n.timestamp(), timestamp); assert_eq!(n.error().to_string(), ListenError::Aborted.to_string()); } + NetworkNotification::StatusChangedAutonat(n) => { + let (status, address) = nat_status.to_tuple(); + + assert_eq!(n.timestamp(), timestamp); + assert_eq!(n.status(), &status); + assert_eq!( + n.address() + .as_ref() + .map(|a| Multiaddr::from_str(&a).unwrap()), + address + ); + } NetworkNotification::DiscoveredMdns(n) => { assert_eq!(n.timestamp(), timestamp); diff --git a/homestar-runtime/src/event_handler/notification/network/autonat.rs b/homestar-runtime/src/event_handler/notification/network/autonat.rs new file mode 100644 index 00000000..a037c853 --- /dev/null +++ b/homestar-runtime/src/event_handler/notification/network/autonat.rs @@ -0,0 +1,89 @@ +//! Notification types for [swarm] autonat events. +//! +//! [swarm]: libp2p::swarm::Swarm + +use crate::libp2p::nat_status::NatStatusExt; +use anyhow::anyhow; +use chrono::prelude::Utc; +use derive_getters::Getters; +use homestar_invocation::ipld::DagJson; +use libipld::{serde::from_ipld, Ipld}; +use libp2p::autonat::NatStatus; +use schemars::JsonSchema; +use std::collections::BTreeMap; + +const ADDRESS_KEY: &str = "address"; +const STATUS_KEY: &str = "status"; +const TIMESTAMP_KEY: &str = "timestamp"; + +#[derive(Debug, Clone, Getters, JsonSchema)] +#[schemars(rename = "status_changed_autonat")] +pub struct StatusChangedAutonat { + timestamp: i64, + status: String, + address: Option, +} + +impl StatusChangedAutonat { + pub(crate) fn new(status: NatStatus) -> StatusChangedAutonat { + let (status, address) = status.to_tuple(); + + StatusChangedAutonat { + timestamp: Utc::now().timestamp_millis(), + status: status.to_string(), + address: address.map(|a| a.to_string()), + } + } +} + +impl DagJson for StatusChangedAutonat {} + +impl From for Ipld { + fn from(notification: StatusChangedAutonat) -> Self { + Ipld::Map(BTreeMap::from([ + (TIMESTAMP_KEY.into(), notification.timestamp.into()), + (STATUS_KEY.into(), notification.status.into()), + ( + ADDRESS_KEY.into(), + notification + .address + .map(|peer_id| peer_id.into()) + .unwrap_or(Ipld::Null), + ), + ])) + } +} + +impl TryFrom for StatusChangedAutonat { + type Error = anyhow::Error; + + fn try_from(ipld: Ipld) -> Result { + let map = from_ipld::>(ipld)?; + + let timestamp = from_ipld( + map.get(TIMESTAMP_KEY) + .ok_or_else(|| anyhow!("missing {TIMESTAMP_KEY}"))? + .to_owned(), + )?; + + let status = from_ipld( + map.get(STATUS_KEY) + .ok_or_else(|| anyhow!("missing {STATUS_KEY}"))? + .to_owned(), + )?; + + let address = map + .get(ADDRESS_KEY) + .and_then(|ipld| match ipld { + Ipld::Null => None, + ipld => Some(ipld), + }) + .and_then(|ipld| from_ipld(ipld.to_owned()).ok()); + + Ok(StatusChangedAutonat { + timestamp, + status, + address, + }) + } +} diff --git a/homestar-runtime/src/event_handler/swarm_event.rs b/homestar-runtime/src/event_handler/swarm_event.rs index 424d19f9..310b9129 100644 --- a/homestar-runtime/src/event_handler/swarm_event.rs +++ b/homestar-runtime/src/event_handler/swarm_event.rs @@ -108,14 +108,29 @@ async fn handle_swarm_event( match event { SwarmEvent::Behaviour(ComposedEvent::Autonat(autonat_event)) => { match autonat_event { - autonat::Event::InboundProbe { .. } => { - // TODO + autonat::Event::InboundProbe(event) => { + // TODO Add log + println!("INBOUND PROBE EVENT: {event:?}") } - autonat::Event::OutboundProbe { .. } => { - // TODO + autonat::Event::OutboundProbe(event) => { + // TODO Add log + println!("OUTBOUND PROBE EVENT: {event:?}"); + println!( + "CONFIDENCE: {}", + event_handler.swarm.behaviour().autonat.confidence() + ) } - autonat::Event::StatusChanged { .. } => { - // TODO + autonat::Event::StatusChanged { old, new } => { + // TODO Add log + println!("STATUS CHANGED: Old - {old:?}, New - {new:?}"); + + #[cfg(feature = "websocket-notify")] + notification::emit_network_event( + event_handler.ws_evt_sender(), + NetworkNotification::StatusChangedAutonat( + notification::StatusChangedAutonat::new(new), + ), + ); } } }