From c875aa099ca20f7cea9c1f77a7c41ccb4bcee4f0 Mon Sep 17 00:00:00 2001 From: stringhandler Date: Mon, 30 Dec 2024 15:34:59 +0200 Subject: [PATCH] fix: dial less often --- src/server/p2p/network.rs | 92 ++++++++++++++++++++--------------- src/server/p2p/peer_store.rs | 2 +- src/server/p2p/relay_store.rs | 6 +-- src/server/p2p/setup.rs | 3 +- src/sharechain/p2chain.rs | 17 ++++--- 5 files changed, 68 insertions(+), 52 deletions(-) diff --git a/src/server/p2p/network.rs b/src/server/p2p/network.rs index f3d95c8f..166df08b 100644 --- a/src/server/p2p/network.rs +++ b/src/server/p2p/network.rs @@ -35,13 +35,7 @@ use libp2p::{ ping, relay, request_response::{self, cbor, OutboundFailure, OutboundRequestId, ResponseChannel}, - swarm::{ - behaviour::toggle::Toggle, - dial_opts::{DialOpts, PeerCondition}, - DialError, - NetworkBehaviour, - SwarmEvent, - }, + swarm::{behaviour::toggle::Toggle, dial_opts::DialOpts, DialError, NetworkBehaviour, SwarmEvent}, Multiaddr, PeerId, Swarm, @@ -756,9 +750,9 @@ where S: ShareChain match add_status { AddPeerStatus::NewPeer => { - self.initiate_direct_peer_exchange(&peer).await; - self.swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer); - let _unused = self.swarm.dial(peer); + // self.initiate_direct_peer_exchange(&peer).await; + // self.swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer); + // let _unused = self.swarm.dial(peer); return true; }, AddPeerStatus::Existing => {}, @@ -781,6 +775,7 @@ where S: ShareChain error!(target: LOG_TARGET, squad = &self.config.squad; "Failed to create peer info: {error:?}"); }) { + dbg!("here"); let local_peer_id = *self.swarm.local_peer_id(); if peer == &local_peer_id { return; @@ -1187,7 +1182,7 @@ where S: ShareChain info!(target: LOG_TARGET, squad = &self.config.squad; "Connection established: {peer_id:?} -> {endpoint:?} ({num_established:?}/{concurrent_dial_errors:?}/{established_in:?})"); }, SwarmEvent::Dialing { peer_id, .. } => { - info!(target: LOG_TARGET, squad = &self.config.squad; "Dialing: {peer_id:?}"); + info!(target: LOG_TARGET, "Dialing: {peer_id:?}"); }, SwarmEvent::NewListenAddr { address, .. } => { info!(target: LOG_TARGET, squad = &self.config.squad; "Listening on {address:?}"); @@ -1203,6 +1198,7 @@ where S: ShareChain if !endpoint.is_dialer() { warn!(target: LOG_TARGET, squad = &self.config.squad; "Connection closed: {peer_id:?} -> {endpoint:?} ({num_established:?}) -> {cause:?}"); } + warn!(target: LOG_TARGET, squad = &self.config.squad; "Connection closed: {peer_id:?} -> {endpoint:?} ({num_established:?}) -> {cause:?}"); }, SwarmEvent::IncomingConnectionError { connection_id, @@ -1210,7 +1206,7 @@ where S: ShareChain send_back_addr, error, } => { - info!(target: LOG_TARGET, squad = &self.config.squad; "Incoming connection error: {connection_id:?} -> {local_addr:?} -> {send_back_addr:?} -> {error:?}"); + info!(target: LOG_TARGET, "Incoming connection error: {connection_id:?} -> {local_addr:?} -> {send_back_addr:?} -> {error:?}"); }, SwarmEvent::ListenerError { listener_id, error } => { error!(target: LOG_TARGET, squad = &self.config.squad; "Listener error: {listener_id:?} -> {error:?}"); @@ -1227,7 +1223,7 @@ where S: ShareChain match error { DialError::Transport(transport_error) => { // There are a lot of cancelled errors, so ignore them - debug!(target: LOG_TARGET, "Outgoing connection error, ignoring: {peer_id:?} -> {transport_error:?}"); + warn!(target: LOG_TARGET, "Outgoing connection error, ignoring: {peer_id:?} -> {transport_error:?}"); }, _ => { warn!(target: LOG_TARGET, squad = &self.config.squad; "Outgoing connection error: {peer_id:?} -> {error:?}"); @@ -1966,15 +1962,15 @@ where S: ShareChain addresses.truncate(8); // Try dial, this should already be happening though - if let Err(err) = self.swarm.dial( - DialOpts::peer_id(relay.peer_id) - .addresses(relay.addresses.clone()) - // .condition(PeerCondition::NotDialing) - .build(), - ) { - debug!(target: LOG_TARGET, "🚨 Failed to dial relay: {}", err); - // return; - } + // if let Err(err) = self.swarm.dial( + // DialOpts::peer_id(relay.peer_id) + // .addresses(relay.addresses.clone()) + // // .condition(PeerCondition::NotDialing) + // .build(), + // ) { + // debug!(target: LOG_TARGET, "🚨 Failed to dial relay: {}", err); + // // return; + // } addresses.iter().for_each(|addr| { let listen_addr = addr.clone().with(Protocol::P2pCircuit); @@ -2172,7 +2168,7 @@ where S: ShareChain let mut connection_stats_publish = tokio::time::interval(Duration::from_secs(10)); connection_stats_publish.set_missed_tick_behavior(MissedTickBehavior::Skip); - let mut seek_connections_interval = tokio::time::interval(Duration::from_secs(5)); + let mut seek_connections_interval = tokio::time::interval(Duration::from_secs(20)); seek_connections_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); let mut debug_chain_graph = if self.config.debug_print_chain { @@ -2191,6 +2187,7 @@ where S: ShareChain tokio::pin!(connection_stats_publish); tokio::pin!(seek_connections_interval); + let uptime = Instant::now(); loop { // info!(target: LOG_TARGET, "P2P service main loop iter"); select! { @@ -2220,11 +2217,13 @@ where S: ShareChain let info = self.swarm.network_info(); let counters = info.connection_counters(); - let num_connections = counters.num_established_incoming() + counters.num_established_outgoing(); - if num_connections > 20 { + // let num_connections = counters.num_established_incoming() + counters.num_established_outgoing(); + let num_connections = counters.num_established_outgoing(); + if num_connections > 8 { continue; } - if num_connections == 0 { + if num_connections == 0 && uptime.elapsed() < Duration::from_secs(60) { + match self.dial_seed_peers().await { Ok(_) => {}, Err(e) => { @@ -2236,26 +2235,44 @@ where S: ShareChain let mut num_dialed = 0; let mut store_write_lock = self.network_peer_store.write().await; - // Rather try and search good peers rather than randomly dialing - // 1000 peers will take a long time to get through + + let mut peers_to_dial = vec![]; for record in store_write_lock.best_peers_to_dial(100) { - debug!(target: LOG_TARGET, "Dialing peer: {:?} with height(rx/sha) {}/{}", record.peer_id, record.peer_info.current_random_x_height, record.peer_info.current_sha3x_height); // dbg!(&record.peer_id); // Only dial seed peers if we have 0 connections if !self.swarm.is_connected(&record.peer_id) && !store_write_lock.is_seed_peer(&record.peer_id) { + // if &record.peer_id.to_string() != "12D3KooWD6GY3c8cz6AwKaDaqmqGCbmewhjKT5ULN9JUB5oUgWjS" { + // store_write_lock.update_last_dial_attempt(&record.peer_id); + // info!(target: LOG_TARGET, "Skipping dialing peer: {:?} with height(rx/sha) {}/{} on {}", record.peer_id, record.peer_info.current_random_x_height, record.peer_info.current_sha3x_height, record.peer_info.public_addresses().iter().map(|a| a.to_string()).collect::>().join(", ")); + // continue; + // } store_write_lock.update_last_dial_attempt(&record.peer_id); - let dial_opts= DialOpts::peer_id(record.peer_id).condition(PeerCondition::Always).addresses(record.peer_info.public_addresses().clone()).extend_addresses_through_behaviour().build(); - let _unused = self.swarm.dial(dial_opts); + info!(target: LOG_TARGET, "Dialing peer: {:?} with height(rx/sha) {}/{} on {}", record.peer_id, record.peer_info.current_random_x_height, record.peer_info.current_sha3x_height, record.peer_info.public_addresses().iter().map(|a| a.to_string()).collect::>().join(", ")); + let dial_opts= DialOpts::peer_id(record.peer_id).addresses(record.peer_info.public_addresses().clone()).extend_addresses_through_behaviour().build(); + // let dial_opts= DialOpts::peer_id(record.peer_id).addresses(vec!["/ip4/152.228.210.16/tcp/19001/p2p/12D3KooWD6GY3c8cz6AwKaDaqmqGCbmewhjKT5ULN9JUB5oUgWjS".parse().unwrap(), "/ip4/152.228.210.16/udp/19001/quic-v1/p2p/12D3KooWD6GY3c8cz6AwKaDaqmqGCbmewhjKT5ULN9JUB5oUgWjS".parse().unwrap()]).build(); + // let dial_opts = DialOpts::unknown_peer_id().address("/ip4/152.228.210.16/tcp/19001/p2p/12D3KooWD6GY3c8cz6AwKaDaqmqGCbmewhjKT5ULN9JUB5oUgWjS".parse().unwrap()).build(); + let _unused = self.swarm.dial(dial_opts).map_err(|e| { + warn!(target: LOG_TARGET, "Failed to dial peer: {e:?}"); + }); + // self.initiate_direct_peer_exchange(&record.peer_id).await; + peers_to_dial.push(record.peer_id); num_dialed += 1; // We can only do 30 connections // after 30 it starts cancelling dials - if num_dialed > 80 { + if num_dialed > 10 { break; } } + + } - } + drop(store_write_lock); + // for peer in peers_to_dial { + // dbg!("trying"); + // self.initiate_direct_peer_exchange(&peer).await; + // } + } if timer.elapsed() > MAX_ACCEPTABLE_NETWORK_EVENT_TIMEOUT { warn!(target: LOG_TARGET, "Seeking connections took too long: {:?}", timer.elapsed()); } @@ -2514,12 +2531,9 @@ where S: ShareChain // self.swarm.behaviour_mut().kademlia.add_address(peer_id, addr.clone()); self.swarm.add_peer_address(*peer_id, addr.clone()); peers_to_add.push(*peer_id); - let _unused = self - .swarm - .dial(DialOpts::peer_id(*peer_id).condition(PeerCondition::Always).build()) - .inspect_err(|e| { - warn!(target: LOG_TARGET, squad = &self.config.squad; "Failed to dial seed peer: {e:?}"); - }); + let _unused = self.swarm.dial(DialOpts::peer_id(*peer_id).build()).inspect_err(|e| { + warn!(target: LOG_TARGET, squad = &self.config.squad; "Failed to dial seed peer: {e:?}"); + }); }); self.network_peer_store.write().await.add_seed_peers(peers_to_add); diff --git a/src/server/p2p/peer_store.rs b/src/server/p2p/peer_store.rs index e3d84670..7b30d410 100644 --- a/src/server/p2p/peer_store.rs +++ b/src/server/p2p/peer_store.rs @@ -229,7 +229,7 @@ impl PeerStore { let mut peers = self.whitelist_peers.values().collect::>(); peers.retain(|peer| { !peer.peer_info.public_addresses().is_empty() && - (peer.last_dial_attempt.is_none() || peer.last_dial_attempt.unwrap().elapsed().as_secs() > 60) + (peer.last_dial_attempt.is_none() || peer.last_dial_attempt.unwrap().elapsed().as_secs() > 120) }); peers.sort_by(|a, b| { b.num_grey_listings diff --git a/src/server/p2p/relay_store.rs b/src/server/p2p/relay_store.rs index 77514055..2f50310b 100644 --- a/src/server/p2p/relay_store.rs +++ b/src/server/p2p/relay_store.rs @@ -22,11 +22,11 @@ impl RelayStore { } pub fn select_random_relay(&mut self) { - let Some((peer, addrs)) = self.possible_relays.iter().choose(&mut rand::thread_rng()) else { + let Some((_peer, addrs)) = self.possible_relays.iter().choose(&mut rand::thread_rng()) else { return; }; self.selected_relay = Some(RelayPeer { - peer_id: *peer, + // peer_id: *peer, addresses: addrs.iter().cloned().collect(), is_circuit_established: false, }); @@ -35,7 +35,7 @@ impl RelayStore { #[derive(Debug, Clone)] pub struct RelayPeer { - pub peer_id: PeerId, + // pub peer_id: PeerId, pub addresses: Vec, pub is_circuit_established: bool, } diff --git a/src/server/p2p/setup.rs b/src/server/p2p/setup.rs index 950112d1..4b5bb37b 100644 --- a/src/server/p2p/setup.rs +++ b/src/server/p2p/setup.rs @@ -81,8 +81,7 @@ pub(crate) async fn new_swarm(config: &config::Config) -> Result P2Chain { let mut new_chain = Self::new_empty(algo, total_size, share_window, block_time, new_block_cache); for block in from_block_cache.all_blocks()? { info!(target: LOG_TARGET, "Loading block {}({:x}{:x}{:x}{:x}) into chain", block.height, block.hash[0], block.hash[1], block.hash[2], block.hash[3]); - new_chain.add_block_to_chain(block)?; + let _ = new_chain.add_block_to_chain(block).inspect_err(|e| { + error!(target: LOG_TARGET, "Failed to load block into chain: {}", e); + }); } Ok(new_chain) } @@ -466,12 +468,13 @@ impl P2Chain { let nextblock = parent_level.get_header(¤t_block.prev_hash); if nextblock.is_none() { error!(target: LOG_TARGET, "FATAL: Reorging (block in chain) failed because parent block was not found and chain data is corrupted."); - panic!( - "FATAL: Reorging (block in chain) failed because parent block was not found and chain \ - data is corrupted. current_block: {:?}, current tip: {:?}", - current_block.height, - self.get_tip().map(|t| t.height()) - ); + return Err(ShareChainError::BlockNotFound); + // panic!( + // "FATAL: Reorging (block in chain) failed because parent block was not found and chain + // \ data is corrupted. current_block: {:?}, current tip: + // {:?}", current_block.height, + // self.get_tip().map(|t| t.height()) + // ); } // fix the main chain let mut_parent_level = self.level_at_height(current_block.height.saturating_sub(1)).unwrap();