Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add AutoNAT behavior #632

Merged
merged 12 commits into from
May 17, 2024
22 changes: 22 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions homestar-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ jsonrpsee = { version = "0.21", default-features = false, features = [
] }
libipld = { workspace = true }
libp2p = { version = "0.53", default-features = false, features = [
"autonat",
"dns",
"kad",
"request-response",
Expand Down
4 changes: 2 additions & 2 deletions homestar-runtime/src/event_handler/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ pub(crate) use network::{
NetworkNotification, NewListenAddr, OutgoingConnectionError, PeerRegisteredRendezvous,
PublishedReceiptPubsub, PutReceiptDht, PutWorkflowInfoDht, ReceiptQuorumFailureDht,
ReceiptQuorumSuccessDht, ReceivedReceiptPubsub, ReceivedWorkflowInfo, RegisteredRendezvous,
SentWorkflowInfo, WorkflowInfoQuorumFailureDht, WorkflowInfoQuorumSuccessDht,
WorkflowInfoSource,
SentWorkflowInfo, StatusChangedAutonat, WorkflowInfoQuorumFailureDht,
WorkflowInfoQuorumSuccessDht, WorkflowInfoSource,
};
pub(crate) use receipt::ReceiptNotification;

Expand Down
41 changes: 38 additions & 3 deletions homestar-runtime/src/event_handler/notification/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,19 @@
//! [swarm]: libp2p::swarm::Swarm

use anyhow::anyhow;

use homestar_invocation::ipld::DagJson;
use libipld::{serde::from_ipld, Ipld};
use schemars::JsonSchema;
use std::{collections::BTreeMap, fmt};

pub(crate) mod autonat;
pub(crate) mod connection;
pub(crate) mod dht;
pub(crate) mod mdns;
pub(crate) mod pubsub;
pub(crate) mod rendezvous;
pub(crate) mod req_resp;
pub(crate) use autonat::StatusChangedAutonat;
pub(crate) use connection::{
ConnectionClosed, ConnectionEstablished, IncomingConnectionError, NewListenAddr,
OutgoingConnectionError,
Expand Down Expand Up @@ -43,12 +44,15 @@ pub enum NetworkNotification {
/// Connection closed notification.
#[schemars(rename = "connection_closed")]
ConnnectionClosed(ConnectionClosed),
/// Outgoing conenction error notification.
/// Outgoing connection error notification.
#[schemars(rename = "outgoing_connection_error")]
OutgoingConnectionError(OutgoingConnectionError),
/// Incoming conenction error notification.
/// Incoming connection error notification.
#[schemars(rename = "incoming_connection_error")]
IncomingConnectionError(IncomingConnectionError),
/// Autonat status changed notification.
#[schemars(rename = "status_changed_autonat")]
StatusChangedAutonat(StatusChangedAutonat),
/// mDNS discovered notification.
#[schemars(rename = "discovered_mdns")]
DiscoveredMdns(DiscoveredMdns),
Expand Down Expand Up @@ -120,6 +124,7 @@ impl fmt::Display for NetworkNotification {
NetworkNotification::IncomingConnectionError(_) => {
write!(f, "incoming_connection_error")
}
NetworkNotification::StatusChangedAutonat(_) => write!(f, "status_changed_autonat"),
NetworkNotification::DiscoveredMdns(_) => write!(f, "discovered_mdns"),
NetworkNotification::DiscoveredRendezvous(_) => write!(f, "discovered_rendezvous"),
NetworkNotification::RegisteredRendezvous(_) => write!(f, "registered_rendezvous"),
Expand Down Expand Up @@ -180,6 +185,10 @@ impl From<NetworkNotification> for Ipld {
"incoming_connection_error".into(),
n.into(),
)])),
NetworkNotification::StatusChangedAutonat(n) => Ipld::Map(BTreeMap::from([(
"status_changed_autonat".into(),
n.into(),
)])),
NetworkNotification::DiscoveredMdns(n) => {
Ipld::Map(BTreeMap::from([("discovered_mdns".into(), n.into())]))
}
Expand Down Expand Up @@ -267,6 +276,9 @@ impl TryFrom<Ipld> for NetworkNotification {
"incoming_connection_error" => Ok(NetworkNotification::IncomingConnectionError(
IncomingConnectionError::try_from(val.to_owned())?,
)),
"status_changed_autonat" => Ok(NetworkNotification::StatusChangedAutonat(
StatusChangedAutonat::try_from(val.to_owned())?,
)),
"discovered_mdns" => Ok(NetworkNotification::DiscoveredMdns(
DiscoveredMdns::try_from(val.to_owned())?,
)),
Expand Down Expand Up @@ -333,10 +345,12 @@ impl TryFrom<Ipld> for NetworkNotification {
#[cfg(test)]
mod test {
use super::*;
use crate::libp2p::nat_status::NatStatusExt;
use faststr::FastStr;
use homestar_invocation::test_utils::cid::generate_cid;
use libipld::Cid;
use libp2p::{
autonat::NatStatus,
swarm::{DialError, ListenError},
Multiaddr, PeerId,
};
Expand All @@ -350,6 +364,7 @@ mod test {
cid: Cid,
connected_peer_count: usize,
name: FastStr,
nat_status: NatStatus,
num_tasks: u32,
peer_id: PeerId,
peers: Vec<PeerId>,
Expand All @@ -371,6 +386,7 @@ mod test {
cid: generate_cid(&mut thread_rng()),
connected_peer_count: 1,
name: FastStr::new("Strong Bad"),
nat_status: NatStatus::Public(Multiaddr::from_str("/ip4/127.0.0.1/tcp/7002").unwrap()),
num_tasks: 1,
peer_id: PeerId::random(),
peers: vec![PeerId::random(), PeerId::random()],
Expand Down Expand Up @@ -411,6 +427,7 @@ mod test {
cid,
connected_peer_count,
name,
nat_status,
num_tasks,
peer_id,
peers,
Expand All @@ -428,6 +445,7 @@ mod test {
let outgoing_connection_error =
OutgoingConnectionError::new(Some(peer_id), DialError::NoAddresses);
let incoming_connection_error = IncomingConnectionError::new(ListenError::Aborted);
let status_changed_autonat = StatusChangedAutonat::new(nat_status);
let discovered_mdns = DiscoveredMdns::new(peers_map);
let discovered_rendezvous = DiscoveredRendezvous::new(peer_id, peers_map_vec_addr);
let registered_rendezvous = RegisteredRendezvous::new(peer_id);
Expand Down Expand Up @@ -506,6 +524,10 @@ mod test {
incoming_connection_error.timestamp().to_owned(),
NetworkNotification::IncomingConnectionError(incoming_connection_error),
),
(
status_changed_autonat.timestamp().to_owned(),
NetworkNotification::StatusChangedAutonat(status_changed_autonat),
),
(
discovered_mdns.timestamp().to_owned(),
NetworkNotification::DiscoveredMdns(discovered_mdns),
Expand Down Expand Up @@ -584,6 +606,7 @@ mod test {
cid,
connected_peer_count,
name,
nat_status,
num_tasks,
peer_id,
peers,
Expand Down Expand Up @@ -623,6 +646,18 @@ mod test {
assert_eq!(n.timestamp(), timestamp);
assert_eq!(n.error().to_string(), ListenError::Aborted.to_string());
}
NetworkNotification::StatusChangedAutonat(n) => {
let (status, address) = nat_status.to_tuple();

assert_eq!(n.timestamp(), timestamp);
assert_eq!(n.status(), &status);
assert_eq!(
n.address()
.as_ref()
.map(|a| Multiaddr::from_str(&a).unwrap()),
address
);
}
NetworkNotification::DiscoveredMdns(n) => {
assert_eq!(n.timestamp(), timestamp);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
//! Notification types for [swarm] autonat events.
//!
//! [swarm]: libp2p::swarm::Swarm

use crate::libp2p::nat_status::NatStatusExt;
use anyhow::anyhow;
use chrono::prelude::Utc;
use derive_getters::Getters;
use homestar_invocation::ipld::DagJson;
use libipld::{serde::from_ipld, Ipld};
use libp2p::autonat::NatStatus;
use schemars::JsonSchema;
use std::collections::BTreeMap;

const ADDRESS_KEY: &str = "address";
const STATUS_KEY: &str = "status";
const TIMESTAMP_KEY: &str = "timestamp";

#[derive(Debug, Clone, Getters, JsonSchema)]
#[schemars(rename = "status_changed_autonat")]
pub struct StatusChangedAutonat {
timestamp: i64,
status: String,
address: Option<String>,
}

impl StatusChangedAutonat {
pub(crate) fn new(status: NatStatus) -> StatusChangedAutonat {
let (status, address) = status.to_tuple();

StatusChangedAutonat {
timestamp: Utc::now().timestamp_millis(),
status: status.to_string(),
address: address.map(|a| a.to_string()),
}
}
}

impl DagJson for StatusChangedAutonat {}

impl From<StatusChangedAutonat> for Ipld {
fn from(notification: StatusChangedAutonat) -> Self {
Ipld::Map(BTreeMap::from([
(TIMESTAMP_KEY.into(), notification.timestamp.into()),
(STATUS_KEY.into(), notification.status.into()),
(
ADDRESS_KEY.into(),
notification
.address
.map(|peer_id| peer_id.into())
.unwrap_or(Ipld::Null),
),
]))
}
}

impl TryFrom<Ipld> for StatusChangedAutonat {
type Error = anyhow::Error;

fn try_from(ipld: Ipld) -> Result<Self, Self::Error> {
let map = from_ipld::<BTreeMap<String, Ipld>>(ipld)?;

let timestamp = from_ipld(
map.get(TIMESTAMP_KEY)
.ok_or_else(|| anyhow!("missing {TIMESTAMP_KEY}"))?
.to_owned(),
)?;

let status = from_ipld(
map.get(STATUS_KEY)
.ok_or_else(|| anyhow!("missing {STATUS_KEY}"))?
.to_owned(),
)?;

let address = map
.get(ADDRESS_KEY)
.and_then(|ipld| match ipld {
Ipld::Null => None,
ipld => Some(ipld),
})
.and_then(|ipld| from_ipld(ipld.to_owned()).ok());

Ok(StatusChangedAutonat {
timestamp,
status,
address,
})
}
}
Loading
Loading