feat: add blocklist clearing (#153)
Adds a scheduled job to clear out the block list.
Also no longer grey-lists a peer when the failure is a closed connection.
stringhandler authored Nov 13, 2024
1 parent be051db commit a3a600d
Showing 2 changed files with 62 additions and 20 deletions.
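For orientation before the diffs: the new job reuses the pattern of the existing grey-list job, a tokio interval polled from the node's main select loop. A minimal free-standing sketch of that pattern (hypothetical function; the real code runs inside the swarm event loop in network.rs below):

use tokio::time::{interval, Duration, Instant, MissedTickBehavior};

// Sketch of the scheduled-clearing pattern added in this commit.
// `clear` stands in for PeerStore::clear_black_list (see peer_store.rs below).
async fn run_clear_job(every: Duration, mut clear: impl FnMut()) {
    let mut tick = interval(every);
    // Skip missed ticks instead of firing in a burst to catch up, as in the diff.
    tick.set_missed_tick_behavior(MissedTickBehavior::Skip);
    loop {
        tick.tick().await;
        let timer = Instant::now();
        clear();
        // The real loop warns when clearing exceeds MAX_ACCEPTABLE_NETWORK_EVENT_TIMEOUT;
        // a one-second stand-in is used here.
        if timer.elapsed() > Duration::from_secs(1) {
            eprintln!("Clearing black list took too long: {:?}", timer.elapsed());
        }
    }
}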
53 changes: 41 additions & 12 deletions src/server/p2p/network.rs
@@ -176,16 +176,15 @@ pub struct Config {
pub squad: Squad,
pub user_agent: String,
pub grey_list_clear_interval: Duration,
pub black_list_clear_interval: Duration,
pub sync_interval: Duration,
pub is_seed_peer: bool,
pub debug_print_chain: bool,
pub num_peers_to_sync: usize,
pub max_blocks_to_request: usize,
pub sync_job_enabled: bool,
pub peer_list_folder: PathBuf,
pub sha3x_enabled: bool,
pub randomx_enabled: bool,
pub event_delay_before_start: u32,
}

impl Default for Config {
@@ -203,15 +202,14 @@ impl Default for Config {
squad: Squad::from("default".to_string()),
user_agent: "tari-p2pool".to_string(),
grey_list_clear_interval: Duration::from_secs(60),
black_list_clear_interval: Duration::from_secs(60 * 10),
sync_interval: Duration::from_secs(10),
is_seed_peer: false,
debug_print_chain: false,
num_peers_to_sync: 2,
max_blocks_to_request: 20,
sync_job_enabled: true,
sha3x_enabled: true,
randomx_enabled: true,
event_delay_before_start: 10,
}
}
}
@@ -1135,10 +1133,25 @@ where S: ShareChain
},
request_response::Event::OutboundFailure { peer, error, .. } => {
debug!(target: LOG_TARGET, squad = &self.config.squad; "REQ-RES outbound failure: {peer:?} -> {error:?}");
// Unlock the permit
self.network_peer_store
.move_to_grey_list(peer, format!("Error during share chain sync:{}", error))
.await;
let mut should_grey_list = true;
match error {
OutboundFailure::DialFailure => {
debug!(target: SYNC_REQUEST_LOG_TARGET, "Share chain sync request failed: {peer:?} -> {error:?}");
},
OutboundFailure::ConnectionClosed => {
// It might upgrade to a DCUtR connection, so no need to grey list
debug!(target: SYNC_REQUEST_LOG_TARGET, "Share chain sync request failed: {peer:?} -> {error:?}");
should_grey_list = false;
},
_ => {
warn!(target: SYNC_REQUEST_LOG_TARGET, "Share chain sync request failed: {peer:?} -> {error:?}");
},
}
if should_grey_list {
self.network_peer_store
.move_to_grey_list(peer, format!("Error during share chain sync:{}", error))
.await;
}

// Remove peer from peer store to try to sync from another peer,
// if the peer goes online/accessible again, the peer store will have it again.
@@ -1718,6 +1731,8 @@ where S: ShareChain
let mut grey_list_clear_interval = tokio::time::interval(self.config.grey_list_clear_interval);
grey_list_clear_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

let mut black_list_clear_interval = tokio::time::interval(self.config.black_list_clear_interval);
black_list_clear_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
let mut sync_interval = tokio::time::interval(self.config.sync_interval);
sync_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

@@ -1838,6 +1853,13 @@ where S: ShareChain
warn!(target: LOG_TARGET, "Clearing grey list took too long: {:?}", timer.elapsed());
}
},
_ = black_list_clear_interval.tick() => {
let timer = Instant::now();
self.network_peer_store.clear_black_list();
if timer.elapsed() > MAX_ACCEPTABLE_NETWORK_EVENT_TIMEOUT {
warn!(target: LOG_TARGET, "Clearing black list took too long: {:?}", timer.elapsed());
}
},
_ = connection_stats_publish.tick() => {
let timer = Instant::now();
let connection_info = self.get_libp2p_connection_info();
@@ -2085,9 +2107,16 @@ where S: ShareChain
// self.swarm.behaviour_mut().kademlia.add_address(peer_id, addr.clone());
self.swarm.add_peer_address(peer_id.clone(), addr.clone());
self.network_peer_store.add_seed_peer(peer_id.clone());
let _ = self.swarm.dial(peer_id.clone()).inspect_err(|e| {
warn!(target: LOG_TARGET, squad = &self.config.squad; "Failed to dial seed peer: {e:?}");
});
let _ = self
.swarm
.dial(
DialOpts::peer_id(peer_id.clone())
.condition(PeerCondition::Always)
.build(),
)
.inspect_err(|e| {
warn!(target: LOG_TARGET, squad = &self.config.squad; "Failed to dial seed peer: {e:?}");
});
});

// if !seed_peers.is_empty() {
@@ -2134,10 +2163,10 @@ where S: ShareChain
external_addr.parse()?,
);
}
self.subscribe_to_topics().await;

let seed_peers = self.parse_seed_peers().await?;
self.join_seed_peers(seed_peers).await?;
self.subscribe_to_topics().await;

// start initial share chain sync
// let in_progress = self.sync_in_progress.clone();
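A note on the seed-peer dialing change above: a bare swarm.dial(peer_id) applies libp2p's default dial condition, which skips the dial when the peer is already connected or a dial is in flight (DisconnectedAndNotDialing in recent libp2p releases), whereas PeerCondition::Always forces the attempt. A sketch of the new form, assuming libp2p's swarm::dial_opts module:

use libp2p::swarm::dial_opts::{DialOpts, PeerCondition};
use libp2p::PeerId;

// Always attempt the dial, even if the peer looks connected or a dial is
// already in progress, which suits seed peers at startup.
fn seed_dial_opts(peer_id: PeerId) -> DialOpts {
    DialOpts::peer_id(peer_id)
        .condition(PeerCondition::Always)
        .build()
}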
29 changes: 21 additions & 8 deletions src/server/p2p/peer_store.rs
@@ -94,7 +94,7 @@ pub enum AddPeerStatus {
pub struct PeerStore {
whitelist_peers: HashMap<String, PeerStoreRecord>,
greylist_peers: HashMap<String, PeerStoreRecord>,
blacklist_peers: HashSet<String>,
blacklist_peers: HashMap<String, PeerStoreRecord>,
stats_broadcast_client: StatsBroadcastClient,
seed_peers: Vec<PeerId>,
}
@@ -106,7 +106,7 @@ impl PeerStore {
stats_broadcast_client,
whitelist_peers: HashMap::new(),
greylist_peers: HashMap::new(),
blacklist_peers: HashSet::new(),
blacklist_peers: HashMap::new(),
seed_peers: Vec::new(),
}
}
@@ -140,7 +140,7 @@ impl PeerStore {
pub fn exists(&self, peer_id: &PeerId) -> bool {
self.whitelist_peers.contains_key(&peer_id.to_base58()) ||
self.greylist_peers.contains_key(&peer_id.to_base58()) ||
self.blacklist_peers.contains(&peer_id.to_base58())
self.blacklist_peers.contains_key(&peer_id.to_base58())
}

pub fn whitelist_peers(&self) -> &HashMap<String, PeerStoreRecord> {
@@ -236,7 +236,7 @@ impl PeerStore {
self.greylist_peers.insert(peer_id.to_base58(), peer_record);
return AddPeerStatus::Greylisted;
}
if self.blacklist_peers.contains(&peer_id.to_base58()) {
if self.blacklist_peers.contains_key(&peer_id.to_base58()) {
return AddPeerStatus::Blacklisted;
}

@@ -308,10 +308,10 @@ impl PeerStore {
}

pub fn clear_grey_list(&mut self) {
for (peer_id, record) in self.greylist_peers.iter() {
for (peer_id, record) in self.greylist_peers.drain() {
if record.num_grey_listings >= MAX_GREY_LISTINGS {
warn!(target: LOG_TARGET, "Blacklisting peer {} because of: {}", peer_id, record.last_grey_list_reason.as_ref().unwrap_or(&"unknown".to_string()));
self.blacklist_peers.insert(peer_id.clone());
self.blacklist_peers.insert(peer_id.clone(), record.clone());
} else {
if self.seed_peers.contains(&record.peer_id) {
// Don't put seed peers in the whitelist
@@ -320,7 +320,7 @@ impl PeerStore {
self.whitelist_peers.insert(peer_id.clone(), record.clone());
}
}
self.greylist_peers.clear();
let _ = self.stats_broadcast_client.send_new_peer(
self.whitelist_peers.len() as u64,
self.greylist_peers.len() as u64,
self.blacklist_peers.len() as u64,
);
}

pub fn clear_black_list(&mut self) {
for (peer_id, mut record) in self.blacklist_peers.drain() {
record.catch_up_attempts = 0;
record.last_grey_list_reason = None;
record.num_grey_listings = 0;
self.whitelist_peers.insert(peer_id, record);
}
let _ = self.stats_broadcast_client.send_new_peer(
self.whitelist_peers.len() as u64,
self.greylist_peers.len() as u64,
@@ -389,7 +402,7 @@ impl PeerStore {
}

pub fn is_blacklisted(&self, peer_id: &PeerId) -> bool {
self.blacklist_peers.contains(&peer_id.to_base58())
self.blacklist_peers.contains_key(&peer_id.to_base58())
}

pub fn is_whitelisted(&self, peer_id: &PeerId) -> bool {
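Taken together, the peer_store.rs changes give each peer a simple lifecycle: peers grey-listed too many times are promoted to the block list (now a HashMap so their records survive), and the new clear_black_list periodically resets those records and returns the peers to the whitelist. A condensed sketch of that state machine, using a hypothetical MAX_GREY_LISTINGS value and only the fields the diff touches:

use std::collections::HashMap;

const MAX_GREY_LISTINGS: u16 = 5; // hypothetical; the real constant lives in peer_store.rs

#[derive(Clone)]
struct PeerStoreRecord {
    num_grey_listings: u16,
    last_grey_list_reason: Option<String>,
    catch_up_attempts: u64,
}

#[derive(Default)]
struct PeerLists {
    whitelist: HashMap<String, PeerStoreRecord>,
    greylist: HashMap<String, PeerStoreRecord>,
    blacklist: HashMap<String, PeerStoreRecord>,
}

impl PeerLists {
    // Mirrors clear_grey_list: repeat offenders move to the black list,
    // everyone else returns to the whitelist (the real code also skips
    // seed peers here).
    fn clear_grey_list(&mut self) {
        for (peer_id, record) in self.greylist.drain() {
            if record.num_grey_listings >= MAX_GREY_LISTINGS {
                self.blacklist.insert(peer_id, record);
            } else {
                self.whitelist.insert(peer_id, record);
            }
        }
    }

    // Mirrors clear_black_list: wipe the offence counters and give the
    // peer a fresh start on the whitelist.
    fn clear_black_list(&mut self) {
        for (peer_id, mut record) in self.blacklist.drain() {
            record.catch_up_attempts = 0;
            record.last_grey_list_reason = None;
            record.num_grey_listings = 0;
            self.whitelist.insert(peer_id, record);
        }
    }
}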
