Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Rename BodySync to BlockSync for consistency #10293

Merged
merged 16 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 45 additions & 31 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ pub fn check_header_known(
chain: &Chain,
header: &BlockHeader,
) -> Result<Result<(), BlockKnownError>, Error> {
// TODO: Change the return type to Result<BlockKnownStatusEnum, Error>.
let header_head = chain.store().header_head()?;
if header.hash() == &header_head.last_block_hash
|| header.hash() == &header_head.prev_block_hash
Expand All @@ -411,6 +412,7 @@ fn check_known_store(
chain: &Chain,
block_hash: &CryptoHash,
) -> Result<Result<(), BlockKnownError>, Error> {
// TODO: Change the return type to Result<BlockKnownStatusEnum, Error>.
if chain.store().block_exists(block_hash)? {
Ok(Err(BlockKnownError::KnownInStore))
} else {
Expand All @@ -427,6 +429,7 @@ pub fn check_known(
chain: &Chain,
block_hash: &CryptoHash,
) -> Result<Result<(), BlockKnownError>, Error> {
// TODO: Change the return type to Result<BlockKnownStatusEnum, Error>.
let head = chain.store().head()?;
// Quick in-memory check for fast-reject any block handled recently.
if block_hash == &head.last_block_hash || block_hash == &head.prev_block_hash {
Expand Down Expand Up @@ -1890,59 +1893,70 @@ impl Chain {
mut headers: Vec<BlockHeader>,
challenges: &mut Vec<ChallengeBody>,
) -> Result<(), Error> {
// Sort headers by heights if they are out of order.
// Sort headers by heights.
headers.sort_by_key(|left| left.height());

if let Some(header) = headers.first() {
debug!(target: "chain", "Sync block headers: {} headers from {} at {}", headers.len(), header.hash(), header.height());
if let (Some(first_header), Some(last_header)) = (headers.first(), headers.last()) {
info!(
target: "chain",
num_headers = headers.len(),
first_hash = ?first_header.hash(),
first_height = first_header.height(),
last_hash = ?last_header.hash(),
last_height = ?last_header.height(),
"Sync block headers");
} else {
// No headers.
return Ok(());
};

// Performance optimization to skip looking up every header in the store.
let all_known = if let Some(last_header) = headers.last() {
// If the last header is known, then the other headers are known too.
self.store.get_block_header(last_header.hash()).is_ok()
} else {
false
// Empty set of headers, therefore all received headers are known.
true
};

if !all_known {
// Validate header and then add to the chain.
for header in headers.iter() {
match check_header_known(self, header)? {
Ok(_) => {}
Err(_) => continue,
}
if all_known {
return Ok(());
}
nikurt marked this conversation as resolved.
Show resolved Hide resolved

self.validate_header(header, &Provenance::SYNC, challenges)?;
let mut chain_update = self.chain_update();
chain_update.chain_store_update.save_block_header(header.clone())?;
// Validate header and then add to the chain.
for header in headers.iter() {
match check_header_known(self, header)? {
Ok(_) => {}
Err(_) => continue,
}

// Add validator proposals for given header.
let last_finalized_height =
chain_update.chain_store_update.get_block_height(header.last_final_block())?;
let epoch_manager_update = chain_update
.epoch_manager
.add_validator_proposals(BlockHeaderInfo::new(header, last_finalized_height))?;
chain_update.chain_store_update.merge(epoch_manager_update);
chain_update.commit()?;
self.validate_header(header, &Provenance::SYNC, challenges)?;
let mut chain_update = self.chain_update();
chain_update.chain_store_update.save_block_header(header.clone())?;

#[cfg(feature = "new_epoch_sync")]
{
// At this point BlockInfo for this header should be in DB and in `epoch_manager`s cache because of `add_validator_proposals` call.
let mut chain_update = self.chain_update();
chain_update.save_epoch_sync_info_if_finalised(header)?;
chain_update.commit()?;
}
// Add validator proposals for given header.
let last_finalized_height =
chain_update.chain_store_update.get_block_height(header.last_final_block())?;
let epoch_manager_update = chain_update
.epoch_manager
.add_validator_proposals(BlockHeaderInfo::new(header, last_finalized_height))?;
chain_update.chain_store_update.merge(epoch_manager_update);
chain_update.commit()?;

#[cfg(feature = "new_epoch_sync")]
{
// At this point BlockInfo for this header should be in DB and in `epoch_manager`s cache because of `add_validator_proposals` call.
let mut chain_update = self.chain_update();
chain_update.save_epoch_sync_info_if_finalised(header)?;
chain_update.commit()?;
}
}

let mut chain_update = self.chain_update();

if let Some(header) = headers.last() {
// Update header_head if it's the new tip
chain_update.update_header_head_if_not_challenged(header)?;
}

chain_update.commit()
}

Expand Down
20 changes: 12 additions & 8 deletions chain/client-primitives/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,12 @@ pub enum SyncStatus {
StateSync(StateSyncStatus),
/// Sync state across all shards is done.
StateSyncDone,
/// Catch up on blocks.
BodySync { start_height: BlockHeight, current_height: BlockHeight, highest_height: BlockHeight },
/// Download and process blocks until the head reaches the head of the network.
BlockSync {
start_height: BlockHeight,
current_height: BlockHeight,
highest_height: BlockHeight,
},
}

impl SyncStatus {
Expand All @@ -311,18 +315,18 @@ impl SyncStatus {
// Represent NoSync as 0 because it is the state of a normal well-behaving node.
SyncStatus::NoSync => 0,
SyncStatus::AwaitingPeers => 1,
SyncStatus::EpochSync { epoch_ord: _ } => 2,
SyncStatus::HeaderSync { start_height: _, current_height: _, highest_height: _ } => 3,
SyncStatus::EpochSync { .. } => 2,
SyncStatus::HeaderSync { .. } => 3,
SyncStatus::StateSync(_) => 4,
SyncStatus::StateSyncDone => 5,
SyncStatus::BodySync { start_height: _, current_height: _, highest_height: _ } => 6,
SyncStatus::BlockSync { .. } => 6,
}
}

pub fn start_height(&self) -> Option<BlockHeight> {
match self {
SyncStatus::HeaderSync { start_height, .. } => Some(*start_height),
SyncStatus::BodySync { start_height, .. } => Some(*start_height),
SyncStatus::BlockSync { start_height, .. } => Some(*start_height),
_ => None,
}
}
Expand Down Expand Up @@ -353,8 +357,8 @@ impl From<SyncStatus> for SyncStatusView {
.collect(),
),
SyncStatus::StateSyncDone => SyncStatusView::StateSyncDone,
SyncStatus::BodySync { start_height, current_height, highest_height } => {
SyncStatusView::BodySync { start_height, current_height, highest_height }
SyncStatus::BlockSync { start_height, current_height, highest_height } => {
SyncStatusView::BlockSync { start_height, current_height, highest_height }
}
}
}
Expand Down
59 changes: 40 additions & 19 deletions chain/client/src/client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,21 +428,11 @@ impl Handler<WithSpanContext<BlockResponse>> for ClientActor {
.store()
.get_all_block_hashes_by_height(block.header().height());
if was_requested || blocks_at_height.is_err() || blocks_at_height.as_ref().unwrap().is_empty() {
if let SyncStatus::StateSync(StateSyncStatus{ sync_hash, .. }) = &mut this.client.sync_status {
if let Ok(header) = this.client.chain.get_block_header(sync_hash) {
if block.hash() == header.prev_hash() {
if let Err(e) = this.client.chain.save_block(block.into()) {
error!(target: "client", "Failed to save a block during state sync: {}", e);
}
} else if block.hash() == sync_hash {
// This is the immediate block after a state sync
// We can afford to delay requesting missing chunks for this one block
if let Err(e) = this.client.chain.save_orphan(block.into(), false) {
error!(target: "client", "Received an invalid block during state sync: {}", e);
}
}
return;
}
// This is a very sneaky piece of logic.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is good to know :)

if this.maybe_receive_state_sync_blocks(&block) {
// A node is syncing its state. Don't consider receiving
// blocks other than the few special ones that State Sync expects.
return;
}
this.client.receive_block(
block,
Expand Down Expand Up @@ -1377,17 +1367,17 @@ impl ClientActor {
debug_span!(target: "client", "receive_headers", num_headers = headers.len(), ?peer_id)
.entered();
if headers.is_empty() {
info!(target: "client", "Received an empty set of block headers");
return true;
}
info!(target: "client", "Received block headers from height {} to {}", headers.first().unwrap().height(), headers.last().unwrap().height());
match self.client.sync_block_headers(headers) {
Ok(_) => true,
Err(err) => {
if err.is_bad_data() {
error!(target: "client", "Error processing sync blocks: {}", err);
error!(target: "client", ?err, "Error processing sync blocks");
false
} else {
debug!(target: "client", "Block headers refused by chain: {}", err);
debug!(target: "client", ?err, "Block headers refused by chain");
true
}
}
Expand Down Expand Up @@ -1762,7 +1752,7 @@ impl ClientActor {

self.client
.process_block_processing_artifact(block_processing_artifacts);
self.client.sync_status.update(SyncStatus::BodySync {
self.client.sync_status.update(SyncStatus::BlockSync {
start_height: 0,
current_height: 0,
highest_height: 0,
Expand All @@ -1784,6 +1774,37 @@ impl ClientActor {
&self.config_updater,
)
}

/// Checks if the node is syncing its State and applies special logic in that case.
/// A node usually ignores blocks that are too far ahead, but in case of a node syncing its state it is looking for 2 specific blocks:
/// * The first block of the new epoch
/// * The last block of the prev epoch
/// Returns whether the node is syncing its state.
fn maybe_receive_state_sync_blocks(&mut self, block: &Block) -> bool {
let SyncStatus::StateSync(StateSyncStatus { sync_hash, .. }) = self.client.sync_status
else {
return false;
};
if let Ok(header) = self.client.chain.get_block_header(&sync_hash) {
let block: MaybeValidated<Block> = (*block).clone().into();
let block_hash = *block.hash();
// Notice that two blocks are saved differently:
// * save_block() for one block.
// * save_orphan() for another block.
if &block_hash == header.prev_hash() {
// The last block of the previous epoch.
if let Err(err) = self.client.chain.save_block(block) {
error!(target: "client", ?err, ?block_hash, "Failed to save a block during state sync");
}
} else if block_hash == sync_hash {
// The first block of the new epoch.
if let Err(err) = self.client.chain.save_orphan(block, false) {
error!(target: "client", ?err, ?block_hash, "Received an invalid block during state sync");
}
}
}
true
}
}

impl Drop for ClientActor {
Expand Down
2 changes: 1 addition & 1 deletion chain/client/src/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ pub fn display_sync_status(
current_height
)
}
SyncStatus::BodySync { start_height, current_height, highest_height } => {
SyncStatus::BlockSync { start_height, current_height, highest_height } => {
let percent = if highest_height <= start_height {
0.0
} else {
Expand Down
Loading
Loading