Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(peersync)!: push local peer record to connected peer on change #6687

Merged
merged 7 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions network/core/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -861,14 +861,11 @@ where
Autonat(event) => {
self.on_autonat_event(event)?;
},
PeerSync(peersync::Event::LocalPeerRecordUpdated { record }) => {
info!(target: LOG_TARGET, "🧑‍🧑‍🧒‍🧒 Local peer record updated: {:?}",record);
},
PeerSync(peersync::Event::PeerBatchReceived { new_peers, from_peer }) => {
info!(target: LOG_TARGET, "🧑‍🧑‍🧒‍🧒 Peer batch received: from_peer={}, new_peers={}", from_peer, new_peers);
debug!(target: LOG_TARGET, "🧑‍🧑‍🧒‍🧒 Peer batch received: from_peer={}, new_peers={}", from_peer, new_peers);
},
PeerSync(event) => {
info!(target: LOG_TARGET, "ℹ️ PeerSync event: {:?}", event);
debug!(target: LOG_TARGET, "ℹ️ PeerSync event: {:?}", event);
},
Kad(kad::Event::OutboundQueryProgressed {
id,
Expand Down
35 changes: 21 additions & 14 deletions network/libp2p-peersync/proto/messages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,37 @@

syntax = "proto3";

message Message {
oneof payload {
SignedPeerRecord LocalRecord = 1;
WantPeers WantPeers = 2;
}
}

// A want request for peers
message WantPeers {
// Requested peers
repeated bytes want_peer_ids = 1;
// Requested peers
repeated bytes want_peer_ids = 1;
}

// Response to a want request. This response is streamed back to the requester.
message WantPeerResponse {
// A peer that was requested.
SignedPeerRecord peer = 1;
// A peer that was requested.
SignedPeerRecord peer = 1;
}

message SignedPeerRecord {
// The addresses of the peer
repeated bytes addresses = 1;
// The Unix epoch based timestamp when this peer record was signed
uint64 ts_updated_at = 2;
// The signature that signs the peer record (addresses | ts_updated_at)
PeerSignature signature = 3;
// The addresses of the peer
repeated bytes addresses = 1;
// The Unix epoch based timestamp when this peer record was signed
uint64 ts_updated_at = 2;
// The signature that signs the peer record (addresses | ts_updated_at)
PeerSignature signature = 3;
}

message PeerSignature {
// The public key of the peer
bytes public_key = 1;
// The signature that signs the peer record (addresses | ts_updated_at)
bytes signature = 2;
// The public key of the peer
bytes public_key = 1;
// The signature that signs the peer record (addresses | ts_updated_at)
bytes signature = 2;
}
81 changes: 45 additions & 36 deletions network/libp2p-peersync/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ use libp2p::{
futures::executor::block_on,
identity::Keypair,
swarm::{
behaviour::ExternalAddrConfirmed,
AddressChange,
ConnectionClosed,
ConnectionDenied,
ConnectionId,
Expand All @@ -34,7 +32,7 @@ use libp2p::{
use crate::{
error::Error,
event::Event,
handler::Handler,
handler::{Handler, HandlerAction},
store::PeerStore,
Config,
LocalPeerRecord,
Expand Down Expand Up @@ -72,7 +70,7 @@ where TPeerStore: PeerStore
Self::with_custom_protocol(keypair, DEFAULT_PROTOCOL_NAME, store, config)
}

pub fn with_custom_protocol(
fn with_custom_protocol(
keypair: Keypair,
protocol: StreamProtocol,
peer_store: TPeerStore,
Expand All @@ -88,7 +86,7 @@ where TPeerStore: PeerStore
active_outbound_connections: HashMap::new(),
remaining_want_peers: HashSet::new(),
pending_syncs: VecDeque::new(),
pending_tasks: futures_bounded::FuturesSet::new(Duration::from_secs(1000), 1024),
pending_tasks: futures_bounded::FuturesSet::new(Duration::from_secs(30), 1024),
sync_semaphore: Arc::new(async_semaphore::Semaphore::new(1)),
}
}
Expand All @@ -106,6 +104,10 @@ where TPeerStore: PeerStore
.map_err(|e| Error::StoreError(e.to_string()))
}

pub fn local_peer_record(&self) -> &LocalPeerRecord {
&self.local_peer_record
}

pub fn add_known_local_public_addresses(&mut self, addrs: Vec<Multiaddr>) {
if addrs.is_empty() {
return;
Expand Down Expand Up @@ -151,7 +153,7 @@ where TPeerStore: PeerStore
self.pending_events.push_back(ToSwarm::NotifyHandler {
peer_id: *peer_id,
handler: NotifyHandler::One(*conn_id),
event: list.clone(),
event: HandlerAction::WantPeers(list.clone()),
});
}
}
Expand All @@ -174,39 +176,31 @@ where TPeerStore: PeerStore
}
}

fn on_address_change(&mut self, _address_change: AddressChange) {}

fn on_external_addr_confirmed(&mut self, addr_confirmed: ExternalAddrConfirmed) {
self.local_peer_record.add_address(addr_confirmed.addr.clone());
self.handle_update_local_record()
}

fn handle_update_local_record(&mut self) {
let store = self.peer_store.clone();
let local_peer_record = self.local_peer_record.clone();
if !local_peer_record.is_signed() {
return;
}
let peer_rec = match local_peer_record.clone().try_into() {
Ok(peer_rec) => peer_rec,
Err(err) => {
tracing::error!("Failed to convert local peer record to signed peer record: {}", err);
return;
},
};
let local_peer_rec = SignedPeerRecord::from(local_peer_record);
let local_rec = Arc::new(local_peer_rec.clone());
let task = async move {
match store.put(peer_rec).await {
Ok(_) => Event::LocalPeerRecordUpdated {
record: local_peer_record,
},
match store.put(local_peer_rec).await {
Ok(_) => Event::LocalPeerRecordUpdated,
Err(err) => {
tracing::error!("Failed to add local peer record to store: {}", err);
Event::Error(Error::StoreError(err.to_string()))
},
}
};
match self.pending_tasks.try_push(task) {
Ok(()) => {},
Ok(()) => {
self.pending_events.reserve(self.active_outbound_connections.len());
for (peer_id, conn_id) in &self.active_outbound_connections {
self.pending_events.push_back(ToSwarm::NotifyHandler {
peer_id: *peer_id,
handler: NotifyHandler::One(*conn_id),
event: HandlerAction::PushLocalRecord(local_rec.clone()),
});
}
},
Err(_) => {
self.pending_events.push_back(ToSwarm::GenerateEvent(Event::Error(
Error::ExceededMaxNumberOfPendingTasks,
Expand All @@ -225,17 +219,18 @@ where TPeerStore: PeerStore
fn handle_established_inbound_connection(
&mut self,
_connection_id: ConnectionId,
peer: PeerId,
peer_id: PeerId,
_local_addr: &Multiaddr,
_remote_addr: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
let handler = Handler::new(
peer,
peer_id,
self.peer_store.clone(),
self.protocol.clone(),
&self.config,
self.remaining_want_peers.clone(),
self.sync_semaphore.clone(),
None,
);
Ok(handler)
}
Expand All @@ -248,13 +243,15 @@ where TPeerStore: PeerStore
_role_override: Endpoint,
_port_use: PortUse,
) -> Result<THandler<Self>, ConnectionDenied> {
tracing::debug!("outbound connection to peer {}", peer);
let handler = Handler::new(
peer,
self.peer_store.clone(),
self.protocol.clone(),
&self.config,
self.remaining_want_peers.clone(),
self.sync_semaphore.clone(),
Some(Arc::new(self.local_peer_record.clone().into())),
);
self.active_outbound_connections.insert(peer, connection_id);
Ok(handler)
Expand All @@ -264,16 +261,16 @@ where TPeerStore: PeerStore
match event {
FromSwarm::ConnectionEstablished(_) => {},
FromSwarm::ConnectionClosed(connection_closed) => self.on_connection_closed(connection_closed),
FromSwarm::AddressChange(address_change) => self.on_address_change(address_change),
FromSwarm::AddressChange(_) => {},
FromSwarm::ExternalAddrConfirmed(addr_confirmed) => {
self.on_external_addr_confirmed(addr_confirmed);
self.local_peer_record.add_address(addr_confirmed.addr.clone());
self.handle_update_local_record()
},
FromSwarm::ExternalAddrExpired(addr_expired) => {
self.local_peer_record.remove_address(addr_expired.addr);
self.handle_update_local_record();
self.pending_events
.push_back(ToSwarm::GenerateEvent(Event::LocalPeerRecordUpdated {
record: self.local_peer_record.clone(),
}));
.push_back(ToSwarm::GenerateEvent(Event::LocalPeerRecordUpdated));
},
_ => {},
}
Expand Down Expand Up @@ -302,7 +299,7 @@ where TPeerStore: PeerStore
},
Event::InboundStreamInterrupted { .. } => {},
Event::OutboundStreamInterrupted { .. } => {},
Event::ResponseStreamComplete { .. } => {},
Event::InboundRequestCompleted { .. } => {},
Event::LocalPeerRecordUpdated { .. } => {},
Event::Error(_) => {},
}
Expand All @@ -312,6 +309,18 @@ where TPeerStore: PeerStore

fn poll(&mut self, cx: &mut Context<'_>) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
if let Some(event) = self.pending_events.pop_front() {
// if let
// ToSwarm::GenerateEvent(event) =
// &event {
// match event {
// Event::InboundFailure { peer_id, .. } => {}
// Event::OutboundFailure { peer_id, .. } => {}
// Event::InboundStreamInterrupted { peer_id, .. } => {}
// Event::OutboundStreamInterrupted { peer_id, .. } => {}
// Event::ResponseStreamComplete { peer_id, .. } => {}
// Event::Error(_) => {}
// }
// }
return Poll::Ready(event);
}
if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD {
Expand Down
8 changes: 3 additions & 5 deletions network/libp2p-peersync/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use libp2p::PeerId;

use crate::{error::Error, LocalPeerRecord};
use crate::error::Error;

#[derive(Debug)]
pub enum Event {
Expand All @@ -25,13 +25,11 @@ pub enum Event {
OutboundStreamInterrupted {
peer_id: PeerId,
},
ResponseStreamComplete {
InboundRequestCompleted {
peer_id: PeerId,
peers_sent: usize,
requested: usize,
},
LocalPeerRecordUpdated {
record: LocalPeerRecord,
},
LocalPeerRecordUpdated,
Error(Error),
}
Loading
Loading