Skip to content

Commit

Permalink
chore: Add StatusChangedAutonat notification
Browse files Browse the repository at this point in the history
  • Loading branch information
bgins committed Apr 1, 2024
1 parent 643b3c9 commit 62fc71a
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 11 deletions.
4 changes: 2 additions & 2 deletions homestar-runtime/src/event_handler/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ pub(crate) use network::{
NetworkNotification, NewListenAddr, OutgoingConnectionError, PeerRegisteredRendezvous,
PublishedReceiptPubsub, PutReceiptDht, PutWorkflowInfoDht, ReceiptQuorumFailureDht,
ReceiptQuorumSuccessDht, ReceivedReceiptPubsub, ReceivedWorkflowInfo, RegisteredRendezvous,
SentWorkflowInfo, WorkflowInfoQuorumFailureDht, WorkflowInfoQuorumSuccessDht,
WorkflowInfoSource,
SentWorkflowInfo, StatusChangedAutonat, WorkflowInfoQuorumFailureDht,
WorkflowInfoQuorumSuccessDht, WorkflowInfoSource,
};
pub(crate) use receipt::ReceiptNotification;

Expand Down
41 changes: 38 additions & 3 deletions homestar-runtime/src/event_handler/notification/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,19 @@
//! [swarm]: libp2p::swarm::Swarm

use anyhow::anyhow;

use homestar_invocation::ipld::DagJson;
use libipld::{serde::from_ipld, Ipld};
use schemars::JsonSchema;
use std::{collections::BTreeMap, fmt};

