Skip to content

Commit 463a682

Browse files
fix: ignore old peer messages (#132)
1 parent 4b3cbd7 commit 463a682

File tree

3 files changed

+149
-50
lines changed

3 files changed

+149
-50
lines changed

log4rs_sample.yml

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,38 @@ appenders:
119119
encoder:
120120
pattern: "{d(%Y-%m-%d %H:%M:%S.%f)} {l:5} {m} // {f}:{L}{n}"
121121

122+
peer_info:
123+
kind: rolling_file
124+
path: "{{log_dir}}/log/peer_info.log"
125+
policy:
126+
kind: compound
127+
trigger:
128+
kind: size
129+
limit: 10mb
130+
roller:
131+
kind: fixed_window
132+
base: 1
133+
count: 5
134+
pattern: "{{log_dir}}/log/peer_info.{}.log"
135+
encoder:
136+
pattern: "{d(%Y-%m-%d %H:%M:%S.%f)} {l:5} {m} // {f}:{L}{n}"
137+
138+
new_tip_notify:
139+
kind: rolling_file
140+
path: "{{log_dir}}/log/new_tip_notify.log"
141+
policy:
142+
kind: compound
143+
trigger:
144+
kind: size
145+
limit: 10mb
146+
roller:
147+
kind: fixed_window
148+
base: 1
149+
count: 5
150+
pattern: "{{log_dir}}/log/new_tip_notify.{}.log"
151+
encoder:
152+
pattern: "{d(%Y-%m-%d %H:%M:%S.%f)} {l:5} {m} // {f}:{L}{n}"
153+
122154
# Set the default logging level to "warn" and attach the "stdout" appender to the root
123155
root:
124156
level: info
@@ -145,6 +177,16 @@ loggers:
145177
- stdout
146178
- peers
147179
additive: false
180+
tari::p2pool::topics::peer_info:
181+
level: debug
182+
appenders:
183+
- peer_info
184+
additive: false
185+
tari::p2pool::topics::new_tip_notify:
186+
level: debug
187+
appenders:
188+
- new_tip_notify
189+
additive: false
148190
tari::p2pool::server::p2p:
149191
level: info
150192
appenders:

src/server/p2p/messages.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ where T: Serialize {
4747
#[derive(Serialize, Deserialize, Debug, Clone)]
4848
pub struct PeerInfo {
4949
pub version: u64,
50+
pub peer_id: Option<PeerId>,
5051
pub current_sha3x_height: u64,
5152
pub current_random_x_height: u64,
5253
pub current_sha3x_pow: u128,
@@ -60,6 +61,7 @@ pub struct PeerInfo {
6061
impl_conversions!(PeerInfo);
6162
impl PeerInfo {
6263
pub fn new(
64+
peer_id: PeerId,
6365
current_sha3x_height: u64,
6466
current_random_x_height: u64,
6567
current_sha3x_pow: u128,
@@ -71,6 +73,7 @@ impl PeerInfo {
7173
let timestamp = EpochTime::now();
7274
Self {
7375
version: PROTOCOL_VERSION,
76+
peer_id: Some(peer_id),
7477
current_sha3x_height,
7578
current_random_x_height,
7679
current_sha3x_pow,

src/server/p2p/network.rs

Lines changed: 104 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -105,15 +105,16 @@ const CATCH_UP_SYNC_REQUEST_RESPONSE_PROTOCOL: &str = "/catch_up_sync/4";
105105
const LOG_TARGET: &str = "tari::p2pool::server::p2p";
106106
const SYNC_REQUEST_LOG_TARGET: &str = "sync_request";
107107
const MESSAGE_LOGGING_LOG_TARGET: &str = "tari::p2pool::message_logging";
108+
const PEER_INFO_LOGGING_LOG_TARGET: &str = "tari::p2pool::topics::peer_info";
109+
const NEW_TIP_NOTIFY_LOGGING_LOG_TARGET: &str = "tari::p2pool::topics::new_tip_notify";
108110
pub const STABLE_PRIVATE_KEY_FILE: &str = "p2pool_private.key";
109111

110112
const MAX_ACCEPTABLE_P2P_MESSAGE_TIMEOUT: Duration = Duration::from_millis(500);
111-
const SYNC_TIMEOUT: Duration = Duration::from_secs(60);
112113
const MAX_ACCEPTABLE_NETWORK_EVENT_TIMEOUT: Duration = Duration::from_millis(100);
113114
const CATCH_UP_SYNC_BLOCKS_IN_I_HAVE: usize = 100;
114115
const MAX_CATCH_UP_ATTEMPTS: usize = 150;
115116
// Time to start up and catch up before we start processing new tip messages
116-
const STARTUP_CATCH_UP_TIME: Duration = Duration::from_secs(120);
117+
const STARTUP_CATCH_UP_TIME: Duration = Duration::from_secs(1);
117118

118119
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
119120
pub struct Squad {
@@ -415,14 +416,14 @@ where S: ShareChain
415416
StreamProtocol::new(SHARE_CHAIN_SYNC_REQ_RESP_PROTOCOL),
416417
request_response::ProtocolSupport::Full,
417418
)],
418-
request_response::Config::default().with_request_timeout(Duration::from_secs(30)), // 10 is the default
419+
request_response::Config::default().with_request_timeout(Duration::from_secs(10)), // 10 is the default
419420
),
420421
direct_peer_exchange: cbor::Behaviour::<DirectPeerInfoRequest, DirectPeerInfoResponse>::new(
421422
[(
422423
StreamProtocol::new(DIRECT_PEER_EXCHANGE_REQ_RESP_PROTOCOL),
423424
request_response::ProtocolSupport::Full,
424425
)],
425-
request_response::Config::default().with_request_timeout(Duration::from_secs(30)), // 10 is the default
426+
request_response::Config::default().with_request_timeout(Duration::from_secs(10)), // 10 is the default
426427
),
427428
catch_up_sync: cbor::Behaviour::<CatchUpSyncRequest, CatchUpSyncResponse>::new(
428429
[(
@@ -499,6 +500,7 @@ where S: ShareChain
499500
let current_pow_sha3x = share_chain_sha3x.chain_pow().await.as_u128();
500501
let current_pow_random_x = share_chain_random_x.chain_pow().await.as_u128();
501502
let peer_info_squad_raw = PeerInfo::new(
503+
self.swarm.local_peer_id().clone(),
502504
current_height_sha3x,
503505
current_height_random_x,
504506
current_pow_sha3x,
@@ -596,47 +598,74 @@ where S: ShareChain
596598
#[allow(clippy::too_many_lines)]
597599
async fn handle_new_gossipsub_message(&mut self, message: Message) -> Result<MessageAcceptance, Error> {
598600
debug!(target: MESSAGE_LOGGING_LOG_TARGET, "New gossipsub message: {message:?}");
599-
let peer = message.source;
600-
if let Some(peer) = peer {
601+
let source_peer = message.source;
602+
if let Some(source_peer) = source_peer {
601603
let topic = message.topic.to_string();
602604
match topic {
603-
topic if topic == Self::network_topic(PEER_INFO_TOPIC) => match messages::PeerInfo::try_from(message) {
604-
Ok(payload) => {
605-
debug!(target: MESSAGE_LOGGING_LOG_TARGET, "[PEERINFO_TOPIC] New peer info: {peer:?} -> {payload:?}");
606-
debug!(target: LOG_TARGET, squad = &self.config.squad; "[NETWORK] New peer info: {peer:?} -> {payload:?}");
607-
if payload.version != PROTOCOL_VERSION {
608-
trace!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} has an outdated version, skipping", peer);
609-
return Ok(MessageAcceptance::Reject);
610-
}
611-
if payload.squad != self.config.squad.as_string() {
612-
debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} is not in the same squad, skipping. Our squad: {}, their squad:{}", peer, self.config.squad, payload.squad);
613-
return Ok(MessageAcceptance::Ignore);
614-
}
615-
if !self.config.is_seed_peer {
616-
if self.add_peer(payload, peer.clone()).await {
617-
self.swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer);
618-
}
619-
}
620-
return Ok(MessageAcceptance::Accept);
621-
},
622-
Err(error) => {
623-
debug!(target: LOG_TARGET, squad = &self.config.squad; "Can't deserialize peer info payload: {:?}", error);
624-
return Ok(MessageAcceptance::Reject);
625-
},
626-
},
605+
// topic if topic == Self::network_topic(PEER_INFO_TOPIC) => match messages::PeerInfo::try_from(message)
606+
// { Ok(payload) => {
607+
// debug!(target: PEER_INFO_LOGGING_LOG_TARGET, "[PEERINFO_TOPIC] New peer info: {source_peer:?}
608+
// -> {payload:?}"); debug!(target: LOG_TARGET, squad = &self.config.squad;
609+
// "[NETWORK] New peer info: {source_peer:?} -> {payload:?}"); if
610+
// payload.version != PROTOCOL_VERSION { trace!(target: LOG_TARGET, squad =
611+
// &self.config.squad; "Peer {} has an outdated version, skipping", source_peer);
612+
// return Ok(MessageAcceptance::Reject);
613+
// }
614+
// if payload.squad != self.config.squad.as_string() {
615+
// debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} is not in the same squad,
616+
// skipping. Our squad: {}, their squad:{}", source_peer, self.config.squad, payload.squad);
617+
// return Ok(MessageAcceptance::Ignore);
618+
// }
619+
620+
// if payload.peer_id.as_ref() == Some(self.swarm.local_peer_id()) {
621+
// return Ok(MessageAcceptance::Ignore);
622+
// }
623+
624+
// if payload.peer_id != Some(source_peer) {
625+
// warn!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent a peer info message
626+
// with a different peer id: {}, skipping", source_peer, payload.peer_id.as_ref().map(|p|
627+
// p.to_string()).unwrap_or("None".to_string())); // return
628+
// Ok(MessageAcceptance::Ignore); }
629+
// if !self.config.is_seed_peer {
630+
// if self.add_peer(payload, source_peer.clone()).await {
631+
// self.swarm.behaviour_mut().gossipsub.add_explicit_peer(&source_peer);
632+
// }
633+
// }
634+
// return Ok(MessageAcceptance::Accept);
635+
// },
636+
// Err(error) => {
637+
// debug!(target: LOG_TARGET, squad = &self.config.squad; "Can't deserialize peer info payload:
638+
// {:?}", error); return Ok(MessageAcceptance::Reject);
639+
// },
640+
// },
627641
topic if topic == Self::squad_topic(&self.config.squad, PEER_INFO_TOPIC) => {
628642
match messages::PeerInfo::try_from(message) {
629643
Ok(payload) => {
630-
debug!(target: MESSAGE_LOGGING_LOG_TARGET, "[SQUAD_PEERINFO_TOPIC] New peer info: {peer:?} -> {payload:?}");
631-
632-
debug!(target: LOG_TARGET, squad = &self.config.squad; "[squad] New peer info: {peer:?} -> {payload:?}");
644+
debug!(target: LOG_TARGET, squad = &self.config.squad; "[squad] New peer info: {source_peer:?} -> {payload:?}");
633645
if payload.version != PROTOCOL_VERSION {
634-
debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} has an outdated version, skipping", peer);
646+
debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} has an outdated version, skipping", source_peer);
635647
return Ok(MessageAcceptance::Reject);
636648
}
649+
650+
// 60 seconds. TODO: make config
651+
if payload.timestamp < EpochTime::now().as_u64().saturating_sub(60) {
652+
debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent a peer info message that is too old, skipping", source_peer);
653+
// TODO: should be punish
654+
return Ok(MessageAcceptance::Ignore);
655+
}
656+
if payload.peer_id.as_ref() == Some(self.swarm.local_peer_id()) {
657+
return Ok(MessageAcceptance::Ignore);
658+
}
659+
660+
if payload.peer_id != Some(source_peer) {
661+
warn!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent a peer info message with a different peer id: {}, skipping", source_peer, payload.peer_id.as_ref().map(|p| p.to_string()).unwrap_or("None".to_string()));
662+
// return Ok(MessageAcceptance::Ignore);
663+
}
664+
debug!(target: PEER_INFO_LOGGING_LOG_TARGET, "[SQUAD_PEERINFO_TOPIC] New peer info: {source_peer:?} -> {payload:?}");
665+
637666
if !self.config.is_seed_peer {
638-
if self.add_peer(payload, peer.clone()).await {
639-
self.swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer);
667+
if self.add_peer(payload, source_peer.clone()).await {
668+
self.swarm.behaviour_mut().gossipsub.add_explicit_peer(&source_peer);
640669
}
641670
}
642671
return Ok(MessageAcceptance::Accept);
@@ -651,33 +680,35 @@ where S: ShareChain
651680
// TODO: (sender peer's wallet address should be included always in the conibases with a fixed percent
652681
// (like 20%))
653682
topic if topic == Self::squad_topic(&self.config.squad, BLOCK_NOTIFY_TOPIC) => {
654-
debug!(target: MESSAGE_LOGGING_LOG_TARGET, "[SQUAD_NEW_BLOCK_TOPIC] New block from gossip: {peer:?}");
655-
656683
// if self.sync_in_progress.load(Ordering::SeqCst) {
657684
// return;
658685
// }
659686
match NotifyNewTipBlock::try_from(message) {
660687
Ok(payload) => {
661688
if payload.version != PROTOCOL_VERSION {
662-
debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} has an outdated version, skipping", peer);
689+
debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} has an outdated version, skipping", source_peer);
663690
return Ok(MessageAcceptance::Reject);
664691
}
665692
// lets check age
666-
if payload.timestamp < EpochTime::now().as_u64().saturating_sub(60) {
667-
debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent a notify message that is too old, skipping", peer);
693+
if payload.timestamp < EpochTime::now().as_u64().saturating_sub(10) {
694+
debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent a notify message that is too old, skipping", source_peer);
668695
return Ok(MessageAcceptance::Ignore);
669696
}
670697
let payload = Arc::new(payload);
671-
debug!(target: MESSAGE_LOGGING_LOG_TARGET, "[SQUAD_NEW_BLOCK_TOPIC] New block from gossip: {peer:?} -> {payload:?}");
672-
let source_peer = payload.peer_id();
698+
let message_peer = payload.peer_id();
699+
if message_peer.to_string() != source_peer.to_string() {
700+
warn!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent a block with a different peer id: {}, skipping", source_peer, message_peer);
701+
}
702+
debug!(target: NEW_TIP_NOTIFY_LOGGING_LOG_TARGET, "[SQUAD_NEW_BLOCK_TOPIC] New block from gossip: {source_peer:?} -> {payload:?}");
703+
673704
// If we don't have this peer, try do peer exchange
674-
if !self.network_peer_store.exists(source_peer) {
675-
self.initiate_direct_peer_exchange(source_peer).await;
705+
if !self.network_peer_store.exists(message_peer) {
706+
self.initiate_direct_peer_exchange(message_peer).await;
676707
}
677708

678709
// verify payload
679710
if payload.new_blocks.is_empty() {
680-
warn!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent notify new tip with no blocks.", source_peer);
711+
warn!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent notify new tip with no blocks.", message_peer);
681712
return Ok(MessageAcceptance::Reject);
682713
}
683714

@@ -694,7 +725,14 @@ where S: ShareChain
694725
payload.new_blocks.iter().map(|(h, _)| *h).max().unwrap_or(0) <=
695726
our_tip.saturating_sub(4)
696727
{
697-
debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent a block that is not better than ours, skipping", source_peer);
728+
debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent a block that is not better than ours, skipping", message_peer);
729+
return Ok(MessageAcceptance::Ignore);
730+
}
731+
732+
// if payload.
733+
if payload.new_blocks.iter().map(|(h, _)| *h).max().unwrap_or(0) > our_tip.saturating_add(2)
734+
{
735+
debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} sent a block that is too far ahead, skipping", message_peer);
698736
return Ok(MessageAcceptance::Ignore);
699737
}
700738

@@ -706,7 +744,7 @@ where S: ShareChain
706744
missing_blocks.push(block.clone());
707745
}
708746
if !missing_blocks.is_empty() {
709-
self.sync_share_chain(algo, source_peer, missing_blocks, true).await;
747+
self.sync_share_chain(algo, message_peer, missing_blocks, true).await;
710748
}
711749
return Ok(MessageAcceptance::Accept);
712750
},
@@ -715,7 +753,7 @@ where S: ShareChain
715753
debug!(target: LOG_TARGET, squad = &self.config.squad; "Can't deserialize broadcast block payload: {:?}", error);
716754
self.network_peer_store
717755
.move_to_grey_list(
718-
peer,
756+
source_peer.clone(),
719757
format!("Node sent a block that could not be deserialized: {:?}", error),
720758
)
721759
.await;
@@ -773,6 +811,10 @@ where S: ShareChain
773811
})
774812
{
775813
let local_peer_id = self.swarm.local_peer_id().clone();
814+
if peer == &local_peer_id {
815+
return;
816+
}
817+
776818
// TODO: Should we send them our details? The problem is that if we send too many of these, libp2p
777819
// starts dropping requests with "libp2p_relay::priv_client::handler Dropping in-flight connect
778820
// request because we are at capacity"
@@ -1687,14 +1729,26 @@ where S: ShareChain
16871729

16881730
// dbg!(&missing_blocks);
16891731

1690-
let best_peers = self
1732+
let mut best_peers = self
16911733
.network_peer_store
16921734
.best_peers_to_sync(self.config.num_peers_to_sync, *algo);
16931735
let our_pow = match algo {
16941736
PowAlgorithm::RandomX => self.share_chain_random_x.chain_pow().await,
16951737
PowAlgorithm::Sha3x => self.share_chain_sha3x.chain_pow().await,
16961738
};
16971739

1740+
let min_catch_up_difficulty = 1000;
1741+
best_peers.retain(|x| match algo {
1742+
PowAlgorithm::RandomX => {
1743+
let their_pow = x.peer_info.current_random_x_pow;
1744+
their_pow > min_catch_up_difficulty
1745+
},
1746+
PowAlgorithm::Sha3x => {
1747+
let their_pow = x.peer_info.current_sha3x_pow;
1748+
their_pow > min_catch_up_difficulty
1749+
},
1750+
});
1751+
16981752
// info!(target: LOG_TARGET, squad = &self.config.squad; "Best peers to sync: {best_peers:?}");
16991753
if best_peers.is_empty() {
17001754
info!(target: LOG_TARGET, squad = &self.config.squad; "No peers found to try and sync to");

0 commit comments

Comments
 (0)