Skip to content

Commit

Permalink
feat(peersync)!: push local peer record to connected peer on change
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Nov 13, 2024
1 parent 3072882 commit 481cc12
Show file tree
Hide file tree
Showing 9 changed files with 314 additions and 174 deletions.
7 changes: 2 additions & 5 deletions network/core/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -861,14 +861,11 @@ where
Autonat(event) => {
self.on_autonat_event(event)?;
},
PeerSync(peersync::Event::LocalPeerRecordUpdated { record }) => {
info!(target: LOG_TARGET, "🧑‍🧑‍🧒‍🧒 Local peer record updated: {:?}",record);
},
PeerSync(peersync::Event::PeerBatchReceived { new_peers, from_peer }) => {
info!(target: LOG_TARGET, "🧑‍🧑‍🧒‍🧒 Peer batch received: from_peer={}, new_peers={}", from_peer, new_peers);
debug!(target: LOG_TARGET, "🧑‍🧑‍🧒‍🧒 Peer batch received: from_peer={}, new_peers={}", from_peer, new_peers);
},
PeerSync(event) => {
info!(target: LOG_TARGET, "ℹ️ PeerSync event: {:?}", event);
debug!(target: LOG_TARGET, "ℹ️ PeerSync event: {:?}", event);
},
Kad(kad::Event::OutboundQueryProgressed {
id,
Expand Down
35 changes: 21 additions & 14 deletions network/libp2p-peersync/proto/messages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,37 @@

syntax = "proto3";

message Message {
oneof payload {
SignedPeerRecord LocalRecord = 1;
WantPeers WantPeers = 2;
}
}

// A want request for peers
message WantPeers {
// Requested peers
repeated bytes want_peer_ids = 1;
// Requested peers
repeated bytes want_peer_ids = 1;
}

// Response to a want request. This response is streamed back to the requester.
message WantPeerResponse {
// A peer that was requested.
SignedPeerRecord peer = 1;
// A peer that was requested.
SignedPeerRecord peer = 1;
}

message SignedPeerRecord {
// The addresses of the peer
repeated bytes addresses = 1;
// The Unix epoch based timestamp when this peer record was signed
uint64 ts_updated_at = 2;
// The signature that signs the peer record (addresses | ts_updated_at)
PeerSignature signature = 3;
// The addresses of the peer
repeated bytes addresses = 1;
// The Unix epoch based timestamp when this peer record was signed
uint64 ts_updated_at = 2;
// The signature that signs the peer record (addresses | ts_updated_at)
PeerSignature signature = 3;
}

message PeerSignature {
// The public key of the peer
bytes public_key = 1;
// The signature that signs the peer record (addresses | ts_updated_at)
bytes signature = 2;
// The public key of the peer
bytes public_key = 1;
// The signature that signs the peer record (addresses | ts_updated_at)
bytes signature = 2;
}
81 changes: 45 additions & 36 deletions network/libp2p-peersync/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ use libp2p::{
futures::executor::block_on,
identity::Keypair,
swarm::{
behaviour::ExternalAddrConfirmed,
AddressChange,
ConnectionClosed,
ConnectionDenied,
ConnectionId,
Expand All @@ -34,7 +32,7 @@ use libp2p::{
use crate::{
error::Error,
event::Event,
handler::Handler,
handler::{Handler, HandlerAction},
store::PeerStore,
Config,
LocalPeerRecord,
Expand Down Expand Up @@ -72,7 +70,7 @@ where TPeerStore: PeerStore
Self::with_custom_protocol(keypair, DEFAULT_PROTOCOL_NAME, store, config)
}

pub fn with_custom_protocol(
fn with_custom_protocol(
keypair: Keypair,
protocol: StreamProtocol,
peer_store: TPeerStore,
Expand All @@ -88,7 +86,7 @@ where TPeerStore: PeerStore
active_outbound_connections: HashMap::new(),
remaining_want_peers: HashSet::new(),
pending_syncs: VecDeque::new(),
pending_tasks: futures_bounded::FuturesSet::new(Duration::from_secs(1000), 1024),
pending_tasks: futures_bounded::FuturesSet::new(Duration::from_secs(30), 1024),
sync_semaphore: Arc::new(async_semaphore::Semaphore::new(1)),
}
}
Expand All @@ -106,6 +104,10 @@ where TPeerStore: PeerStore
.map_err(|e| Error::StoreError(e.to_string()))
}

pub fn local_peer_record(&self) -> &LocalPeerRecord {
&self.local_peer_record
}

pub fn add_known_local_public_addresses(&mut self, addrs: Vec<Multiaddr>) {
if addrs.is_empty() {
return;
Expand Down Expand Up @@ -151,7 +153,7 @@ where TPeerStore: PeerStore
self.pending_events.push_back(ToSwarm::NotifyHandler {
peer_id: *peer_id,
handler: NotifyHandler::One(*conn_id),
event: list.clone(),
event: HandlerAction::WantPeers(list.clone()),
});
}
}
Expand All @@ -174,39 +176,31 @@ where TPeerStore: PeerStore
}
}

fn on_address_change(&mut self, _address_change: AddressChange) {}

fn on_external_addr_confirmed(&mut self, addr_confirmed: ExternalAddrConfirmed) {
self.local_peer_record.add_address(addr_confirmed.addr.clone());
self.handle_update_local_record()
}

fn handle_update_local_record(&mut self) {
let store = self.peer_store.clone();
let local_peer_record = self.local_peer_record.clone();
if !local_peer_record.is_signed() {
return;
}
let peer_rec = match local_peer_record.clone().try_into() {
Ok(peer_rec) => peer_rec,
Err(err) => {
tracing::error!("Failed to convert local peer record to signed peer record: {}", err);
return;
},
};
let local_peer_rec = SignedPeerRecord::from(local_peer_record);
let local_rec = Arc::new(local_peer_rec.clone());
let task = async move {
match store.put(peer_rec).await {
Ok(_) => Event::LocalPeerRecordUpdated {
record: local_peer_record,
},
match store.put(local_peer_rec).await {
Ok(_) => Event::LocalPeerRecordUpdated,
Err(err) => {
tracing::error!("Failed to add local peer record to store: {}", err);
Event::Error(Error::StoreError(err.to_string()))
},
}
};
match self.pending_tasks.try_push(task) {
Ok(()) => {},
Ok(()) => {
self.pending_events.reserve(self.active_outbound_connections.len());
for (peer_id, conn_id) in &self.active_outbound_connections {
self.pending_events.push_back(ToSwarm::NotifyHandler {
peer_id: *peer_id,
handler: NotifyHandler::One(*conn_id),
event: HandlerAction::PushLocalRecord(local_rec.clone()),
});
}
},
Err(_) => {
self.pending_events.push_back(ToSwarm::GenerateEvent(Event::Error(
Error::ExceededMaxNumberOfPendingTasks,
Expand All @@ -225,17 +219,18 @@ where TPeerStore: PeerStore
fn handle_established_inbound_connection(
&mut self,
_connection_id: ConnectionId,
peer: PeerId,
peer_id: PeerId,
_local_addr: &Multiaddr,
_remote_addr: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
let handler = Handler::new(
peer,
peer_id,
self.peer_store.clone(),
self.protocol.clone(),
&self.config,
self.remaining_want_peers.clone(),
self.sync_semaphore.clone(),
None,
);
Ok(handler)
}
Expand All @@ -248,13 +243,15 @@ where TPeerStore: PeerStore
_role_override: Endpoint,
_port_use: PortUse,
) -> Result<THandler<Self>, ConnectionDenied> {
tracing::debug!("outbound connection to peer {}", peer);
let handler = Handler::new(
peer,
self.peer_store.clone(),
self.protocol.clone(),
&self.config,
self.remaining_want_peers.clone(),
self.sync_semaphore.clone(),
Some(Arc::new(self.local_peer_record.clone().into())),
);
self.active_outbound_connections.insert(peer, connection_id);
Ok(handler)
Expand All @@ -264,16 +261,16 @@ where TPeerStore: PeerStore
match event {
FromSwarm::ConnectionEstablished(_) => {},
FromSwarm::ConnectionClosed(connection_closed) => self.on_connection_closed(connection_closed),
FromSwarm::AddressChange(address_change) => self.on_address_change(address_change),
FromSwarm::AddressChange(_) => {},
FromSwarm::ExternalAddrConfirmed(addr_confirmed) => {
self.on_external_addr_confirmed(addr_confirmed);
self.local_peer_record.add_address(addr_confirmed.addr.clone());
self.handle_update_local_record()
},
FromSwarm::ExternalAddrExpired(addr_expired) => {
self.local_peer_record.remove_address(addr_expired.addr);
self.handle_update_local_record();
self.pending_events
.push_back(ToSwarm::GenerateEvent(Event::LocalPeerRecordUpdated {
record: self.local_peer_record.clone(),
}));
.push_back(ToSwarm::GenerateEvent(Event::LocalPeerRecordUpdated));
},
_ => {},
}
Expand Down Expand Up @@ -302,7 +299,7 @@ where TPeerStore: PeerStore
},
Event::InboundStreamInterrupted { .. } => {},
Event::OutboundStreamInterrupted { .. } => {},
Event::ResponseStreamComplete { .. } => {},
Event::InboundRequestCompleted { .. } => {},
Event::LocalPeerRecordUpdated { .. } => {},
Event::Error(_) => {},
}
Expand All @@ -312,6 +309,18 @@ where TPeerStore: PeerStore

fn poll(&mut self, cx: &mut Context<'_>) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
if let Some(event) = self.pending_events.pop_front() {
// if let
// ToSwarm::GenerateEvent(event) =
// &event {
// match event {
// Event::InboundFailure { peer_id, .. } => {}
// Event::OutboundFailure { peer_id, .. } => {}
// Event::InboundStreamInterrupted { peer_id, .. } => {}
// Event::OutboundStreamInterrupted { peer_id, .. } => {}
// Event::ResponseStreamComplete { peer_id, .. } => {}
// Event::Error(_) => {}
// }
// }
return Poll::Ready(event);
}
if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD {
Expand Down
8 changes: 3 additions & 5 deletions network/libp2p-peersync/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use libp2p::PeerId;

use crate::{error::Error, LocalPeerRecord};
use crate::error::Error;

#[derive(Debug)]
pub enum Event {
Expand All @@ -25,13 +25,11 @@ pub enum Event {
OutboundStreamInterrupted {
peer_id: PeerId,
},
ResponseStreamComplete {
InboundRequestCompleted {
peer_id: PeerId,
peers_sent: usize,
requested: usize,
},
LocalPeerRecordUpdated {
record: LocalPeerRecord,
},
LocalPeerRecordUpdated,
Error(Error),
}
Loading

0 comments on commit 481cc12

Please sign in to comment.