fix(state-sync): set the sync hash only when it's final #12720

Open · wants to merge 5 commits into master
16 changes: 9 additions & 7 deletions chain/chain/src/chain.rs
@@ -3964,13 +3964,15 @@ impl Chain {
Ok(SnapshotAction::None)
}
} else {
let Some(sync_hash) = self.get_sync_hash(&head.last_block_hash)? else {
return Ok(SnapshotAction::None);
};
if sync_hash == head.last_block_hash {
// note that here we're returning prev_block_hash instead of last_block_hash because in this case
// we can't detect the right sync hash until it is actually applied as the head block
Ok(SnapshotAction::MakeSnapshot(head.prev_block_hash))
let is_sync_prev = crate::state_sync::is_sync_prev_hash(
self.chain_store.store(),
&head.last_block_hash,
&head.prev_block_hash,
)?;
if is_sync_prev {
// Here the head block is the prev block of what the sync hash will be, and the previous
// block is the point in the chain we want to snapshot state for
Ok(SnapshotAction::MakeSnapshot(head.last_block_hash))
} else {
Ok(SnapshotAction::None)
}
147 changes: 110 additions & 37 deletions chain/chain/src/state_sync.rs
@@ -15,14 +15,6 @@ fn get_state_sync_new_chunks(
Ok(store.get_ser(DBCol::StateSyncNewChunks, block_hash.as_ref())?)
}

fn iter_state_sync_new_chunks_keys<'a>(
store: &'a Store,
) -> impl Iterator<Item = Result<CryptoHash, std::io::Error>> + 'a {
store
.iter(DBCol::StateSyncNewChunks)
.map(|item| item.and_then(|(k, _v)| CryptoHash::try_from_slice(&k)))
}

fn iter_state_sync_hashes_keys<'a>(
store: &'a Store,
) -> impl Iterator<Item = Result<EpochId, std::io::Error>> + 'a {
@@ -31,17 +23,18 @@ fn iter_state_sync_hashes_keys<'a>(
.map(|item| item.and_then(|(k, _v)| EpochId::try_from_slice(&k)))
}

/// Saves new chunk info and returns whether there are at least 2 chunks per shard in the epoch for header.prev_hash()
fn save_epoch_new_chunks<T: ChainStoreAccess>(
chain_store: &T,
store_update: &mut StoreUpdate,
header: &BlockHeader,
) -> Result<(), Error> {
) -> Result<bool, Error> {
let Some(mut num_new_chunks) =
get_state_sync_new_chunks(chain_store.store(), header.prev_hash())?
else {
// This might happen in the case of epoch sync where we save individual headers without having all
// headers that belong to the epoch.
return Ok(());
return Ok(false);
};

// This shouldn't happen because block headers in the same epoch should have chunks masks
@@ -53,17 +46,11 @@ fn save_epoch_new_chunks<T: ChainStoreAccess>(
block_hash=%header.hash(), chunk_mask_len=%header.chunk_mask().len(), stored_len=%num_new_chunks.len(),
"block header's chunk mask not of the same length as stored value in DBCol::StateSyncNewChunks",
);
return Ok(());
return Ok(false);
}

let done = num_new_chunks.iter().all(|num_chunks| *num_chunks >= 2);
if done {
// TODO(current_epoch_state_sync): this will not be correct if this block doesn't end up finalized on the main chain.
// We should fix it by setting the sync hash when it's finalized, which requires making changes to how we take state snapshots.
store_update.set_ser(DBCol::StateSyncHashes, header.epoch_id().as_ref(), header.hash())?;
store_update.delete_all(DBCol::StateSyncNewChunks);
return Ok(());
}

for (num_new_chunks, new_chunk) in num_new_chunks.iter_mut().zip(header.chunk_mask().iter()) {
// Only need to reach 2, so don't bother adding more than that
if *new_chunk && *num_new_chunks < 2 {
@@ -72,7 +59,7 @@ }
}

store_update.set_ser(DBCol::StateSyncNewChunks, header.hash().as_ref(), &num_new_chunks)?;
Ok(())
Ok(done)
}

fn on_new_epoch(store_update: &mut StoreUpdate, header: &BlockHeader) -> Result<(), Error> {
@@ -96,31 +83,89 @@ fn remove_old_epochs(
Ok(())
}

fn remove_old_blocks<T: ChainStoreAccess>(
/// Helper to turn DBNotFoundErr() or the genesis header into None. We might get DBNotFoundErr() in the case
/// of epoch sync where we save individual headers without having all headers that belong to the epoch.
fn get_block_header<T: ChainStoreAccess>(
[Review — Contributor]
There is a method with the same name in Chain - can you rename to something unique to avoid confusion?

[Review — Contributor]
Why do you need to treat genesis differently? Is this method the best place to deal with that?

[Author]
hmm get_block_header -> maybe_get_block_header?

And yea actually good point, moved the genesis height check outside this function and only call it in one place now. I think it's safest to perform the check because the first epoch also has a zero epoch ID.

chain_store: &T,
block_hash: &CryptoHash,
) -> Result<Option<BlockHeader>, Error> {
match chain_store.get_block_header(block_hash) {
Ok(h) => {
if h.height() != chain_store.get_genesis_height() {
Ok(Some(h))
} else {
Ok(None)
}
}
[Review on lines +93 to +99 — Contributor]
nit: h -> block_header
mini nit: maybe move the genesis handling outside of this match - if it makes things cleaner

[Author]
done

// This might happen in the case of epoch sync where we save individual headers without having all
// headers that belong to the epoch.
Err(Error::DBNotFoundErr(_)) => Ok(None),
Err(e) => Err(e),
}
}

fn has_enough_new_chunks(store: &Store, block_hash: &CryptoHash) -> Result<Option<bool>, Error> {
let Some(num_new_chunks) = get_state_sync_new_chunks(store, block_hash)? else {
// This might happen in the case of epoch sync where we save individual headers without having all
// headers that belong to the epoch.
return Ok(None);
};
Ok(Some(num_new_chunks.iter().all(|num_chunks| *num_chunks >= 2)))
}
[Review on lines +107 to +114 — Contributor]
Why do you need the Option in the return type? Can you just return bool?


/// Save num new chunks info and store the state sync hash if it has been found. We store it only
/// once it becomes final.
fn on_new_header<T: ChainStoreAccess>(
chain_store: &T,
store_update: &mut StoreUpdate,
header: &BlockHeader,
) -> Result<(), Error> {
if header.last_final_block() == &CryptoHash::default() {
let done = save_epoch_new_chunks(chain_store, store_update, header)?;
[Review — Contributor]
Is it ok to save new chunks for a non-final block header? It may very well be discarded and not included on the canonical chain.

[Author]
Hmm, what's the bug though?

We store the new chunks for every new header stored, and if a header that gets passed to save_epoch_new_chunks() doesn't end up on the canonical chain, the consequence is that we never look at or touch that row in the DB again until we delete_all(DBCol::StateSyncNewChunks).

We need to store the number of new chunks at some point, and if we wait until a block is finalized to do it, the logic gets more complicated. In this version, where we just call save_epoch_new_chunks() on every new non-final head update, we can assume that the info for the prev_hash of the header currently being processed has already been stored. But if we only do save_epoch_new_chunks() on each new last_final_block, we possibly need to iterate backwards until we find one that has new chunks info stored. And this backwards iteration might need to go further back than what we already have here. Actually, I'm not really sure off the top of my head what the upper bound is on how far back we'd need to go (basically, the upper bound on the number of blocks a final head update could jump over).

[Review — Contributor]
Ah yeah, sorry, I thought the new chunks are stored per height but they are stored per block hash - it should be fine as is like you said.

Good catch about the last final block skipping some heights! I completely missed that property.

[Author]
> Good catch about the last final block skipping some heights! I completely missed that property.

Hah, actually to be honest I didn't know that either until writing this PR :D

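The per-header accumulation the thread above discusses can be sketched in isolation. This is a hypothetical in-memory model — a `HashMap` standing in for `DBCol::StateSyncNewChunks` and illustrative block names — not the actual `ChainStore`/`StoreUpdate` API:

```rust
use std::collections::HashMap;

/// Sketch: each header stores, per shard, how many new chunks the epoch has
/// produced up to and including it, derived from the counts stored for its
/// prev block plus the header's chunk mask. Counts are capped at 2, since
/// that is all the sync-hash rule needs. Returns whether the *prev* block
/// already had >= 2 new chunks in every shard, i.e. whether this header is a
/// sync-hash candidate.
fn accumulate(
    store: &mut HashMap<&'static str, Vec<u8>>,
    prev: &'static str,
    hash: &'static str,
    chunk_mask: &[bool],
) -> bool {
    let mut counts =
        store.get(prev).cloned().unwrap_or_else(|| vec![0; chunk_mask.len()]);
    let done = counts.iter().all(|c| *c >= 2);
    for (count, new_chunk) in counts.iter_mut().zip(chunk_mask) {
        if *new_chunk && *count < 2 {
            *count += 1;
        }
    }
    store.insert(hash, counts);
    done
}

fn main() {
    let mut store = HashMap::new();
    // Two shards; shard 1 misses a chunk in block "b".
    assert!(!accumulate(&mut store, "genesis", "a", &[true, true])); // counts: [1, 1]
    assert!(!accumulate(&mut store, "a", "b", &[true, false])); // counts: [2, 1]
    assert!(!accumulate(&mut store, "b", "c", &[true, true])); // counts: [2, 2]
    // "c" is the first block with >= 2 new chunks per shard, so its child "d"
    // is the sync-hash candidate (stored only once final, per this PR).
    assert!(accumulate(&mut store, "c", "d", &[true, true]));
    println!("ok");
}
```

Because each header only needs its prev block's row, running this on every stored header (final or not) is cheap, which is the point the author makes above: rows for abandoned forks are simply never read again.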

if !done {
return Ok(());
}
// We don't need to keep info for old blocks around. After a block is finalized, we don't need anything before it
let last_final_header = match chain_store.get_block_header(header.last_final_block()) {
Ok(h) => h,
// This might happen in the case of epoch sync where we save individual headers without having all
// headers that belong to the epoch.
Err(Error::DBNotFoundErr(_)) => return Ok(()),
Err(e) => return Err(e),

// Now check if the sync hash is known and finalized. The sync hash is the block after the first block with at least 2
// chunks per shard in the epoch. Note that we cannot just check if the current header.last_final_block() is the sync
// hash, because even though this function is called for each header, it is not guaranteed that we'll see every block
// by checking header.last_final_block(), because it is possible for the final block to jump by more than one upon a new
// head update. So here we iterate backwards until we find it, if it exists yet.

let epoch_id = header.epoch_id();
let last_final_hash = header.last_final_block();

let Some(mut sync) = get_block_header(chain_store, last_final_hash)? else {
return Ok(());
};
for block_hash in iter_state_sync_new_chunks_keys(chain_store.store()) {
let block_hash = block_hash?;
let old_header = chain_store.get_block_header(&block_hash)?;
if old_header.height() < last_final_header.height() {
store_update.delete(DBCol::StateSyncNewChunks, block_hash.as_ref());
loop {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this loop always iterate from the current final block all the way back to the beginning of the epoch? You may want to add an early return if the StateSyncHashes is already set for the epoch_id.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I found this check elsewhere. Can you add a comment that this method should only be called if the sync hash wasn't found yet? Or maybe even some debug assertions.

let Some(sync_prev) = get_block_header(chain_store, sync.prev_hash())? else {
return Ok(());
};
if sync_prev.epoch_id() != epoch_id {
return Ok(());
}
if has_enough_new_chunks(chain_store.store(), sync_prev.hash())? != Some(true) {
return Ok(());
}
}

Ok(())
let Some(sync_prev_prev) = get_block_header(chain_store, sync_prev.prev_hash())? else {
return Ok(());
};
let Some(prev_prev_done) =
has_enough_new_chunks(chain_store.store(), sync_prev_prev.hash())?
else {
return Ok(());
};

if !prev_prev_done {
// `sync_prev_prev` doesn't have enough new chunks, and `sync_prev` does, meaning `sync` is the first final
// valid sync block
store_update.set_ser(DBCol::StateSyncHashes, epoch_id.as_ref(), sync.hash())?;
store_update.delete_all(DBCol::StateSyncNewChunks);
return Ok(());
}
sync = sync_prev;
}
}

/// Updates information in the DB related to calculating the correct "sync_hash" for this header's epoch,
@@ -154,6 +199,34 @@ pub(crate) fn update_sync_hashes<T: ChainStoreAccess>(
return remove_old_epochs(chain_store.store(), store_update, header, &prev_header);
}

save_epoch_new_chunks(chain_store, store_update, header)?;
remove_old_blocks(chain_store, store_update, header)
on_new_header(chain_store, store_update, header)
}

/// Returns whether `block_hash` is the block that will appear immediately before the "sync_hash" block. That is,
/// whether it is going to be the prev_hash of the "sync_hash" block, when it is found.
///
/// `block_hash` is the prev_hash of the future "sync_hash" block iff the number of new chunks in the epoch in
/// each shard is at least 2, and at least one shard of the block `prev_hash` doesn't yet have 2 new chunks in it.
[Review on lines +208 to +209 — Contributor]
nit: Maybe say "is the first block that has the X property" instead of "is a block that has the X property but the previous block does not"

/// This function can only return true before we save the "sync_hash" block to the `StateSyncHashes` column,
/// because it relies on data stored in the `StateSyncNewChunks` column, which is cleaned up after that.
///
/// This is used when making state snapshots, because in that case we don't need to wait for the "sync_hash"
/// block to be finalized to take a snapshot of the state as of its prev prev block
pub(crate) fn is_sync_prev_hash(
store: &Store,
block_hash: &CryptoHash,
prev_hash: &CryptoHash,
) -> Result<bool, Error> {
let Some(new_chunks) = get_state_sync_new_chunks(store, block_hash)? else {
return Ok(false);
};
let done = new_chunks.iter().all(|num_chunks| *num_chunks >= 2);
if !done {
return Ok(false);
}
let Some(prev_new_chunks) = get_state_sync_new_chunks(store, prev_hash)? else {
return Ok(false);
};
let prev_done = prev_new_chunks.iter().all(|num_chunks| *num_chunks >= 2);
Ok(!prev_done)
}
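Stripped of storage access, the predicate above reduces to comparing the capped per-shard counts for a block and its prev. A simplified standalone sketch (the real function reads both rows from `DBCol::StateSyncNewChunks`):

```rust
/// Sketch: `counts` / `prev_counts` are the per-shard new-chunk counts
/// (capped at 2) stored for a block and for its prev. The block is the prev
/// of the future "sync_hash" iff it is the first block in the epoch where
/// every shard has reached 2.
fn is_sync_prev(counts: &[u8], prev_counts: &[u8]) -> bool {
    let done = counts.iter().all(|c| *c >= 2);
    let prev_done = prev_counts.iter().all(|c| *c >= 2);
    done && !prev_done
}

fn main() {
    assert!(is_sync_prev(&[2, 2], &[2, 1])); // first block to reach 2 everywhere
    assert!(!is_sync_prev(&[2, 1], &[1, 1])); // not done yet
    assert!(!is_sync_prev(&[2, 2], &[2, 2])); // prev was already done: too late
    println!("ok");
}
```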
2 changes: 1 addition & 1 deletion integration-tests/src/test_loop/tests/resharding_v3.rs
@@ -36,7 +36,7 @@ use crate::test_loop::utils::{ONE_NEAR, TGAS};
use near_parameters::{vm, RuntimeConfig, RuntimeConfigStore};

/// Default and minimal epoch length used in resharding tests.
const DEFAULT_EPOCH_LENGTH: u64 = 6;
const DEFAULT_EPOCH_LENGTH: u64 = 7;

/// Increased epoch length that has to be used in some tests due to the delay caused by catch up.
///