Skip to content

Commit

Permalink
feat: Finish discovery notifications, schemas, and test upgrades
Browse files Browse the repository at this point in the history
  • Loading branch information
bgins committed Feb 1, 2024
1 parent 448f379 commit 86de5ed
Show file tree
Hide file tree
Showing 4 changed files with 332 additions and 201 deletions.
23 changes: 11 additions & 12 deletions homestar-runtime/src/event_handler/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,15 @@ pub(crate) enum DispatchEvent {
pub(crate) fn setup_cache(
sender: Arc<channel::AsyncChannelSender<Event>>,
) -> Cache<String, CacheValue> {
let eviction_listener =
move |_key: Arc<String>, val: CacheValue, cause: RemovalCause| -> ListenerFuture {
let tx = Arc::clone(&sender);

async move {
if let Some(CacheData::OnExpiration(event)) = val.data.get("on_expiration") {
println!("event: {:?}", event);
if cause != Expired {
return;
}
let eviction_listener = move |_key: Arc<String>,
val: CacheValue,
cause: RemovalCause|
-> ListenerFuture {
let tx = Arc::clone(&sender);

async move {
if let Some(CacheData::OnExpiration(event)) = val.data.get("on_expiration") {
if cause == Expired {
match event {
DispatchEvent::RegisterPeer => {
if let Some(CacheData::Peer(rendezvous_node)) =
Expand Down Expand Up @@ -99,8 +97,9 @@ pub(crate) fn setup_cache(
};
}
}
.boxed()
};
}
.boxed()
};

Cache::builder()
.expire_after(Expiry)
Expand Down
96 changes: 22 additions & 74 deletions homestar-runtime/src/event_handler/notification/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,11 @@
use anyhow::anyhow;
use chrono::prelude::Utc;
use homestar_invocation::ipld::DagJson;
use itertools::Itertools;
use jsonrpsee::core::StringError;
use libipld::{
serde::{from_ipld, to_ipld},
Ipld,
};
use libipld::{serde::from_ipld, Ipld};
use libp2p::{Multiaddr, PeerId};
use schemars::{
gen::SchemaGenerator,
schema::{InstanceType, Metadata, ObjectValidation, Schema, SchemaObject, SingleOrVec},
JsonSchema,
};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::{borrow::Cow, collections::BTreeMap, fmt, str::FromStr};
use std::{collections::BTreeMap, fmt, str::FromStr};

const TIMESTAMP_KEY: &str = "timestamp";

Expand Down Expand Up @@ -145,7 +136,6 @@ pub enum NetworkNotification {
/// Rendezvous discover served notification.
#[schemars(rename = "discover_served_rendezvous")]
DiscoverServedRendezvous(DiscoverServedRendezvous),
// peer_discovered_rendezvous
/// Rendezvous peer registered notification.
#[schemars(rename = "peer_registered_rendezvous")]
PeerRegisteredRendezvous(PeerRegisteredRendezvous),
Expand Down Expand Up @@ -351,14 +341,16 @@ impl TryFrom<Ipld> for ConnectionClosed {
}
}

#[derive(Debug, Clone)]
#[derive(Debug, Clone, JsonSchema)]
#[schemars(rename = "discovered_mdns")]
pub struct DiscoveredMdns {
timestamp: i64,
peers: Vec<(String, String)>,
#[schemars(description = "Peers discovered by peer ID and multiaddress")]
peers: BTreeMap<String, String>,
}

impl DiscoveredMdns {
pub(crate) fn new(peers: Vec<(PeerId, Multiaddr)>) -> DiscoveredMdns {
pub(crate) fn new(peers: BTreeMap<PeerId, Multiaddr>) -> DiscoveredMdns {
DiscoveredMdns {
timestamp: Utc::now().timestamp_millis(),
peers: peers
Expand Down Expand Up @@ -401,71 +393,23 @@ impl TryFrom<Ipld> for DiscoveredMdns {
.to_owned(),
)?;

let peers_map = from_ipld::<BTreeMap<String, Ipld>>(
let peers = from_ipld::<BTreeMap<String, String>>(
map.get(peers_key)
.ok_or_else(|| anyhow!("missing {peers_key}"))?
.to_owned(),
)?;

let mut peers: Vec<(String, String)> = vec![];
for peer in peers_map.iter() {
peers.push((peer.0.to_string(), from_ipld(peer.1.to_owned())?))
}

Ok(DiscoveredMdns { timestamp, peers })
}
}

impl JsonSchema for DiscoveredMdns {
fn schema_name() -> String {
"discovered_mdns".to_owned()
}

fn schema_id() -> Cow<'static, str> {
Cow::Borrowed("homestar-runtime::event_handler::notification::swarm::DiscoveredMdns")
}

fn json_schema(gen: &mut SchemaGenerator) -> Schema {
let schema = SchemaObject {
instance_type: Some(SingleOrVec::Single(InstanceType::Object.into())),
object: Some(Box::new(ObjectValidation {
properties: BTreeMap::from([
(
"timestamp".to_string(),
Schema::Object(SchemaObject {
instance_type: Some(SingleOrVec::Single(InstanceType::Number.into())),
..Default::default()
}),
),
(
"peers".to_string(),
Schema::Object(SchemaObject {
instance_type: Some(SingleOrVec::Single(InstanceType::Object.into())),
metadata: Some(Box::new(Metadata {
description: Some("Peers and their addresses".to_string()),
..Default::default()
})),
object: Some(Box::new(ObjectValidation {
additional_properties: Some(Box::new(<String>::json_schema(gen))),
..Default::default()
})),
..Default::default()
}),
),
]),
..Default::default()
})),
..Default::default()
};
schema.into()
}
}

#[derive(Debug, Clone, JsonSchema)]
#[schemars(rename = "discovered_rendezvous")]
pub struct DiscoveredRendezvous {
timestamp: i64,
#[schemars(description = "Server that fulfilled the discovery request")]
server: String,
#[schemars(description = "Peers discovered by peer ID and multiaddresses")]
peers: BTreeMap<String, Vec<String>>,
}

Expand Down Expand Up @@ -561,6 +505,7 @@ impl TryFrom<Ipld> for DiscoveredRendezvous {
#[schemars(rename = "registered_rendezvous")]
pub struct RegisteredRendezvous {
timestamp: i64,
#[schemars(description = "Server that accepted registration")]
server: String,
}

Expand Down Expand Up @@ -613,6 +558,7 @@ impl TryFrom<Ipld> for RegisteredRendezvous {
#[schemars(rename = "registered_rendezvous")]
pub struct DiscoverServedRendezvous {
timestamp: i64,
#[schemars(description = "Peer that requested discovery")]
enquirer: String,
}

Expand Down Expand Up @@ -668,7 +614,9 @@ impl TryFrom<Ipld> for DiscoverServedRendezvous {
#[schemars(rename = "peer_registered_rendezvous")]
pub struct PeerRegisteredRendezvous {
timestamp: i64,
#[schemars(description = "Peer registered")]
peer_id: String,
#[schemars(description = "Multiaddresses for peer")]
addresses: Vec<String>,
}

Expand Down Expand Up @@ -751,7 +699,7 @@ mod test {
peer_id: PeerId,
address: Multiaddr,
addresses: Vec<Multiaddr>,
peers: Vec<(PeerId, Multiaddr)>,
peers: BTreeMap<PeerId, Multiaddr>,
peers_vec_addr: BTreeMap<PeerId, Vec<Multiaddr>>,
}

Expand All @@ -763,7 +711,7 @@ mod test {
Multiaddr::from_str("/ip4/127.0.0.1/tcp/7000").unwrap(),
Multiaddr::from_str("/ip4/127.0.0.1/tcp/7001").unwrap(),
],
peers: vec![
peers: BTreeMap::from([
(
PeerId::random(),
Multiaddr::from_str("/ip4/127.0.0.1/tcp/7000").unwrap(),
Expand All @@ -772,7 +720,7 @@ mod test {
PeerId::random(),
Multiaddr::from_str("/ip4/127.0.0.1/tcp/7001").unwrap(),
),
],
]),
peers_vec_addr: BTreeMap::from([
(
PeerId::random(),
Expand Down Expand Up @@ -861,10 +809,10 @@ mod test {
assert_eq!(n.timestamp, timestamp);

for peer in n.peers {
assert!(peers.contains(&(
PeerId::from_str(&peer.0).unwrap(),
Multiaddr::from_str(&peer.1).unwrap()
)))
assert_eq!(
Multiaddr::from_str(&peer.1).unwrap(),
peers[&PeerId::from_str(&peer.0).unwrap()]
)
}
}
NetworkNotification::DiscoveredRendezvous(n) => {
Expand Down
26 changes: 14 additions & 12 deletions homestar-runtime/src/event_handler/swarm_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,17 +312,15 @@ async fn handle_swarm_event<DB: Database>(
NetworkNotification::DiscoveredRendezvous(
notification::DiscoveredRendezvous::new(
rendezvous_node,
BTreeMap::from(
registrations
.iter()
.map(|registration| {
(
registration.record.peer_id(),
registration.record.addresses().to_owned(),
)
})
.collect::<BTreeMap<PeerId, Vec<Multiaddr>>>(),
),
registrations
.iter()
.map(|registration| {
(
registration.record.peer_id(),
registration.record.addresses().to_owned(),
)
})
.collect::<BTreeMap<PeerId, Vec<Multiaddr>>>(),
),
),
);
Expand Down Expand Up @@ -1101,7 +1099,11 @@ async fn handle_swarm_event<DB: Database>(
#[cfg(feature = "websocket-notify")]
notification::emit_network_event(
event_handler.ws_evt_sender(),
NetworkNotification::DiscoveredMdns(notification::DiscoveredMdns::new(list)),
NetworkNotification::DiscoveredMdns(notification::DiscoveredMdns::new(
list.iter()
.map(|peer| (peer.0, peer.1.to_owned()))
.collect::<BTreeMap<PeerId, Multiaddr>>(),
)),
)
}
SwarmEvent::Behaviour(ComposedEvent::Mdns(mdns::Event::Expired(list))) => {
Expand Down
Loading

0 comments on commit 86de5ed

Please sign in to comment.