fix(state-sync): fix issue when untracking and then tracking a shard (#12773)

`should_catch_up_shard()` was recently modified to not catch up a shard
that we tracked in the previous epoch, because we can just continue
applying chunks for it. But `get_should_apply_chunk()` was not aware of
this change and did not handle it properly.

In the simplest case there was no bug, but a bug would appear if the set
of shards we don't currently track but will track in the next epoch
contained both shards we used to track and shards we didn't. In that
case, when we apply the first block of the epoch, we mark the block as
not caught up, and `get_should_apply_chunk()` then treats all of these
shard IDs the same when deciding which chunks to apply, so it applies
chunks for none of the non-tracked shards, including the ones whose
state we already have and should keep applying.

Fix it by consolidating this logic in
`EpochManager::cared_about_shard_prev_epoch_from_prev_block()` (analogous
to the other `cares_about_shard*` functions there), and then also checking
this value in `get_should_apply_chunk()` so that its logic is consistent
with `should_catch_up_shard()`.
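
For illustration, below is a minimal, self-contained sketch of the consolidated decision described above. The `Tracking` struct, the free functions, and the test scenario are illustrative stand-ins rather than the actual nearcore APIs; they mirror the new `should_catch_up_shard()` / `get_should_apply_chunk()` logic shown in the diff.

    #[derive(Clone, Copy)]
    enum ApplyChunksMode {
        IsCaughtUp,
        NotCaughtUp,
        CatchingUp,
    }

    /// Tracking status of one shard across the three relevant epochs.
    #[derive(Clone, Copy)]
    struct Tracking {
        prev_epoch: bool, // did we track the shard in epoch T-1?
        this_epoch: bool, // do we track it in epoch T?
        next_epoch: bool, // will we track it in epoch T+1?
    }

    fn should_catch_up_shard(t: Tracking) -> bool {
        // Only a shard we will track next epoch, don't track now, and have no
        // previous-epoch state for needs to be state synced.
        t.next_epoch && !t.this_epoch && !t.prev_epoch
    }

    fn should_apply_chunk(mode: ApplyChunksMode, t: Tracking) -> bool {
        match mode {
            // Next epoch's shard states are not ready: apply this epoch's shards,
            // plus shards we will care about and already have state for.
            ApplyChunksMode::NotCaughtUp => t.this_epoch || (t.next_epoch && t.prev_epoch),
            // Fully caught up: apply shards for both this epoch and the next.
            ApplyChunksMode::IsCaughtUp => t.this_epoch || t.next_epoch,
            // Catching up: apply only the shards that are being state synced.
            ApplyChunksMode::CatchingUp => !t.this_epoch && t.next_epoch && !t.prev_epoch,
        }
    }

    fn main() {
        // Scenario from the commit message: tracked shards go (s0) -> (s1) -> (s0, s2).
        // In the middle epoch the node is NotCaughtUp because s2 needs state sync,
        // but s0 should still be applied since we already have its state.
        let s0 = Tracking { prev_epoch: true, this_epoch: false, next_epoch: true };
        let s2 = Tracking { prev_epoch: false, this_epoch: false, next_epoch: true };

        assert!(!should_catch_up_shard(s0)); // s0: no state sync needed
        assert!(should_catch_up_shard(s2)); // s2: needs state sync

        assert!(should_apply_chunk(ApplyChunksMode::NotCaughtUp, s0)); // keep applying s0
        assert!(!should_apply_chunk(ApplyChunksMode::NotCaughtUp, s2)); // s2 waits for catch-up
        assert!(should_apply_chunk(ApplyChunksMode::CatchingUp, s2)); // s2 applied while catching up
        assert!(should_apply_chunk(ApplyChunksMode::IsCaughtUp, s0)); // once caught up, apply both
    }
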
marcelo-gonzalez authored Jan 24, 2025
1 parent 532d157 commit 618ef69
Showing 5 changed files with 222 additions and 59 deletions.
72 changes: 26 additions & 46 deletions chain/chain/src/chain.rs
@@ -803,14 +803,12 @@ impl Chain {
epoch_id: &EpochId,
block_hash: &CryptoHash,
prev_hash: &CryptoHash,
prev_prev_hash: &CryptoHash,
) -> Result<Option<StateSyncInfo>, Error> {
let shards_to_state_sync = Chain::get_shards_to_state_sync(
self.epoch_manager.as_ref(),
&self.shard_tracker,
me,
prev_hash,
prev_prev_hash,
)?;
if shards_to_state_sync.is_empty() {
Ok(None)
@@ -2411,8 +2409,7 @@ impl Chain {
// For the first block of the epoch we check if we need to start download states for
// shards that we will care about in the next epoch. If there is no state to be downloaded,
// we consider that we are caught up, otherwise not
let state_sync_info =
self.get_state_sync_info(me, epoch_id, block_hash, prev_hash, prev_prev_hash)?;
let state_sync_info = self.get_state_sync_info(me, epoch_id, block_hash, prev_hash)?;
debug!(
target: "chain", %block_hash, shards_to_sync=?state_sync_info.as_ref().map(|s| s.shards()),
"Checked for shards to sync for epoch T+1 upon processing first block of epoch T"
@@ -2445,20 +2442,11 @@ impl Chain {
shard_tracker: &ShardTracker,
me: &Option<AccountId>,
parent_hash: &CryptoHash,
prev_prev_hash: &CryptoHash,
) -> Result<Vec<ShardId>, Error> {
let epoch_id = epoch_manager.get_epoch_id_from_prev_block(parent_hash)?;
let mut shards_to_sync = Vec::new();
for shard_id in epoch_manager.shard_ids(&epoch_id)? {
if Self::should_catch_up_shard(
epoch_manager,
shard_tracker,
me,
&epoch_id,
parent_hash,
prev_prev_hash,
shard_id,
)? {
if Self::should_catch_up_shard(shard_tracker, me, parent_hash, shard_id)? {
shards_to_sync.push(shard_id)
}
}
@@ -2474,43 +2462,26 @@ impl Chain {
/// where we'll go from tracking it to not tracking it and back to tracking it in consecutive epochs,
/// then we can just continue to apply chunks as if we were tracking it in epoch T, and there's no need to state sync.
fn should_catch_up_shard(
_epoch_manager: &dyn EpochManagerAdapter,
shard_tracker: &ShardTracker,
me: &Option<AccountId>,
epoch_id: &EpochId,
epoch_last_block: &CryptoHash,
_epoch_last_block_prev: &CryptoHash,
prev_hash: &CryptoHash,
shard_id: ShardId,
) -> Result<bool, Error> {
// Won't care about it next epoch, no need to state sync it.
if !shard_tracker.will_care_about_shard(me.as_ref(), epoch_last_block, shard_id, true) {
if !shard_tracker.will_care_about_shard(me.as_ref(), prev_hash, shard_id, true) {
return Ok(false);
}
// Currently tracking the shard, so no need to state sync it.
if shard_tracker.care_about_shard(me.as_ref(), epoch_last_block, shard_id, true) {
if shard_tracker.care_about_shard(me.as_ref(), prev_hash, shard_id, true) {
return Ok(false);
}

// Now we need to state sync it unless we were tracking the parent in the previous epoch,
// in which case we don't need to because we already have the state, and can just continue applying chunks
if epoch_id == &EpochId::default() {
return Ok(true);
}

// let (_layout, parent_shard_id, _index) =
// epoch_manager.get_prev_shard_id_from_prev_hash(epoch_last_block, shard_id)?;
// // Note that here passing `epoch_last_block_prev` to care_about_shard() will have us check whether we were tracking it in
// // the previous epoch, because it is the "parent_hash" of the last block of the previous epoch.
// // TODO: consider refactoring these ShardTracker functions to accept an epoch_id
// // to make this less tricky.
// let tracked_before = shard_tracker.care_about_shard(
// me.as_ref(),
// epoch_last_block_prev,
// parent_shard_id,
// true,
// );
// TODO(resharding) Uncomment or remove above, and accordingly `_epoch_manager` and `_epoch_last_block_prev`.
Ok(true)
let tracked_before =
shard_tracker.cared_about_shard_in_prev_epoch(me.as_ref(), prev_hash, shard_id, true);
Ok(!tracked_before)
}

/// Check if any block with missing chunk is ready to be processed and start processing these blocks
@@ -3759,10 +3730,17 @@ impl Chain {
self.shard_tracker.care_about_shard(me.as_ref(), prev_hash, shard_id, true);
let cares_about_shard_next_epoch =
self.shard_tracker.will_care_about_shard(me.as_ref(), prev_hash, shard_id, true);
let cared_about_shard_prev_epoch = self.shard_tracker.cared_about_shard_in_prev_epoch(
me.as_ref(),
prev_hash,
shard_id,
true,
);
let should_apply_chunk = get_should_apply_chunk(
mode,
cares_about_shard_this_epoch,
cares_about_shard_next_epoch,
cared_about_shard_prev_epoch,
);
let shard_uid = self.epoch_manager.shard_id_to_uid(shard_id, epoch_id)?;
Ok(ShardContext { shard_uid, should_apply_chunk })
@@ -4127,26 +4105,28 @@ fn shard_id_out_of_bounds(shard_id: ShardId) -> Error {
/// ApplyChunksMode::NotCaughtUp once with ApplyChunksMode::CatchingUp. Note
/// that it does not guard whether the children shards are ready or not, see the
/// comments before `need_to_reshard`
// TODO(state-sync): After the changes in https://github.com/near/nearcore/pull/12617,
// this needs to be changed to be aware of what shards can be applied now. Otherwise we have
// a bug in the rare case where we have something like this sequence of tracked shards in consecutive epochs:
// (s0) -> (s1) -> (s0, s2)
// In this case we don't state sync s0 since we already have the state, but we apply chunks with mode `NotCaughtUp`
// in the middle epoch there because we're downloading state for s2.
fn get_should_apply_chunk(
mode: ApplyChunksMode,
cares_about_shard_this_epoch: bool,
cares_about_shard_next_epoch: bool,
cared_about_shard_prev_epoch: bool,
) -> bool {
match mode {
// next epoch's shard states are not ready, only update this epoch's shards
ApplyChunksMode::NotCaughtUp => cares_about_shard_this_epoch,
// next epoch's shard states are not ready, only update this epoch's shards plus shards we will care about in the future
// and already have state for
ApplyChunksMode::NotCaughtUp => {
cares_about_shard_this_epoch
|| (cares_about_shard_next_epoch && cared_about_shard_prev_epoch)
}
// update both this epoch and next epoch
ApplyChunksMode::IsCaughtUp => cares_about_shard_this_epoch || cares_about_shard_next_epoch,
// catching up next epoch's shard states, do not update this epoch's shard state
// since it has already been updated through ApplyChunksMode::NotCaughtUp
ApplyChunksMode::CatchingUp => {
!cares_about_shard_this_epoch && cares_about_shard_next_epoch
let syncing_shard = !cares_about_shard_this_epoch
&& cares_about_shard_next_epoch
&& !cared_about_shard_prev_epoch;
syncing_shard
}
}
}
26 changes: 25 additions & 1 deletion chain/chain/src/test_utils/kv_runtime.rs
@@ -621,7 +621,7 @@ impl EpochManagerAdapter for MockEpochManager {
&self,
prev_hash: &CryptoHash,
shard_id: ShardId,
) -> Result<(ShardLayout, ShardId, ShardIndex), Error> {
) -> Result<(ShardLayout, ShardId, ShardIndex), EpochError> {
let shard_layout = self.get_shard_layout_from_prev_block(prev_hash)?;
// This is not correct if there was a resharding event in between
// the previous and current block.
@@ -987,6 +987,30 @@ impl EpochManagerAdapter for MockEpochManager {
Ok(false)
}

fn cared_about_shard_prev_epoch_from_prev_block(
&self,
parent_hash: &CryptoHash,
account_id: &AccountId,
shard_id: ShardId,
) -> Result<bool, EpochError> {
// This `unwrap` here tests that in all code paths we check that the epoch exists before
// we check if we care about a shard. Please do not remove the unwrap, fix the logic of
// the calling function.
let epoch_valset = self.get_epoch_and_valset(*parent_hash).unwrap();
let shard_layout = self.get_shard_layout_from_prev_block(parent_hash)?;
let shard_index = shard_layout.get_shard_index(shard_id)?;
let chunk_producers = self.get_chunk_producers(
(epoch_valset.1.wrapping_sub(1)) % self.validators_by_valset.len(),
shard_index,
);
for validator in chunk_producers {
if validator.account_id() == account_id {
return Ok(true);
}
}
Ok(false)
}

fn will_shard_layout_change(&self, parent_hash: &CryptoHash) -> Result<bool, EpochError> {
// Copied from EpochManager (KeyValueRuntime is deprecated anyway).
let epoch_id = self.get_epoch_id_from_prev_block(parent_hash)?;
27 changes: 25 additions & 2 deletions chain/epoch-manager/src/adapter.rs
@@ -150,7 +150,7 @@ pub trait EpochManagerAdapter: Send + Sync {
&self,
prev_hash: &CryptoHash,
shard_id: ShardId,
) -> Result<(ShardLayout, ShardId, ShardIndex), Error>;
) -> Result<(ShardLayout, ShardId, ShardIndex), EpochError>;

/// Get shard layout given hash of previous block.
fn get_shard_layout_from_prev_block(
@@ -389,6 +389,13 @@ pub trait EpochManagerAdapter: Send + Sync {
shard_id: ShardId,
) -> Result<bool, EpochError>;

fn cared_about_shard_prev_epoch_from_prev_block(
&self,
parent_hash: &CryptoHash,
account_id: &AccountId,
shard_id: ShardId,
) -> Result<bool, EpochError>;

fn will_shard_layout_change(&self, parent_hash: &CryptoHash) -> Result<bool, EpochError>;

/// Tries to estimate in which epoch the given height would reside.
@@ -639,7 +646,7 @@ impl EpochManagerAdapter for EpochManagerHandle {
&self,
prev_hash: &CryptoHash,
shard_id: ShardId,
) -> Result<(ShardLayout, ShardId, ShardIndex), Error> {
) -> Result<(ShardLayout, ShardId, ShardIndex), EpochError> {
let shard_layout = self.get_shard_layout_from_prev_block(prev_hash)?;
let prev_shard_layout = self.get_shard_layout(&self.get_epoch_id(prev_hash)?)?;
let is_resharding_boundary =
@@ -955,6 +962,22 @@ impl EpochManagerAdapter for EpochManagerHandle {
)
}

// `shard_id` always refers to a shard in the current epoch that the next block from `parent_hash` belongs to.
// If shard layout changed after the prev epoch, returns true if the account cared about the parent shard
fn cared_about_shard_prev_epoch_from_prev_block(
&self,
parent_hash: &CryptoHash,
account_id: &AccountId,
shard_id: ShardId,
) -> Result<bool, EpochError> {
let (_layout, parent_shard_id, _index) =
self.get_prev_shard_id_from_prev_hash(parent_hash, shard_id)?;
let prev_epoch_id = self.get_prev_epoch_id_from_prev_block(parent_hash)?;

let epoch_manager = self.read();
epoch_manager.cares_about_shard_in_epoch(&prev_epoch_id, account_id, parent_shard_id)
}

fn will_shard_layout_change(&self, parent_hash: &CryptoHash) -> Result<bool, EpochError> {
let epoch_manager = self.read();
epoch_manager.will_shard_layout_change(parent_hash)
55 changes: 55 additions & 0 deletions chain/epoch-manager/src/shard_tracker.rs
@@ -121,6 +121,61 @@ impl ShardTracker {
self.tracks_shard_at_epoch(shard_id, &epoch_id)
}

fn tracks_shard_prev_epoch_from_prev_block(
&self,
shard_id: ShardId,
prev_hash: &CryptoHash,
) -> Result<bool, EpochError> {
let epoch_id = self.epoch_manager.get_prev_epoch_id_from_prev_block(prev_hash)?;
self.tracks_shard_at_epoch(shard_id, &epoch_id)
}

/// Whether the client cares about some shard in the previous epoch.
/// * If `account_id` is None, `is_me` is not checked and the
/// result indicates whether the client is tracking the shard
/// * If `account_id` is not None, it is supposed to be a validator
/// account and `is_me` indicates whether we check what shards
/// the client tracks.
// TODO: consolidate all these care_about_shard() functions. This could all be one
// function with an enum arg that tells what epoch we want to check, and one that allows
// passing an epoch ID or a prev hash, or current hash, or whatever.
pub fn cared_about_shard_in_prev_epoch(
&self,
account_id: Option<&AccountId>,
parent_hash: &CryptoHash,
shard_id: ShardId,
is_me: bool,
) -> bool {
// TODO: fix these unwrap_or here and handle error correctly. The current behavior masks potential errors and bugs
// https://github.com/near/nearcore/issues/4936
if let Some(account_id) = account_id {
let account_cares_about_shard = self
.epoch_manager
.cared_about_shard_prev_epoch_from_prev_block(parent_hash, account_id, shard_id)
.unwrap_or(false);
if account_cares_about_shard {
// An account has to track this shard because of its validation duties.
return true;
}
if !is_me {
// We don't know how another node is configured.
// It may track all shards, it may track no additional shards.
return false;
} else {
// We have access to the node config. Use the config to find a definite answer.
}
}
match self.tracked_config {
TrackedConfig::AllShards => {
// Avoid looking up EpochId as a performance optimization.
true
}
_ => {
self.tracks_shard_prev_epoch_from_prev_block(shard_id, parent_hash).unwrap_or(false)
}
}
}

/// Whether the client cares about some shard right now.
/// * If `account_id` is None, `is_me` is not checked and the
/// result indicates whether the client is tracking the shard
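
As a quick illustration of the lookup order in `cared_about_shard_in_prev_epoch()` above, here is a simplified, self-contained sketch. The types and parameters are illustrative stand-ins, not the real nearcore ones: validator assignment is checked first, then `is_me`, and only then the node's own tracking config.

    enum TrackedConfig {
        AllShards,
        Other, // stands in for the remaining config variants
    }

    fn cared_about_shard_in_prev_epoch(
        // Some(assigned) if an account_id was supplied and we looked up whether it was
        // assigned to the shard in the previous epoch; None if no account_id was given.
        account_was_assigned: Option<bool>,
        is_me: bool,
        config: &TrackedConfig,
        config_tracked_shard_in_prev_epoch: bool,
    ) -> bool {
        if let Some(assigned) = account_was_assigned {
            if assigned {
                // The account had to track the shard because of validation duties.
                return true;
            }
            if !is_me {
                // We can't know how another node is configured.
                return false;
            }
            // Otherwise fall through and consult our own node config.
        }
        match config {
            TrackedConfig::AllShards => true,
            TrackedConfig::Other => config_tracked_shard_in_prev_epoch,
        }
    }

    fn main() {
        // A validator assigned to the shard in the previous epoch tracked it.
        assert!(cared_about_shard_in_prev_epoch(Some(true), false, &TrackedConfig::Other, false));
        // Another node with no assignment: we conservatively answer "no".
        assert!(!cared_about_shard_in_prev_epoch(Some(false), false, &TrackedConfig::AllShards, true));
        // Ourselves with no assignment: fall back to the node's own config.
        assert!(cared_about_shard_in_prev_epoch(Some(false), true, &TrackedConfig::AllShards, false));
    }
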