-
Notifications
You must be signed in to change notification settings - Fork 660
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(state-sync): set the sync hash only when it's final #12720
base: master
Are you sure you want to change the base?
Changes from all commits
b170b13
14a1b50
4008661
8374ec4
0fc16d8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,14 +15,6 @@ fn get_state_sync_new_chunks( | |
Ok(store.get_ser(DBCol::StateSyncNewChunks, block_hash.as_ref())?) | ||
} | ||
|
||
fn iter_state_sync_new_chunks_keys<'a>( | ||
store: &'a Store, | ||
) -> impl Iterator<Item = Result<CryptoHash, std::io::Error>> + 'a { | ||
store | ||
.iter(DBCol::StateSyncNewChunks) | ||
.map(|item| item.and_then(|(k, _v)| CryptoHash::try_from_slice(&k))) | ||
} | ||
|
||
fn iter_state_sync_hashes_keys<'a>( | ||
store: &'a Store, | ||
) -> impl Iterator<Item = Result<EpochId, std::io::Error>> + 'a { | ||
|
@@ -31,17 +23,18 @@ fn iter_state_sync_hashes_keys<'a>( | |
.map(|item| item.and_then(|(k, _v)| EpochId::try_from_slice(&k))) | ||
} | ||
|
||
/// Saves new chunk info and returns whether there are at least 2 chunks per shard in the epoch for header.prev_hash() | ||
fn save_epoch_new_chunks<T: ChainStoreAccess>( | ||
chain_store: &T, | ||
store_update: &mut StoreUpdate, | ||
header: &BlockHeader, | ||
) -> Result<(), Error> { | ||
) -> Result<bool, Error> { | ||
let Some(mut num_new_chunks) = | ||
get_state_sync_new_chunks(chain_store.store(), header.prev_hash())? | ||
else { | ||
// This might happen in the case of epoch sync where we save individual headers without having all | ||
// headers that belong to the epoch. | ||
return Ok(()); | ||
return Ok(false); | ||
}; | ||
|
||
// This shouldn't happen because block headers in the same epoch should have chunks masks | ||
|
@@ -53,17 +46,11 @@ fn save_epoch_new_chunks<T: ChainStoreAccess>( | |
block_hash=%header.hash(), chunk_mask_len=%header.chunk_mask().len(), stored_len=%num_new_chunks.len(), | ||
"block header's chunk mask not of the same length as stored value in DBCol::StateSyncNewChunks", | ||
); | ||
return Ok(()); | ||
return Ok(false); | ||
} | ||
|
||
let done = num_new_chunks.iter().all(|num_chunks| *num_chunks >= 2); | ||
if done { | ||
// TODO(current_epoch_state_sync): this will not be correct if this block doesn't end up finalized on the main chain. | ||
// We should fix it by setting the sync hash when it's finalized, which requires making changes to how we take state snapshots. | ||
store_update.set_ser(DBCol::StateSyncHashes, header.epoch_id().as_ref(), header.hash())?; | ||
store_update.delete_all(DBCol::StateSyncNewChunks); | ||
return Ok(()); | ||
} | ||
|
||
for (num_new_chunks, new_chunk) in num_new_chunks.iter_mut().zip(header.chunk_mask().iter()) { | ||
// Only need to reach 2, so don't bother adding more than that | ||
if *new_chunk && *num_new_chunks < 2 { | ||
|
@@ -72,7 +59,7 @@ fn save_epoch_new_chunks<T: ChainStoreAccess>( | |
} | ||
|
||
store_update.set_ser(DBCol::StateSyncNewChunks, header.hash().as_ref(), &num_new_chunks)?; | ||
Ok(()) | ||
Ok(done) | ||
} | ||
|
||
fn on_new_epoch(store_update: &mut StoreUpdate, header: &BlockHeader) -> Result<(), Error> { | ||
|
@@ -96,31 +83,89 @@ fn remove_old_epochs( | |
Ok(()) | ||
} | ||
|
||
fn remove_old_blocks<T: ChainStoreAccess>( | ||
/// Helper to turn DBNotFoundErr() or the genesis header into None. We might get DBNotFoundErr() in the case | ||
/// of epoch sync where we save individual headers without having all headers that belong to the epoch. | ||
fn get_block_header<T: ChainStoreAccess>( | ||
chain_store: &T, | ||
block_hash: &CryptoHash, | ||
) -> Result<Option<BlockHeader>, Error> { | ||
match chain_store.get_block_header(block_hash) { | ||
Ok(h) => { | ||
if h.height() != chain_store.get_genesis_height() { | ||
Ok(Some(h)) | ||
} else { | ||
Ok(None) | ||
} | ||
} | ||
Comment on lines
+93
to
+99
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: h -> block_header There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
// This might happen in the case of epoch sync where we save individual headers without having all | ||
// headers that belong to the epoch. | ||
Err(Error::DBNotFoundErr(_)) => Ok(None), | ||
Err(e) => Err(e), | ||
} | ||
} | ||
|
||
fn has_enough_new_chunks(store: &Store, block_hash: &CryptoHash) -> Result<Option<bool>, Error> { | ||
let Some(num_new_chunks) = get_state_sync_new_chunks(store, block_hash)? else { | ||
// This might happen in the case of epoch sync where we save individual headers without having all | ||
// headers that belong to the epoch. | ||
return Ok(None); | ||
}; | ||
Ok(Some(num_new_chunks.iter().all(|num_chunks| *num_chunks >= 2))) | ||
} | ||
Comment on lines
+107
to
+114
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do you need the Option in the return type? Can you just return bool? |
||
|
||
/// Save num new chunks info and store the state sync hash if it has been found. We store it only | ||
/// once it becomes final. | ||
fn on_new_header<T: ChainStoreAccess>( | ||
chain_store: &T, | ||
store_update: &mut StoreUpdate, | ||
header: &BlockHeader, | ||
) -> Result<(), Error> { | ||
if header.last_final_block() == &CryptoHash::default() { | ||
let done = save_epoch_new_chunks(chain_store, store_update, header)?; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it ok to save new chunks for a non final block header? It may very well be discarded and not included on the canonical chain. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm what's the bug though? We store the new chunks for every new header stored, and if a header that gets passed to We need to store the number of new chunks at some point, and if we wait until a block is finalized to do it, the logic gets more complicated. In this version where we just call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah yeah, sorry, I thought the new chunks are stored per height but they are stored per block hash - it should be fine as is like you said. Good catch about the last final block skipping some heights! I completely missed that property. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Hah actually to be honest I didn't know that either until writing this PR :D |
||
if !done { | ||
return Ok(()); | ||
} | ||
// We don't need to keep info for old blocks around. After a block is finalized, we don't need anything before it | ||
let last_final_header = match chain_store.get_block_header(header.last_final_block()) { | ||
Ok(h) => h, | ||
// This might happen in the case of epoch sync where we save individual headers without having all | ||
// headers that belong to the epoch. | ||
Err(Error::DBNotFoundErr(_)) => return Ok(()), | ||
Err(e) => return Err(e), | ||
|
||
// Now check if the sync hash is known and finalized. The sync hash is the block after the first block with at least 2 | ||
// chunks per shard in the epoch. Note that we cannot just check if the current header.last_final_block() is the sync | ||
// hash, because even though this function is called for each header, it is not guaranteed that we'll see every block | ||
// by checking header.last_final_block(), because it is possible for the final block to jump by more than one upon a new | ||
// head update. So here we iterate backwards until we find it, if it exists yet. | ||
|
||
let epoch_id = header.epoch_id(); | ||
let last_final_hash = header.last_final_block(); | ||
|
||
let Some(mut sync) = get_block_header(chain_store, last_final_hash)? else { | ||
return Ok(()); | ||
}; | ||
for block_hash in iter_state_sync_new_chunks_keys(chain_store.store()) { | ||
let block_hash = block_hash?; | ||
let old_header = chain_store.get_block_header(&block_hash)?; | ||
if old_header.height() < last_final_header.height() { | ||
store_update.delete(DBCol::StateSyncNewChunks, block_hash.as_ref()); | ||
loop { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this loop always iterate from the current final block all the way back to the beginning of the epoch? You may want to add an early return if the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah I found this check elsewhere. Can you add a comment that this method should only be called if the sync hash wasn't found yet? Or maybe even some debug assertions. |
||
let Some(sync_prev) = get_block_header(chain_store, sync.prev_hash())? else { | ||
return Ok(()); | ||
}; | ||
if sync_prev.epoch_id() != epoch_id { | ||
return Ok(()); | ||
} | ||
if has_enough_new_chunks(chain_store.store(), sync_prev.hash())? != Some(true) { | ||
return Ok(()); | ||
} | ||
} | ||
|
||
Ok(()) | ||
let Some(sync_prev_prev) = get_block_header(chain_store, sync_prev.prev_hash())? else { | ||
return Ok(()); | ||
}; | ||
let Some(prev_prev_done) = | ||
has_enough_new_chunks(chain_store.store(), sync_prev_prev.hash())? | ||
else { | ||
return Ok(()); | ||
}; | ||
|
||
if !prev_prev_done { | ||
// `sync_prev_prev` doesn't have enough new chunks, and `sync_prev` does, meaning `sync` is the first final | ||
// valid sync block | ||
store_update.set_ser(DBCol::StateSyncHashes, epoch_id.as_ref(), sync.hash())?; | ||
store_update.delete_all(DBCol::StateSyncNewChunks); | ||
return Ok(()); | ||
} | ||
sync = sync_prev; | ||
} | ||
} | ||
|
||
/// Updates information in the DB related to calculating the correct "sync_hash" for this header's epoch, | ||
|
@@ -154,6 +199,34 @@ pub(crate) fn update_sync_hashes<T: ChainStoreAccess>( | |
return remove_old_epochs(chain_store.store(), store_update, header, &prev_header); | ||
} | ||
|
||
save_epoch_new_chunks(chain_store, store_update, header)?; | ||
remove_old_blocks(chain_store, store_update, header) | ||
on_new_header(chain_store, store_update, header) | ||
} | ||
|
||
/// Returns whether `block_hash` is the block that will appear immediately before the "sync_hash" block. That is, | ||
/// whether it is going to be the prev_hash of the "sync_hash" block, when it is found. | ||
/// | ||
/// `block_hash` is the prev_hash of the future "sync_hash" block iff the number of new chunks in the epoch in | ||
/// each shard is at least 2, and at least one shard of the block `prev_hash` doesn't yet have 2 new chunks in it. | ||
Comment on lines
+208
to
+209
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: Maybe say "is the first block in the epoch that has the X property" instead of "is a block that has the X property but the previous block does not" |
||
/// This function can only return true before we save the "sync_hash" block to the `StateSyncHashes` column, | ||
/// because it relies on data stored in the `StateSyncNewChunks` column, which is cleaned up after that. | ||
/// | ||
/// This is used when making state snapshots, because in that case we don't need to wait for the "sync_hash" | ||
/// block to be finalized to take a snapshot of the state as of its prev prev block | ||
pub(crate) fn is_sync_prev_hash( | ||
store: &Store, | ||
block_hash: &CryptoHash, | ||
prev_hash: &CryptoHash, | ||
) -> Result<bool, Error> { | ||
let Some(new_chunks) = get_state_sync_new_chunks(store, block_hash)? else { | ||
return Ok(false); | ||
}; | ||
let done = new_chunks.iter().all(|num_chunks| *num_chunks >= 2); | ||
if !done { | ||
return Ok(false); | ||
} | ||
let Some(prev_new_chunks) = get_state_sync_new_chunks(store, prev_hash)? else { | ||
return Ok(false); | ||
}; | ||
let prev_done = prev_new_chunks.iter().all(|num_chunks| *num_chunks >= 2); | ||
Ok(!prev_done) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a method with the same name in Chain - can you rename to something unique to avoid confusion?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you need to treat genesis differently? Is this method the best place to deal with that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm, `get_block_header` -> `maybe_get_block_header`? And yeah, actually good point: moved the genesis height check outside this function and only call it in one place now. I think it's safest to perform the check because the first epoch also has a zero epoch ID.