From cc26e2b75ebc03bfc7871980889f5a95902fcc7e Mon Sep 17 00:00:00 2001 From: Stefan Neamtu Date: Sat, 4 Jan 2025 12:32:37 +0200 Subject: [PATCH] . --- .../src/peer_manager/network_state/mod.rs | 92 +++++++++++++++++++ .../src/peer_manager/peer_manager_actor.rs | 8 +- 2 files changed, 98 insertions(+), 2 deletions(-) diff --git a/chain/network/src/peer_manager/network_state/mod.rs b/chain/network/src/peer_manager/network_state/mod.rs index 90370ff2816..cda45d4d4b9 100644 --- a/chain/network/src/peer_manager/network_state/mod.rs +++ b/chain/network/src/peer_manager/network_state/mod.rs @@ -35,6 +35,7 @@ use crate::types::{ ChainInfo, PeerManagerSenderForNetwork, PeerType, ReasonForBan, StatePartRequestBody, Tier3Request, Tier3RequestBody, }; +use actix::ArbiterHandle; use anyhow::Context; use arc_swap::ArcSwap; use near_async::messaging::{CanSend, SendAsync, Sender}; @@ -698,6 +699,97 @@ impl NetworkState { success } + /// Send message to specific account. + /// Return whether the message is sent or not. + /// The message might be sent over TIER1 or TIER2 connection depending on the message type. + pub fn send_message_to_account_with_arbiter( + self: &Arc, + clock: &time::Clock, + account_id: &AccountId, + msg: RoutedMessageBody, + arbiter: ArbiterHandle, + ) -> bool { + // If the message is allowed to be sent to self, we handle it directly. + if self.config.validator.account_id().is_some_and(|id| &id == account_id) { + // For now, we don't allow some types of messages to be sent to self. + debug_assert!(msg.allow_sending_to_self()); + let this = self.clone(); + let clock = clock.clone(); + let peer_id = self.config.node_id(); + let msg = self.sign_message( + &clock, + RawRoutedMessage { target: PeerIdOrHash::PeerId(peer_id.clone()), body: msg }, + ); + arbiter.spawn(async move { + this.receive_routed_message(&clock, peer_id, msg.hash(), msg.msg.body).await; + }); + return true; + } + + let accounts_data = self.accounts_data.load(); + if tcp::Tier::T1.is_allowed_routed(&msg) { + for key in accounts_data.keys_by_id.get(account_id).iter().flat_map(|keys| keys.iter()) + { + let data = match accounts_data.data.get(key) { + Some(data) => data, + None => continue, + }; + let conn = match self.get_tier1_proxy(data) { + Some(conn) => conn, + None => continue, + }; + // TODO(gprusak): in case of PartialEncodedChunk, consider stripping everything + // but the header. This will bound the message size + conn.send_message(Arc::new(PeerMessage::Routed(self.sign_message( + clock, + RawRoutedMessage { + target: PeerIdOrHash::PeerId(data.peer_id.clone()), + body: msg, + }, + )))); + return true; + } + } + + let peer_id_from_account_data = accounts_data + .keys_by_id + .get(account_id) + .iter() + .flat_map(|keys| keys.iter()) + .flat_map(|key| accounts_data.data.get(key)) + .next() + .map(|data| data.peer_id.clone()); + // Find the target peer_id: + // - first look it up in self.accounts_data + // - if missing, fall back to lookup in self.graph.routing_table + // We want to deprecate self.graph.routing_table.account_owner in the next release. + let target = if let Some(peer_id) = peer_id_from_account_data { + metrics::ACCOUNT_TO_PEER_LOOKUPS.with_label_values(&["AccountData"]).inc(); + peer_id + } else if let Some(peer_id) = self.account_announcements.get_account_owner(account_id) { + metrics::ACCOUNT_TO_PEER_LOOKUPS.with_label_values(&["AnnounceAccount"]).inc(); + peer_id + } else { + // TODO(MarX, #1369): Message is dropped here. Define policy for this case. + metrics::MessageDropped::UnknownAccount.inc(&msg); + tracing::debug!(target: "network", + account_id = ?self.config.validator.account_id(), + to = ?account_id, + ?msg,"Drop message: unknown account", + ); + tracing::trace!(target: "network", known_peers = ?self.account_announcements.get_accounts_keys(), "Known peers"); + return false; + }; + + let mut success = false; + let msg = RawRoutedMessage { target: PeerIdOrHash::PeerId(target), body: msg }; + let msg = self.sign_message(clock, msg); + for _ in 0..msg.body.message_resend_count() { + success |= self.send_message_to_peer(clock, tcp::Tier::T2, msg.clone()); + } + success + } + pub async fn receive_routed_message( self: &Arc, clock: &time::Clock, diff --git a/chain/network/src/peer_manager/peer_manager_actor.rs b/chain/network/src/peer_manager/peer_manager_actor.rs index 0652a5428f3..09ad567abae 100644 --- a/chain/network/src/peer_manager/peer_manager_actor.rs +++ b/chain/network/src/peer_manager/peer_manager_actor.rs @@ -1071,12 +1071,14 @@ impl PeerManagerActor { NetworkResponses::NoResponse } NetworkRequests::PartialEncodedStateWitness(validator_witness_tuple) => { + let arbiter = actix::Arbiter::current(); validator_witness_tuple.into_par_iter().for_each( |(chunk_validator, partial_witness)| { - self.state.send_message_to_account( + self.state.send_message_to_account_with_arbiter( &self.clock, &chunk_validator, RoutedMessageBody::PartialEncodedStateWitness(partial_witness), + arbiter.clone(), ); }, ); @@ -1086,13 +1088,15 @@ impl PeerManagerActor { chunk_validators, partial_witness, ) => { + let arbiter = actix::Arbiter::current(); chunk_validators.into_par_iter().for_each(|chunk_validator| { - self.state.send_message_to_account( + self.state.send_message_to_account_with_arbiter( &self.clock, &chunk_validator, RoutedMessageBody::PartialEncodedStateWitnessForward( partial_witness.clone(), ), + arbiter.clone(), ); }); NetworkResponses::NoResponse