Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
stedfn committed Jan 4, 2025
1 parent 35247ff commit cc26e2b
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 2 deletions.
92 changes: 92 additions & 0 deletions chain/network/src/peer_manager/network_state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use crate::types::{
ChainInfo, PeerManagerSenderForNetwork, PeerType, ReasonForBan, StatePartRequestBody,
Tier3Request, Tier3RequestBody,
};
use actix::ArbiterHandle;
use anyhow::Context;
use arc_swap::ArcSwap;
use near_async::messaging::{CanSend, SendAsync, Sender};
Expand Down Expand Up @@ -698,6 +699,97 @@ impl NetworkState {
success
}

/// Send message to specific account.
/// Return whether the message is sent or not.
/// The message might be sent over TIER1 or TIER2 connection depending on the message type.
pub fn send_message_to_account_with_arbiter(
self: &Arc<Self>,
clock: &time::Clock,
account_id: &AccountId,
msg: RoutedMessageBody,
arbiter: ArbiterHandle,
) -> bool {
// If the message is allowed to be sent to self, we handle it directly.
if self.config.validator.account_id().is_some_and(|id| &id == account_id) {
// For now, we don't allow some types of messages to be sent to self.
debug_assert!(msg.allow_sending_to_self());
let this = self.clone();
let clock = clock.clone();
let peer_id = self.config.node_id();
let msg = self.sign_message(
&clock,
RawRoutedMessage { target: PeerIdOrHash::PeerId(peer_id.clone()), body: msg },
);
arbiter.spawn(async move {
this.receive_routed_message(&clock, peer_id, msg.hash(), msg.msg.body).await;
});
return true;
}

let accounts_data = self.accounts_data.load();
if tcp::Tier::T1.is_allowed_routed(&msg) {
for key in accounts_data.keys_by_id.get(account_id).iter().flat_map(|keys| keys.iter())
{
let data = match accounts_data.data.get(key) {
Some(data) => data,
None => continue,
};
let conn = match self.get_tier1_proxy(data) {
Some(conn) => conn,
None => continue,
};
// TODO(gprusak): in case of PartialEncodedChunk, consider stripping everything
// but the header. This will bound the message size
conn.send_message(Arc::new(PeerMessage::Routed(self.sign_message(
clock,
RawRoutedMessage {
target: PeerIdOrHash::PeerId(data.peer_id.clone()),
body: msg,
},
))));
return true;
}
}

let peer_id_from_account_data = accounts_data
.keys_by_id
.get(account_id)
.iter()
.flat_map(|keys| keys.iter())
.flat_map(|key| accounts_data.data.get(key))
.next()
.map(|data| data.peer_id.clone());
// Find the target peer_id:
// - first look it up in self.accounts_data
// - if missing, fall back to lookup in self.graph.routing_table
// We want to deprecate self.graph.routing_table.account_owner in the next release.
let target = if let Some(peer_id) = peer_id_from_account_data {
metrics::ACCOUNT_TO_PEER_LOOKUPS.with_label_values(&["AccountData"]).inc();
peer_id
} else if let Some(peer_id) = self.account_announcements.get_account_owner(account_id) {
metrics::ACCOUNT_TO_PEER_LOOKUPS.with_label_values(&["AnnounceAccount"]).inc();
peer_id
} else {
// TODO(MarX, #1369): Message is dropped here. Define policy for this case.
metrics::MessageDropped::UnknownAccount.inc(&msg);
tracing::debug!(target: "network",
account_id = ?self.config.validator.account_id(),
to = ?account_id,
?msg,"Drop message: unknown account",
);
tracing::trace!(target: "network", known_peers = ?self.account_announcements.get_accounts_keys(), "Known peers");
return false;
};

let mut success = false;
let msg = RawRoutedMessage { target: PeerIdOrHash::PeerId(target), body: msg };
let msg = self.sign_message(clock, msg);
for _ in 0..msg.body.message_resend_count() {
success |= self.send_message_to_peer(clock, tcp::Tier::T2, msg.clone());
}
success
}

pub async fn receive_routed_message(
self: &Arc<Self>,
clock: &time::Clock,
Expand Down
8 changes: 6 additions & 2 deletions chain/network/src/peer_manager/peer_manager_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1071,12 +1071,14 @@ impl PeerManagerActor {
NetworkResponses::NoResponse
}
NetworkRequests::PartialEncodedStateWitness(validator_witness_tuple) => {
let arbiter = actix::Arbiter::current();
validator_witness_tuple.into_par_iter().for_each(
|(chunk_validator, partial_witness)| {
self.state.send_message_to_account(
self.state.send_message_to_account_with_arbiter(
&self.clock,
&chunk_validator,
RoutedMessageBody::PartialEncodedStateWitness(partial_witness),
arbiter.clone(),
);
},
);
Expand All @@ -1086,13 +1088,15 @@ impl PeerManagerActor {
chunk_validators,
partial_witness,
) => {
let arbiter = actix::Arbiter::current();
chunk_validators.into_par_iter().for_each(|chunk_validator| {
self.state.send_message_to_account(
self.state.send_message_to_account_with_arbiter(
&self.clock,
&chunk_validator,
RoutedMessageBody::PartialEncodedStateWitnessForward(
partial_witness.clone(),
),
arbiter.clone(),
);
});
NetworkResponses::NoResponse
Expand Down

0 comments on commit cc26e2b

Please sign in to comment.