pub(crate) mod autonat;
pub(crate) mod connection;
pub(crate) mod dht;
pub(crate) mod mdns;
pub(crate) mod pubsub;
pub(crate) mod rendezvous;
pub(crate) mod req_resp;
pub(crate) use autonat::StatusChangedAutonat;
pub(crate) use connection::{
ConnectionClosed, ConnectionEstablished, IncomingConnectionError, NewListenAddr,
OutgoingConnectionError,
Expand Down Expand Up @@ -43,12 +44,15 @@ pub enum NetworkNotification {
/// Connection closed notification.
#[schemars(rename = "connection_closed")]
ConnnectionClosed(ConnectionClosed),
/// Outgoing conenction error notification.
/// Outgoing connection error notification.
#[schemars(rename = "outgoing_connection_error")]
OutgoingConnectionError(OutgoingConnectionError),
/// Incoming conenction error notification.
/// Incoming connection error notification.
#[schemars(rename = "incoming_connection_error")]
IncomingConnectionError(IncomingConnectionError),
/// Autonat status changed notification.
#[schemars(rename = "status_changed_autonat")]
StatusChangedAutonat(StatusChangedAutonat),
/// mDNS discovered notification.
#[schemars(rename = "discovered_mdns")]
DiscoveredMdns(DiscoveredMdns),
Expand Down Expand Up @@ -120,6 +124,7 @@ impl fmt::Display for NetworkNotification {
NetworkNotification::IncomingConnectionError(_) => {
write!(f, "incoming_connection_error")
}
NetworkNotification::StatusChangedAutonat(_) => write!(f, "status_changed_autonat"),
NetworkNotification::DiscoveredMdns(_) => write!(f, "discovered_mdns"),
NetworkNotification::DiscoveredRendezvous(_) => write!(f, "discovered_rendezvous"),
NetworkNotification::RegisteredRendezvous(_) => write!(f, "registered_rendezvous"),
Expand Down Expand Up @@ -180,6 +185,10 @@ impl From<NetworkNotification> for Ipld {
"incoming_connection_error".into(),
n.into(),
)])),
NetworkNotification::StatusChangedAutonat(n) => Ipld::Map(BTreeMap::from([(
"status_changed_autonat".into(),
n.into(),
)])),
NetworkNotification::DiscoveredMdns(n) => {
Ipld::Map(BTreeMap::from([("discovered_mdns".into(), n.into())]))
}
Expand Down Expand Up @@ -267,6 +276,9 @@ impl TryFrom<Ipld> for NetworkNotification {
"incoming_connection_error" => Ok(NetworkNotification::IncomingConnectionError(
IncomingConnectionError::try_from(val.to_owned())?,
)),
"status_changed_autonat" => Ok(NetworkNotification::StatusChangedAutonat(
StatusChangedAutonat::try_from(val.to_owned())?,
)),
"discovered_mdns" => Ok(NetworkNotification::DiscoveredMdns(
DiscoveredMdns::try_from(val.to_owned())?,
)),
Expand Down Expand Up @@ -333,10 +345,12 @@ impl TryFrom<Ipld> for NetworkNotification {
#[cfg(test)]
mod test {
use super::*;
use crate::libp2p::nat_status::NatStatusExt;
use faststr::FastStr;
use homestar_invocation::test_utils::cid::generate_cid;
use libipld::Cid;
use libp2p::{
autonat::NatStatus,
swarm::{DialError, ListenError},
Multiaddr, PeerId,
};
Expand All @@ -350,6 +364,7 @@ mod test {
cid: Cid,
connected_peer_count: usize,
name: FastStr,
nat_status: NatStatus,
num_tasks: u32,
peer_id: PeerId,
peers: Vec<PeerId>,
Expand All @@ -371,6 +386,7 @@ mod test {
cid: generate_cid(&mut thread_rng()),
connected_peer_count: 1,
name: FastStr::new("Strong Bad"),
nat_status: NatStatus::Public(Multiaddr::from_str("/ip4/127.0.0.1/tcp/7002").unwrap()),
num_tasks: 1,
peer_id: PeerId::random(),
peers: vec![PeerId::random(), PeerId::random()],
Expand Down Expand Up @@ -411,6 +427,7 @@ mod test {
cid,
connected_peer_count,
name,
nat_status,
num_tasks,
peer_id,
peers,
Expand All @@ -428,6 +445,7 @@ mod test {
let outgoing_connection_error =
OutgoingConnectionError::new(Some(peer_id), DialError::NoAddresses);
let incoming_connection_error = IncomingConnectionError::new(ListenError::Aborted);
let status_changed_autonat = StatusChangedAutonat::new(nat_status);
let discovered_mdns = DiscoveredMdns::new(peers_map);
let discovered_rendezvous = DiscoveredRendezvous::new(peer_id, peers_map_vec_addr);
let registered_rendezvous = RegisteredRendezvous::new(peer_id);
Expand Down Expand Up @@ -506,6 +524,10 @@ mod test {
incoming_connection_error.timestamp().to_owned(),
NetworkNotification::IncomingConnectionError(incoming_connection_error),
),
(
status_changed_autonat.timestamp().to_owned(),
NetworkNotification::StatusChangedAutonat(status_changed_autonat),
),
(
discovered_mdns.timestamp().to_owned(),
NetworkNotification::DiscoveredMdns(discovered_mdns),
Expand Down Expand Up @@ -584,6 +606,7 @@ mod test {
cid,
connected_peer_count,
name,
nat_status,
num_tasks,
peer_id,
peers,
Expand Down Expand Up @@ -623,6 +646,18 @@ mod test {
assert_eq!(n.timestamp(), timestamp);
assert_eq!(n.error().to_string(), ListenError::Aborted.to_string());
}
NetworkNotification::StatusChangedAutonat(n) => {
let (status, address) = nat_status.to_tuple();

assert_eq!(n.timestamp(), timestamp);
assert_eq!(n.status(), &status);
assert_eq!(
n.address()
.as_ref()
.map(|a| Multiaddr::from_str(&a).unwrap()),
address
);
}
NetworkNotification::DiscoveredMdns(n) => {
assert_eq!(n.timestamp(), timestamp);

Expand Down
89 changes: 89 additions & 0 deletions homestar-runtime/src/event_handler/notification/network/autonat.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
//! Notification types for [swarm] autonat events.
//!
//! [swarm]: libp2p::swarm::Swarm

use crate::libp2p::nat_status::NatStatusExt;
use anyhow::anyhow;
use chrono::prelude::Utc;
use derive_getters::Getters;
use homestar_invocation::ipld::DagJson;
use libipld::{serde::from_ipld, Ipld};
use libp2p::autonat::NatStatus;
use schemars::JsonSchema;
use std::collections::BTreeMap;

const ADDRESS_KEY: &str = "address";
const STATUS_KEY: &str = "status";
const TIMESTAMP_KEY: &str = "timestamp";

#[derive(Debug, Clone, Getters, JsonSchema)]
#[schemars(rename = "status_changed_autonat")]
pub struct StatusChangedAutonat {
timestamp: i64,
status: String,
address: Option<String>,
}

impl StatusChangedAutonat {
pub(crate) fn new(status: NatStatus) -> StatusChangedAutonat {
let (status, address) = status.to_tuple();

StatusChangedAutonat {
timestamp: Utc::now().timestamp_millis(),
status: status.to_string(),
address: address.map(|a| a.to_string()),
}
}
}

impl DagJson for StatusChangedAutonat {}

impl From<StatusChangedAutonat> for Ipld {
fn from(notification: StatusChangedAutonat) -> Self {
Ipld::Map(BTreeMap::from([
(TIMESTAMP_KEY.into(), notification.timestamp.into()),
(STATUS_KEY.into(), notification.status.into()),
(
ADDRESS_KEY.into(),
notification
.address
.map(|peer_id| peer_id.into())
.unwrap_or(Ipld::Null),
),
]))
}
}

impl TryFrom<Ipld> for StatusChangedAutonat {
type Error = anyhow::Error;

fn try_from(ipld: Ipld) -> Result<Self, Self::Error> {
let map = from_ipld::<BTreeMap<String, Ipld>>(ipld)?;

let timestamp = from_ipld(
map.get(TIMESTAMP_KEY)
.ok_or_else(|| anyhow!("missing {TIMESTAMP_KEY}"))?
.to_owned(),
)?;

let status = from_ipld(
map.get(STATUS_KEY)
.ok_or_else(|| anyhow!("missing {STATUS_KEY}"))?
.to_owned(),
)?;

let address = map
.get(ADDRESS_KEY)
.and_then(|ipld| match ipld {
Ipld::Null => None,
ipld => Some(ipld),
})
.and_then(|ipld| from_ipld(ipld.to_owned()).ok());

Ok(StatusChangedAutonat {
timestamp,
status,
address,
})
}
}
27 changes: 21 additions & 6 deletions homestar-runtime/src/event_handler/swarm_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,29 @@ async fn handle_swarm_event<DB: Database>(
match event {
SwarmEvent::Behaviour(ComposedEvent::Autonat(autonat_event)) => {
match autonat_event {
autonat::Event::InboundProbe { .. } => {
// TODO
autonat::Event::InboundProbe(event) => {
// TODO Add log
println!("INBOUND PROBE EVENT: {event:?}")
}
autonat::Event::OutboundProbe { .. } => {
// TODO
autonat::Event::OutboundProbe(event) => {
// TODO Add log
println!("OUTBOUND PROBE EVENT: {event:?}");
println!(
"CONFIDENCE: {}",
event_handler.swarm.behaviour().autonat.confidence()
)
}
autonat::Event::StatusChanged { .. } => {
// TODO
autonat::Event::StatusChanged { old, new } => {
// TODO Add log
println!("STATUS CHANGED: Old - {old:?}, New - {new:?}");

#[cfg(feature = "websocket-notify")]
notification::emit_network_event(
event_handler.ws_evt_sender(),
NetworkNotification::StatusChangedAutonat(
notification::StatusChangedAutonat::new(new),
),
);
}
}
}
Expand Down

0 comments on commit 62fc71a

Please sign in to comment.