Commit c4ef2d8

Nikolay Kurtov committed Dec 5, 2023
1 parent a6f2819 commit c4ef2d8
Showing 4 changed files with 48 additions and 67 deletions.
16 changes: 9 additions & 7 deletions chain/chain/src/chain.rs
@@ -394,7 +394,7 @@ pub fn check_header_known(
     chain: &Chain,
     header: &BlockHeader,
 ) -> Result<Result<(), BlockKnownError>, Error> {
-    // TODO: Change the return type to Result<BlockKnownError, Error>.
+    // TODO: Change the return type to Result<BlockKnownStatusEnum, Error>.
     let header_head = chain.store().header_head()?;
     if header.hash() == &header_head.last_block_hash
         || header.hash() == &header_head.prev_block_hash
@@ -412,7 +412,7 @@ fn check_known_store(
     chain: &Chain,
     block_hash: &CryptoHash,
 ) -> Result<Result<(), BlockKnownError>, Error> {
-    // TODO: Change the return type to Result<BlockKnownError, Error>.
+    // TODO: Change the return type to Result<BlockKnownStatusEnum, Error>.
     if chain.store().block_exists(block_hash)? {
         Ok(Err(BlockKnownError::KnownInStore))
     } else {
@@ -429,7 +429,7 @@ pub fn check_known(
     chain: &Chain,
     block_hash: &CryptoHash,
 ) -> Result<Result<(), BlockKnownError>, Error> {
-    // TODO: Change the return type to Result<BlockKnownError, Error>.
+    // TODO: Change the return type to Result<BlockKnownStatusEnum, Error>.
     let head = chain.store().head()?;
     // Quick in-memory check for fast-reject any block handled recently.
     if block_hash == &head.last_block_hash || block_hash == &head.prev_block_hash {
@@ -1896,12 +1896,14 @@ impl Chain {
         // Sort headers by heights.
         headers.sort_by_key(|left| left.height());

-        if let Some(header) = headers.first() {
-            debug!(
+        if let (Some(first_header), Some(last_header)) = (headers.first(), headers.last()) {
+            info!(
                 target: "chain",
                 num_headers = headers.len(),
-                first_hash = ?header.hash(),
-                first_hash = header.height(),
+                first_hash = ?first_header.hash(),
+                first_height = first_header.height(),
+                last_hash = ?last_header.hash(),
+                last_height = ?last_header.height(),
                 "Sync block headers");
         } else {
             // No headers.
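The hunk above logs both ends of the already-sorted header batch as structured `tracing` fields; the `?` sigil records a value through its `Debug` impl. Below is a minimal, self-contained sketch of that logging pattern, using a made-up `Header` type rather than nearcore's `BlockHeader`:

use tracing::info;

#[derive(Debug, Clone, Copy)]
struct HeaderHash([u8; 4]);

struct Header {
    hash: HeaderHash,
    height: u64,
}

fn log_header_batch(mut headers: Vec<Header>) {
    // Sort by height so `first`/`last` are the extremes of the batch.
    headers.sort_by_key(|h| h.height);
    if let (Some(first), Some(last)) = (headers.first(), headers.last()) {
        info!(
            target: "chain",
            num_headers = headers.len(),
            first_hash = ?first.hash, // recorded via the Debug impl
            first_height = first.height,
            last_hash = ?last.hash,
            last_height = last.height,
            "Sync block headers");
    }
}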
6 changes: 3 additions & 3 deletions chain/client-primitives/src/types.rs
@@ -315,11 +315,11 @@ impl SyncStatus {
             // Represent NoSync as 0 because it is the state of a normal well-behaving node.
             SyncStatus::NoSync => 0,
             SyncStatus::AwaitingPeers => 1,
-            SyncStatus::EpochSync { epoch_ord: _ } => 2,
-            SyncStatus::HeaderSync { start_height: _, current_height: _, highest_height: _ } => 3,
+            SyncStatus::EpochSync { .. } => 2,
+            SyncStatus::HeaderSync { .. } => 3,
             SyncStatus::StateSync(_) => 4,
             SyncStatus::StateSyncDone => 5,
-            SyncStatus::BlockSync { start_height: _, current_height: _, highest_height: _ } => 6,
+            SyncStatus::BlockSync { .. } => 6,
         }
     }

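In the arms above, `..` matches a struct-like enum variant while ignoring all of its remaining named fields, which is what removes the `field: _` noise. A small sketch with an invented enum (not the real `SyncStatus` definition):

enum Status {
    Idle,
    HeaderSync { start_height: u64, current_height: u64, highest_height: u64 },
    BlockSync { start_height: u64, current_height: u64, highest_height: u64 },
}

fn repr(status: &Status) -> u8 {
    match status {
        Status::Idle => 0,
        // `..` keeps these arms valid even if fields are later added to the variants.
        Status::HeaderSync { .. } => 1,
        Status::BlockSync { .. } => 2,
    }
}

The trade-off is that adding a field no longer forces these arms to be revisited, which is fine here because only the discriminant matters.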
51 changes: 22 additions & 29 deletions chain/client/src/client_actor.rs
@@ -1362,21 +1362,14 @@ impl ClientActor {
         })
     }

-    fn receive_headers(&mut self, mut headers: Vec<BlockHeader>, peer_id: PeerId) -> bool {
+    fn receive_headers(&mut self, headers: Vec<BlockHeader>, peer_id: PeerId) -> bool {
         let _span =
             debug_span!(target: "client", "receive_headers", num_headers = headers.len(), ?peer_id)
                 .entered();
         if headers.is_empty() {
-            info!(target: "client", "Received no block headers");
+            info!(target: "client", "Received an empty set of block headers");
             return true;
         }
-        // Sort headers by heights.
-        headers.sort_by_key(|left| left.height());
-        info!(
-            target: "client",
-            height_from = headers.first().unwrap().height(),
-            height_last = headers.last().unwrap().height(),
-            "Received block headers");
         match self.client.sync_block_headers(headers) {
             Ok(_) => true,
             Err(err) => {
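The sort and the "Received block headers" log removed above appear to duplicate the richer "Sync block headers" log added in chain/chain/src/chain.rs earlier in this commit, and the `debug_span!(...).entered()` guard that remains already attaches `num_headers` and `peer_id` to every event emitted inside the function. A simplified sketch of that span idiom, with invented names:

use tracing::{debug_span, info};

fn receive_items(items: &[u64], peer_id: &str) {
    // Events below inherit `num_items` and `peer_id` until `_span` is dropped.
    let _span =
        debug_span!(target: "client", "receive_items", num_items = items.len(), peer_id).entered();
    if items.is_empty() {
        info!(target: "client", "Received an empty set of items");
        return;
    }
    info!(target: "client", first = items[0], last = items[items.len() - 1], "Received items");
}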
@@ -1788,29 +1781,29 @@ impl ClientActor {
     /// * The last block of the prev epoch
     /// Returns whether the node is syncing its state.
     fn maybe_receive_state_sync_blocks(&mut self, block: &Block) -> bool {
-        match self.client.sync_status {
-            SyncStatus::StateSync(StateSyncStatus { sync_hash, .. }) => {
-                if let Ok(header) = self.client.chain.get_block_header(&sync_hash) {
-                    let block: MaybeValidated<Block> = (*block).clone().into();
-                    // Notice that two blocks are saved differently:
-                    // * save_block() for one block.
-                    // * save_orphan() for another block.
-                    if block.hash() == header.prev_hash() {
-                        // The last block of the previous epoch.
-                        if let Err(err) = self.client.chain.save_block(block) {
-                            error!(target: "client", ?err, "Failed to save a block during state sync");
-                        }
-                    } else if block.hash() == &sync_hash {
-                        // The first block of the new epoch.
-                        if let Err(err) = self.client.chain.save_orphan(block, false) {
-                            error!(target: "client", ?err, "Received an invalid block during state sync");
-                        }
-                    }
+        let SyncStatus::StateSync(StateSyncStatus { sync_hash, .. }) = self.client.sync_status
+        else {
+            return false;
+        };
+        if let Ok(header) = self.client.chain.get_block_header(&sync_hash) {
+            let block: MaybeValidated<Block> = (*block).clone().into();
+            let block_hash = *block.hash();
+            // Notice that two blocks are saved differently:
+            // * save_block() for one block.
+            // * save_orphan() for another block.
+            if &block_hash == header.prev_hash() {
+                // The last block of the previous epoch.
+                if let Err(err) = self.client.chain.save_block(block) {
+                    error!(target: "client", ?err, ?block_hash, "Failed to save a block during state sync");
+                }
+            } else if block_hash == sync_hash {
+                // The first block of the new epoch.
+                if let Err(err) = self.client.chain.save_orphan(block, false) {
+                    error!(target: "client", ?err, ?block_hash, "Received an invalid block during state sync");
                 }
-                true
             }
-            _ => false,
         }
+        true
     }
 }

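The rewrite above relies on `let`-`else` (stable since Rust 1.65): the happy path binds `sync_hash` and keeps the body at its original indentation, while every non-matching variant takes the early `return false`. A compressed sketch with stand-in types:

enum SyncStatus {
    NoSync,
    StateSync { sync_hash: u64 },
}

fn maybe_handle(status: &SyncStatus) -> bool {
    // Diverge (return) unless the pattern matches; no wildcard match arm needed.
    let SyncStatus::StateSync { sync_hash } = status else {
        return false;
    };
    println!("handling state sync for {sync_hash}");
    true
}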
42 changes: 14 additions & 28 deletions chain/client/src/sync/block.rs
@@ -79,7 +79,7 @@ impl BlockSync {
                 return Ok(true);
             }
             BlockSyncDue::RequestBlock => {
-                self.block_sync(chain, &head, &header_head, highest_height_peers)?;
+                self.block_sync(chain, highest_height_peers)?;
             }
             BlockSyncDue::WaitForBlock => {
                 // Do nothing.
@@ -129,17 +129,15 @@ impl BlockSync {
     }

     // Finds the last block on the canonical chain that is in store (processed).
-    fn get_last_processed_block(
-        &self,
-        chain: &Chain,
-        last_block_hash: &CryptoHash,
-    ) -> Result<CryptoHash, near_chain::Error> {
+    fn get_last_processed_block(&self, chain: &Chain) -> Result<CryptoHash, near_chain::Error> {
+        // TODO: Can this function be replaced with `Chain::get_latest_known()`?
         // The current chain head may not be on the canonical chain.
         // Now we find the most recent block we know on the canonical chain.
         // In practice the forks from the last final block are very short, so it is
         // acceptable to perform this on each request.

-        let mut header = chain.get_block_header(&last_block_hash)?;
+        let head = chain.head()?;
+        let mut header = chain.get_block_header(&head.last_block_hash)?;
         // First go back until we find the common block
         while match chain.get_block_header_by_height(header.height()) {
             Ok(got_header) => got_header.hash() != header.hash(),
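The loop beginning at the end of the hunk above walks back from the (possibly off-canonical) head until it reaches a block whose hash matches the canonical entry stored at the same height, i.e. the fork point. A stand-alone sketch of that idea over a toy map-based chain, not the real `Chain` API:

use std::collections::HashMap;

#[derive(Clone, PartialEq, Eq, Hash)]
struct Hash(String);

struct Header {
    hash: Hash,
    prev_hash: Hash,
    height: u64,
}

// Walk back from `start` until the canonical index (height -> hash) agrees
// with the header we are looking at; that header is the last processed block
// on the canonical chain.
fn last_canonical_ancestor<'a>(
    headers: &'a HashMap<Hash, Header>,
    canonical_by_height: &HashMap<u64, Hash>,
    start: &Hash,
) -> Option<&'a Header> {
    let mut header = headers.get(start)?;
    while canonical_by_height.get(&header.height) != Some(&header.hash) {
        header = headers.get(&header.prev_hash)?;
    }
    Some(header)
}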
@@ -176,18 +174,17 @@ impl BlockSync {
     fn block_sync(
         &mut self,
         chain: &Chain,
-        chain_head: &Tip,
-        header_head: &Tip,
         highest_height_peers: &[HighestHeightPeerInfo],
     ) -> Result<(), near_chain::Error> {
         // Update last request now because we want to update it whether or not
         // the rest of the logic succeeds.
         // TODO: If this code fails we should retry ASAP. Shouldn't we?
+        let chain_head = chain.head()?;
         self.last_request =
             Some(BlockSyncRequest { head: chain_head.last_block_hash, when: StaticClock::utc() });

         // The last block on the canonical chain that is processed (is in store).
-        let reference_hash = self.get_last_processed_block(chain, &chain_head.last_block_hash)?;
+        let reference_hash = self.get_last_processed_block(chain)?;

         // Look ahead for MAX_BLOCK_REQUESTS block headers and add requests for
         // blocks that we don't have yet.
@@ -207,6 +204,7 @@
             }
         }

+        let header_head = chain.header_head()?;
         // Assume that peers are configured to keep as many epochs does this
         // node and expect peers to have blocks in the range
         // [gc_stop_height, header_head.last_block_hash].
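With this change `block_sync` reads the chain head and header head itself instead of taking two `&Tip` parameters, so callers (including the tests below) can no longer hand it stale or mismatched tips. A minimal sketch of the same refactoring shape, with invented `Chain` and `Tip` stand-ins:

struct Tip { last_block_height: u64 }

struct Chain { head_tip: Tip, header_head_tip: Tip }

impl Chain {
    fn head(&self) -> &Tip { &self.head_tip }
    fn header_head(&self) -> &Tip { &self.header_head_tip }
}

struct BlockSync;

impl BlockSync {
    // Before: block_sync(&mut self, chain, &head, &header_head, peers).
    // After: both tips are read from `chain` at the point of use.
    fn block_sync(&mut self, chain: &Chain) {
        let head = chain.head();
        let header_head = chain.header_head();
        println!(
            "requesting blocks between heights {} and {}",
            head.last_block_height, header_head.last_block_height
        );
    }
}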
@@ -388,9 +386,7 @@

         // fetch three blocks at a time
         for i in 0..3 {
-            let head = env.clients[1].chain.head().unwrap();
-            let header_head = env.clients[1].chain.header_head().unwrap();
-            block_sync.block_sync(&env.clients[1].chain, &head, &header_head, &peer_infos).unwrap();
+            block_sync.block_sync(&env.clients[1].chain, &peer_infos).unwrap();

             let expected_blocks: Vec<_> =
                 blocks[i * MAX_BLOCK_REQUESTS..(i + 1) * MAX_BLOCK_REQUESTS].to_vec();
@@ -404,11 +400,9 @@
             }
         }

-        let head = env.clients[1].chain.head().unwrap();
-        let header_head = env.clients[1].chain.header_head().unwrap();
         // Now test when the node receives the block out of order
         // fetch the next three blocks
-        block_sync.block_sync(&env.clients[1].chain, &head, &header_head, &peer_infos).unwrap();
+        block_sync.block_sync(&env.clients[1].chain, &peer_infos).unwrap();
         check_hashes_from_network_adapter(
             &network_adapter,
             (3 * MAX_BLOCK_REQUESTS..4 * MAX_BLOCK_REQUESTS).map(|h| *blocks[h].hash()).collect(),
@@ -419,10 +413,8 @@
             Provenance::NONE,
         );

-        let head = env.clients[1].chain.head().unwrap();
-        let header_head = env.clients[1].chain.header_head().unwrap();
         // the next block sync should not request block[4*MAX_BLOCK_REQUESTS-1] again
-        block_sync.block_sync(&env.clients[1].chain, &head, &header_head, &peer_infos).unwrap();
+        block_sync.block_sync(&env.clients[1].chain, &peer_infos).unwrap();
         check_hashes_from_network_adapter(
             &network_adapter,
             (3 * MAX_BLOCK_REQUESTS..4 * MAX_BLOCK_REQUESTS - 1)
@@ -438,9 +430,7 @@
                 .process_block_test(MaybeValidated::from(blocks[i].clone()), Provenance::NONE);
         }

-        let head = env.clients[1].chain.head().unwrap();
-        let header_head = env.clients[1].chain.header_head().unwrap();
-        block_sync.block_sync(&env.clients[1].chain, &head, &header_head, &peer_infos).unwrap();
+        block_sync.block_sync(&env.clients[1].chain, &peer_infos).unwrap();
         let requested_block_hashes = collect_hashes_from_network_adapter(&network_adapter);
         assert!(requested_block_hashes.is_empty(), "{:?}", requested_block_hashes);

@@ -473,9 +463,7 @@
         env.clients[1].chain.sync_block_headers(block_headers, &mut challenges).unwrap();
         assert!(challenges.is_empty());

-        let head = env.clients[1].chain.head().unwrap();
-        let header_head = env.clients[1].chain.header_head().unwrap();
-        block_sync.block_sync(&env.clients[1].chain, &head, &header_head, &peer_infos).unwrap();
+        block_sync.block_sync(&env.clients[1].chain, &peer_infos).unwrap();
         let requested_block_hashes = collect_hashes_from_network_adapter(&network_adapter);
         // We don't have archival peers, and thus cannot request any blocks
         assert_eq!(requested_block_hashes, HashSet::new());
@@ -485,9 +473,7 @@
             peer.archival = true;
         }

-        let head = env.clients[1].chain.head().unwrap();
-        let header_head = env.clients[1].chain.header_head().unwrap();
-        block_sync.block_sync(&env.clients[1].chain, &head, &header_head, &peer_infos).unwrap();
+        block_sync.block_sync(&env.clients[1].chain, &peer_infos).unwrap();
         let requested_block_hashes = collect_hashes_from_network_adapter(&network_adapter);
         assert_eq!(
             requested_block_hashes,
