From 8cae8ccdb95c1b26577cd7a8865acdfffe5666aa Mon Sep 17 00:00:00 2001
From: Marcelo Diop-Gonzalez
Date: Sat, 14 Dec 2024 19:40:35 -0500
Subject: [PATCH] fix(resharding): wait until child flat storages are split to take snapshots (#12589)

`test_resharding_v3_shard_shuffling_slower_post_processing_tasks` exposes a bug
that can be triggered if child flat storages are not yet split after a resharding
by the time we want to take a state snapshot. In that case the state snapshot code
fails because the flat storage is not ready, and it does not retry.

To fix it, we add a `want_snapshot` field that is set when we decide to take a
state snapshot. We also add a `split_in_progress` field to the `FlatStorageManager`
that is set to `true` when a resharding is started, and back to `false` when it is
finished and the catchup code has progressed to a height close to the desired
snapshot height. The state snapshot code waits until `split_in_progress` is `false`
before proceeding, and the flat storage catchup code waits until `want_snapshot` is
cleared once it has advanced to the desired snapshot hash, so that it does not
advance past the point wanted by the state snapshot. The first condition is the one
actually causing the test failure, but the second is also required.

We implement this waiting by rescheduling the message sends to run again later. A
Condvar would be a very natural choice, but it unfortunately doesn't seem to work
in testloop, since actors that normally run on different threads are put on the
same thread, and a thread blocked on a Condvar won't be woken up.

We also change the behavior of the old `set_flat_state_updates_mode()`, which used
to refuse to proceed if the update mode was already set to the same value. That
check appears to be an artifact of the original state snapshot implementation in
https://github.com/near/nearcore/pull/9090, where the extra logic was added because
there was another user of this function (`inline_flat_state_values()`, added in
https://github.com/near/nearcore/pull/9037). That function has since been deleted,
so the state snapshot code is now the only user of `set_flat_state_updates_mode()`.
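To make the coordination easier to follow before diving into the diff, here is a
minimal, self-contained sketch of the idea. The `SnapshotCoordinator` type, its
method names, and the plain `u64` heights are illustrative stand-ins for the
relevant parts of `FlatStorageManager`, not the actual implementation:

```rust
use std::sync::Mutex;

/// Illustrative stand-in for the snapshot-coordination state added to
/// `FlatStorageManager` (hypothetical type, simplified for this sketch).
#[derive(Default)]
struct SnapshotCoordinator {
    /// Some(height) while a state snapshot is wanted; the height is the minimum
    /// chunk prev-height that the snapshot needs the flat head to stop at.
    want_snapshot: Mutex<Option<u64>>,
}

impl SnapshotCoordinator {
    /// Called from the client thread when it decides to take a state snapshot.
    fn want_snapshot(&self, min_chunk_prev_height: u64) {
        *self.want_snapshot.lock().unwrap() = Some(min_chunk_prev_height);
    }

    /// Called by the state snapshot actor once the snapshot has been taken.
    fn snapshot_taken(&self) {
        *self.want_snapshot.lock().unwrap() = None;
    }

    /// Read by the resharding catchup task before advancing the flat head.
    fn snapshot_wanted(&self) -> Option<u64> {
        *self.want_snapshot.lock().unwrap()
    }
}

/// The catchup task postpones itself (its actor reschedules the message to run
/// later) instead of applying deltas past the height the snapshot wants to capture.
fn should_postpone_catchup(coordinator: &SnapshotCoordinator, next_height: u64) -> bool {
    match coordinator.snapshot_wanted() {
        Some(min_chunk_prev_height) => next_height >= min_chunk_prev_height,
        None => false,
    }
}

fn main() {
    let coordinator = SnapshotCoordinator::default();
    assert!(!should_postpone_catchup(&coordinator, 100));

    coordinator.want_snapshot(100);
    assert!(should_postpone_catchup(&coordinator, 100));
    assert!(!should_postpone_catchup(&coordinator, 99));

    coordinator.snapshot_taken();
    assert!(!should_postpone_catchup(&coordinator, 100));
}
```

In the actual patch the other direction works the same way but via message
rescheduling: the snapshot actor checks how far resharding catchup has progressed
and, if the split/catchup is still too far behind, re-sends its own
`CreateSnapshotRequest` with `run_later` rather than blocking the actor thread.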
--- Cargo.lock | 1 + chain/chain/Cargo.toml | 1 + chain/chain/src/chain.rs | 27 ++- chain/chain/src/flat_storage_resharder.rs | 172 +++++++++--------- .../chain/src/resharding/resharding_actor.rs | 64 ++++--- chain/chain/src/resharding/types.rs | 2 - chain/chain/src/state_snapshot_actor.rs | 132 +++++++++++--- .../client/src/test_utils/test_env_builder.rs | 2 +- core/store/src/adapter/chunk_store.rs | 4 +- core/store/src/adapter/flat_store.rs | 4 +- core/store/src/adapter/mod.rs | 6 +- core/store/src/adapter/trie_store.rs | 4 +- core/store/src/flat/chunk_view.rs | 2 + core/store/src/flat/manager.rs | 172 ++++++++++++++---- core/store/src/flat/storage.rs | 9 +- core/store/src/lib.rs | 4 +- core/store/src/trie/state_snapshot.rs | 2 +- .../src/test_loop/tests/resharding_v3.rs | 16 +- 18 files changed, 430 insertions(+), 194 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cc140563ee6..ebc33b1821a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4164,6 +4164,7 @@ name = "near-chain" version = "0.0.0" dependencies = [ "actix", + "anyhow", "assert_matches", "borsh", "bytesize", diff --git a/chain/chain/Cargo.toml b/chain/chain/Cargo.toml index 5c956a6a27a..1b544c5f313 100644 --- a/chain/chain/Cargo.toml +++ b/chain/chain/Cargo.toml @@ -10,6 +10,7 @@ workspace = true [dependencies] actix.workspace = true +anyhow.workspace = true borsh.workspace = true bytesize.workspace = true chrono.workspace = true diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs index d31d848cde4..b8029caa135 100644 --- a/chain/chain/src/chain.rs +++ b/chain/chain/src/chain.rs @@ -3838,6 +3838,24 @@ impl Chain { ))) } + fn min_chunk_prev_height(&self, block: &Block) -> Result { + let mut ret = None; + for chunk in block.chunks().iter_raw() { + let prev_height = if chunk.prev_block_hash() == &CryptoHash::default() { + 0 + } else { + let prev_header = self.get_block_header(chunk.prev_block_hash())?; + prev_header.height() + }; + if let Some(min_height) = ret { + ret = Some(std::cmp::min(min_height, prev_height)); + } else { + ret = Some(prev_height); + } + } + Ok(ret.unwrap_or(0)) + } + /// Function to create or delete a snapshot if necessary. 
/// TODO: this function calls head() inside of start_process_block_impl(), consider moving this to be called right after HEAD gets updated fn process_snapshot(&mut self) -> Result<(), Error> { @@ -3847,6 +3865,7 @@ impl Chain { SnapshotAction::MakeSnapshot(prev_hash) => { let prev_block = self.get_block(&prev_hash)?; let prev_prev_hash = prev_block.header().prev_hash(); + let min_chunk_prev_height = self.min_chunk_prev_height(&prev_block)?; let epoch_height = self.epoch_manager.get_epoch_height_from_prev_block(prev_prev_hash)?; let shard_layout = @@ -3854,7 +3873,13 @@ impl Chain { let shard_uids = shard_layout.shard_uids().enumerate().collect(); let make_snapshot_callback = &snapshot_callbacks.make_snapshot_callback; - make_snapshot_callback(*prev_prev_hash, epoch_height, shard_uids, prev_block); + make_snapshot_callback( + *prev_prev_hash, + min_chunk_prev_height, + epoch_height, + shard_uids, + prev_block, + ); } SnapshotAction::DeleteSnapshot => { let delete_snapshot_callback = &snapshot_callbacks.delete_snapshot_callback; diff --git a/chain/chain/src/flat_storage_resharder.rs b/chain/chain/src/flat_storage_resharder.rs index 272c81199d9..83bd481f3fa 100644 --- a/chain/chain/src/flat_storage_resharder.rs +++ b/chain/chain/src/flat_storage_resharder.rs @@ -27,9 +27,9 @@ use near_primitives::trie_key::trie_key_parsers::{ parse_account_id_from_contract_code_key, parse_account_id_from_contract_data_key, parse_account_id_from_received_data_key, parse_account_id_from_trie_key_with_separator, }; -use near_primitives::types::AccountId; #[cfg(feature = "test_features")] use near_primitives::types::BlockHeightDelta; +use near_primitives::types::{AccountId, BlockHeight}; use near_store::adapter::flat_store::{FlatStoreAdapter, FlatStoreUpdateAdapter}; use near_store::adapter::StoreAdapter; use near_store::flat::{ @@ -62,8 +62,7 @@ use std::iter; /// [FlatStorageResharderController]. /// - In the case of event `Split` the state of flat storage will go back to what it was /// previously. -/// - Children shard catchup is a consequence of splitting a shard, not a resharding event on -/// its own. As such, it can't be manually cancelled. +/// - Children shard catchup can be cancelled and will resume from the point where it left. /// - Resilience to chain forks. /// - Resharding events will perform changes on the state only after their resharding block /// becomes final. @@ -155,16 +154,12 @@ impl FlatStorageResharder { self.clean_children_shards(&status)?; self.schedule_split_shard(parent_shard_uid, &status); } - FlatStorageReshardingStatus::CatchingUp(block_hash) => { + FlatStorageReshardingStatus::CatchingUp(_) => { info!(target: "resharding", ?shard_uid, ?status, "resuming flat storage shard catchup"); // Send a request to schedule the execution of `shard_catchup_task` for this shard. - self.sender.flat_storage_shard_catchup_sender.send( - FlatStorageShardCatchupRequest { - resharder: self.clone(), - shard_uid, - flat_head_block_hash: *block_hash, - }, - ); + self.sender + .flat_storage_shard_catchup_sender + .send(FlatStorageShardCatchupRequest { resharder: self.clone(), shard_uid }); } } Ok(()) @@ -316,10 +311,7 @@ impl FlatStorageResharder { /// /// Conceptually it simply copies each key-value pair from the parent shard to the correct /// child. This task may get cancelled or postponed. 
- pub fn split_shard_task( - &self, - chain_store: &ChainStore, - ) -> FlatStorageReshardingSchedulableTaskResult { + pub fn split_shard_task(&self, chain_store: &ChainStore) -> FlatStorageReshardingTaskResult { info!(target: "resharding", "flat storage shard split task execution"); // Make sure that the resharding block is final. @@ -336,11 +328,11 @@ impl FlatStorageResharder { self.cancel_scheduled_event(); error!(target: "resharding", "flat storage shard split task failed during scheduling!"); // TODO(resharding): return failed only if scheduling of all resharding blocks have failed. - return FlatStorageReshardingSchedulableTaskResult::Failed; + return FlatStorageReshardingTaskResult::Failed; } FlatStorageReshardingTaskSchedulingStatus::Postponed => { info!(target: "resharding", "flat storage shard split task has been postponed"); - return FlatStorageReshardingSchedulableTaskResult::Postponed; + return FlatStorageReshardingTaskResult::Postponed; } }; @@ -348,7 +340,7 @@ impl FlatStorageResharder { { if self.adv_should_delay_task(&resharding_hash, chain_store) { info!(target: "resharding", "flat storage shard split task has been artificially postponed!"); - return FlatStorageReshardingSchedulableTaskResult::Postponed; + return FlatStorageReshardingTaskResult::Postponed; } } @@ -376,12 +368,12 @@ impl FlatStorageResharder { parent_shard: ShardUId, split_params: &ParentSplitParameters, metrics: &FlatStorageReshardingShardSplitMetrics, - ) -> FlatStorageReshardingSchedulableTaskResult { + ) -> FlatStorageReshardingTaskResult { self.set_resharding_event_execution_status(TaskExecutionStatus::Started); // Exit early if the task has already been cancelled. if self.controller.is_cancelled() { - return FlatStorageReshardingSchedulableTaskResult::Cancelled; + return FlatStorageReshardingTaskResult::Cancelled; } // Determines after how many bytes worth of key-values the process stops to commit changes @@ -403,7 +395,7 @@ impl FlatStorageResharder { Ok(iter) => iter, Err(err) => { error!(target: "resharding", ?parent_shard, block_hash=?split_params.resharding_hash, ?err, "failed to build flat storage iterator"); - return FlatStorageReshardingSchedulableTaskResult::Failed; + return FlatStorageReshardingTaskResult::Failed; } }; @@ -434,12 +426,12 @@ impl FlatStorageResharder { &split_params, ) { error!(target: "resharding", ?err, "failed to handle flat storage key"); - return FlatStorageReshardingSchedulableTaskResult::Failed; + return FlatStorageReshardingTaskResult::Failed; } } Some(FlatStorageAndDeltaIterItem::Entry(Err(err))) => { error!(target: "resharding", ?err, "failed to read flat storage value from parent shard"); - return FlatStorageReshardingSchedulableTaskResult::Failed; + return FlatStorageReshardingTaskResult::Failed; } None => { iter_exhausted = true; @@ -450,7 +442,7 @@ impl FlatStorageResharder { // Make a pause to commit and check if the routine should stop. if let Err(err) = store_update.commit() { error!(target: "resharding", ?err, "failed to commit store update"); - return FlatStorageReshardingSchedulableTaskResult::Failed; + return FlatStorageReshardingTaskResult::Failed; } num_batches_done += 1; @@ -459,10 +451,10 @@ impl FlatStorageResharder { // If `iter`` is exhausted we can exit after the store commit. 
if iter_exhausted { - return FlatStorageReshardingSchedulableTaskResult::Successful { num_batches_done }; + return FlatStorageReshardingTaskResult::Successful { num_batches_done }; } if self.controller.is_cancelled() { - return FlatStorageReshardingSchedulableTaskResult::Cancelled; + return FlatStorageReshardingTaskResult::Cancelled; } // Sleep between batches in order to throttle resharding and leave some resource for the @@ -484,7 +476,7 @@ impl FlatStorageResharder { parent_shard: ShardUId, split_params: ParentSplitParameters, metrics: &FlatStorageReshardingShardSplitMetrics, - task_status: FlatStorageReshardingSchedulableTaskResult, + task_status: FlatStorageReshardingTaskResult, ) { let ParentSplitParameters { left_child_shard, @@ -498,7 +490,7 @@ impl FlatStorageResharder { let mut store_update = flat_store.store_update(); match task_status { - FlatStorageReshardingSchedulableTaskResult::Successful { .. } => { + FlatStorageReshardingTaskResult::Successful { .. } => { // Split shard completed successfully. // Parent flat storage can be deleted from the FlatStoreManager. // If FlatStoreManager has no reference to the shard, delete it manually. @@ -524,13 +516,12 @@ impl FlatStorageResharder { FlatStorageShardCatchupRequest { resharder: self.clone(), shard_uid: child_shard, - flat_head_block_hash: resharding_hash, }, ); } } - FlatStorageReshardingSchedulableTaskResult::Failed - | FlatStorageReshardingSchedulableTaskResult::Cancelled => { + FlatStorageReshardingTaskResult::Failed + | FlatStorageReshardingTaskResult::Cancelled => { // We got an error or a cancellation request. // Reset parent. store_update.set_flat_storage_status( @@ -542,7 +533,7 @@ impl FlatStorageResharder { store_update.remove_flat_storage(child_shard); } } - FlatStorageReshardingSchedulableTaskResult::Postponed => { + FlatStorageReshardingTaskResult::Postponed => { panic!("can't finalize processing of a postponed split task!"); } } @@ -612,18 +603,23 @@ impl FlatStorageResharder { pub fn shard_catchup_task( &self, shard_uid: ShardUId, - flat_head_block_hash: CryptoHash, chain_store: &ChainStore, ) -> FlatStorageReshardingTaskResult { - info!(target: "resharding", ?shard_uid, ?flat_head_block_hash, "flat storage shard catchup task started"); + // Exit early if the task has already been cancelled. + if self.controller.is_cancelled() { + return FlatStorageReshardingTaskResult::Cancelled; + } + info!(target: "resharding", ?shard_uid, "flat storage shard catchup task started"); let metrics = FlatStorageReshardingShardCatchUpMetrics::new(&shard_uid); // Apply deltas and then create the flat storage. - let apply_result = - self.shard_catchup_apply_deltas(shard_uid, flat_head_block_hash, chain_store, &metrics); - let Ok((num_batches_done, flat_head)) = apply_result else { + let apply_result = self.shard_catchup_apply_deltas(shard_uid, chain_store, &metrics); + let Ok(res) = apply_result else { error!(target: "resharding", ?shard_uid, err = ?apply_result.unwrap_err(), "flat storage shard catchup delta application failed!"); return FlatStorageReshardingTaskResult::Failed; }; + let Some((num_batches_done, flat_head)) = res else { + return FlatStorageReshardingTaskResult::Postponed; + }; match self.shard_catchup_finalize_storage(shard_uid, &flat_head, &metrics) { Ok(_) => { let task_status = FlatStorageReshardingTaskResult::Successful { num_batches_done }; @@ -639,16 +635,26 @@ impl FlatStorageResharder { } } + /// checks whether there's a snapshot in progress. 
Returns true if we've already applied all deltas up + /// to the desired snapshot height, and should no longer continue to give the state snapshot + /// code a chance to finish first. + fn coordinate_snapshot(&self, height: BlockHeight) -> bool { + let manager = self.runtime.get_flat_storage_manager(); + let Some(min_chunk_prev_height) = manager.snapshot_wanted() else { + return false; + }; + height >= min_chunk_prev_height + } + /// Applies flat storage deltas in batches on a shard that is in catchup status. /// /// Returns the number of delta batches applied and the final tip of the flat storage. fn shard_catchup_apply_deltas( &self, shard_uid: ShardUId, - mut flat_head_block_hash: CryptoHash, chain_store: &ChainStore, metrics: &FlatStorageReshardingShardCatchUpMetrics, - ) -> Result<(usize, Tip), Error> { + ) -> Result, Error> { // How many block heights of deltas are applied in a single commit. let catch_up_blocks = self.resharding_config.get().catch_up_blocks; // Delay between every batch. @@ -658,7 +664,24 @@ impl FlatStorageResharder { let mut num_batches_done: usize = 0; + let status = self + .runtime + .store() + .flat_store() + .get_flat_storage_status(shard_uid) + .map_err(|e| Into::::into(e))?; + let FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CatchingUp( + mut flat_head_block_hash, + )) = status + else { + return Err(Error::Other(format!( + "unexpected resharding catchup flat storage status for {}: {:?}", + shard_uid, &status + ))); + }; + loop { + // TODO:(resharding): check self.controller.is_cancelled() here as well. let _span = tracing::debug_span!( target: "resharding", "shard_catchup_apply_deltas/batch", @@ -670,15 +693,16 @@ impl FlatStorageResharder { // If we reached the desired new flat head, we can terminate the delta application step. if is_flat_head_on_par_with_chain(&flat_head_block_hash, &chain_final_head) { - return Ok(( + return Ok(Some(( num_batches_done, Tip::from_header(&chain_store.get_block_header(&flat_head_block_hash)?), - )); + ))); } let mut merged_changes = FlatStateChanges::default(); let store = self.runtime.store().flat_store(); let mut store_update = store.store_update(); + let mut postpone = false; // Merge deltas from the next blocks until we reach chain final head. for _ in 0..catch_up_blocks { @@ -691,6 +715,10 @@ impl FlatStorageResharder { if is_flat_head_on_par_with_chain(&flat_head_block_hash, &chain_final_head) { break; } + if self.coordinate_snapshot(height) { + postpone = true; + break; + } flat_head_block_hash = chain_store.get_next_block_hash(&flat_head_block_hash)?; if let Some(changes) = store .get_delta(shard_uid, flat_head_block_hash) @@ -715,6 +743,9 @@ impl FlatStorageResharder { num_batches_done += 1; metrics.set_head_height(chain_store.get_block_height(&flat_head_block_hash)?); + if postpone { + return Ok(None); + } // Sleep between batches in order to throttle resharding and leave some resource for the // regular node operation. std::thread::sleep(batch_delay); @@ -1081,19 +1112,11 @@ pub enum TaskExecutionStatus { NotStarted, } -/// Result of a simple flat storage resharding task. +/// Result of a schedulable flat storage resharding task. #[derive(Clone, Debug, Copy, Eq, PartialEq)] pub enum FlatStorageReshardingTaskResult { Successful { num_batches_done: usize }, Failed, -} - -/// Result of a schedulable flat storage resharding task. Extends [FlatStorageReshardingTaskResult] -/// with the option to cancel or postpone the task. 
-#[derive(Clone, Debug, Copy, Eq, PartialEq)] -pub enum FlatStorageReshardingSchedulableTaskResult { - Successful { num_batches_done: usize }, - Failed, Cancelled, Postponed, } @@ -1207,11 +1230,7 @@ mod tests { impl CanSend for SimpleSender { fn send(&self, msg: FlatStorageShardCatchupRequest) { - msg.resharder.shard_catchup_task( - msg.shard_uid, - msg.flat_head_block_hash, - &self.chain_store.lock().unwrap(), - ); + msg.resharder.shard_catchup_task(msg.shard_uid, &self.chain_store.lock().unwrap()); } } @@ -1240,7 +1259,7 @@ mod tests { } impl DelayedSender { - fn call_split_shard_task(&self) -> FlatStorageReshardingSchedulableTaskResult { + fn call_split_shard_task(&self) -> FlatStorageReshardingTaskResult { let request = self.split_shard_request.lock().unwrap(); request.as_ref().unwrap().resharder.split_shard_task(&self.chain_store.lock().unwrap()) } @@ -1251,11 +1270,9 @@ mod tests { .unwrap() .iter() .map(|request| { - request.resharder.shard_catchup_task( - request.shard_uid, - request.flat_head_block_hash, - &self.chain_store.lock().unwrap(), - ) + request + .resharder + .shard_catchup_task(request.shard_uid, &self.chain_store.lock().unwrap()) }) .collect() } @@ -1645,7 +1662,7 @@ mod tests { assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok()); // Check that more than one batch has been processed. - let FlatStorageReshardingSchedulableTaskResult::Successful { num_batches_done } = + let FlatStorageReshardingTaskResult::Successful { num_batches_done } = sender.call_split_shard_task() else { assert!(false); @@ -1868,7 +1885,7 @@ mod tests { assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok()); assert_eq!( sender.call_split_shard_task(), - FlatStorageReshardingSchedulableTaskResult::Successful { num_batches_done: 3 } + FlatStorageReshardingTaskResult::Successful { num_batches_done: 3 } ); // Validate integrity of children shards. @@ -2412,10 +2429,7 @@ mod tests { ReshardingEventType::SplitShard(params) => params, }; assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok()); - assert_eq!( - sender.call_split_shard_task(), - FlatStorageReshardingSchedulableTaskResult::Postponed - ); + assert_eq!(sender.call_split_shard_task(), FlatStorageReshardingTaskResult::Postponed); assert_gt!(flat_store.iter(parent_shard).count(), 0); // Move the chain final head to the resharding block height (2). @@ -2429,7 +2443,7 @@ mod tests { // Trigger resharding again and now it should split the parent shard. assert_eq!( sender.call_split_shard_task(), - FlatStorageReshardingSchedulableTaskResult::Successful { num_batches_done: 3 } + FlatStorageReshardingTaskResult::Successful { num_batches_done: 3 } ); assert_eq!(flat_store.iter(parent_shard).count(), 0); } @@ -2461,10 +2475,7 @@ mod tests { ReshardingEventType::SplitShard(params) => params, }; assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok()); - assert_eq!( - sender.call_split_shard_task(), - FlatStorageReshardingSchedulableTaskResult::Postponed - ); + assert_eq!(sender.call_split_shard_task(), FlatStorageReshardingTaskResult::Postponed); assert_gt!(flat_store.iter(parent_shard).count(), 0); // Add two blocks on top of the first block (simulate a fork). 
@@ -2481,10 +2492,7 @@ mod tests { ReshardingEventType::SplitShard(params) => params, }; assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok()); - assert_eq!( - sender.call_split_shard_task(), - FlatStorageReshardingSchedulableTaskResult::Postponed - ); + assert_eq!(sender.call_split_shard_task(), FlatStorageReshardingTaskResult::Postponed); assert_gt!(flat_store.iter(parent_shard).count(), 0); // Add two additional blocks on the fork to make the resharding block (height 1) final. @@ -2498,7 +2506,7 @@ mod tests { // Now the second resharding event should take place. assert_matches!( sender.call_split_shard_task(), - FlatStorageReshardingSchedulableTaskResult::Successful { .. } + FlatStorageReshardingTaskResult::Successful { .. } ); assert_eq!(flat_store.iter(parent_shard).count(), 0); @@ -2526,10 +2534,7 @@ mod tests { assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok()); let (parent_shard, split_params) = resharder.get_parent_shard_and_split_params().unwrap(); let ParentSplitParameters { flat_head, .. } = split_params; - assert_eq!( - sender.call_split_shard_task(), - FlatStorageReshardingSchedulableTaskResult::Postponed - ); + assert_eq!(sender.call_split_shard_task(), FlatStorageReshardingTaskResult::Postponed); // Fork the chain before the resharding block and make it final, but don't update the // resharding block hash. @@ -2541,10 +2546,7 @@ mod tests { ); // Scheduling of the shard split should fail. - assert_eq!( - sender.call_split_shard_task(), - FlatStorageReshardingSchedulableTaskResult::Failed - ); + assert_eq!(sender.call_split_shard_task(), FlatStorageReshardingTaskResult::Failed); assert!(resharder.resharding_event().is_none()); let flat_store = resharder.runtime.store().flat_store(); assert_eq!( diff --git a/chain/chain/src/resharding/resharding_actor.rs b/chain/chain/src/resharding/resharding_actor.rs index bc4a371dfe7..e92672b76d0 100644 --- a/chain/chain/src/resharding/resharding_actor.rs +++ b/chain/chain/src/resharding/resharding_actor.rs @@ -1,13 +1,11 @@ use super::types::{ FlatStorageShardCatchupRequest, FlatStorageSplitShardRequest, MemtrieReloadRequest, }; -use crate::flat_storage_resharder::{ - FlatStorageResharder, FlatStorageReshardingSchedulableTaskResult, - FlatStorageReshardingTaskResult, -}; +use crate::flat_storage_resharder::{FlatStorageResharder, FlatStorageReshardingTaskResult}; use crate::ChainStore; use near_async::futures::{DelayedActionRunner, DelayedActionRunnerExt}; use near_async::messaging::{self, Handler, HandlerWithContext}; +use near_primitives::shard_layout::ShardUId; use near_primitives::types::BlockHeight; use near_store::Store; use time::Duration; @@ -29,20 +27,13 @@ impl HandlerWithContext for ReshardingActor { } } -impl Handler for ReshardingActor { - fn handle(&mut self, msg: FlatStorageShardCatchupRequest) { - match msg.resharder.shard_catchup_task( - msg.shard_uid, - msg.flat_head_block_hash, - &self.chain_store, - ) { - FlatStorageReshardingTaskResult::Successful { .. } => { - // All good. - } - FlatStorageReshardingTaskResult::Failed => { - panic!("impossible to recover from a flat storage shard catchup failure!") - } - } +impl HandlerWithContext for ReshardingActor { + fn handle( + &mut self, + msg: FlatStorageShardCatchupRequest, + ctx: &mut dyn DelayedActionRunner, + ) { + self.handle_flat_storage_catchup(msg.resharder, msg.shard_uid, ctx); } } @@ -66,16 +57,16 @@ impl ReshardingActor { // becomes final. 
If the resharding block is not yet final, the task will exit early with // `Postponed` status and it must be rescheduled. match resharder.split_shard_task(&self.chain_store) { - FlatStorageReshardingSchedulableTaskResult::Successful { .. } => { + FlatStorageReshardingTaskResult::Successful { .. } => { // All good. } - FlatStorageReshardingSchedulableTaskResult::Failed => { + FlatStorageReshardingTaskResult::Failed => { panic!("impossible to recover from a flat storage split shard failure!") } - FlatStorageReshardingSchedulableTaskResult::Cancelled => { + FlatStorageReshardingTaskResult::Cancelled => { // The task has been cancelled. Nothing else to do. } - FlatStorageReshardingSchedulableTaskResult::Postponed => { + FlatStorageReshardingTaskResult::Postponed => { // The task must be retried later. ctx.run_later( "ReshardingActor FlatStorageSplitShard", @@ -87,4 +78,33 @@ impl ReshardingActor { } } } + + fn handle_flat_storage_catchup( + &self, + resharder: FlatStorageResharder, + shard_uid: ShardUId, + ctx: &mut dyn DelayedActionRunner, + ) { + match resharder.shard_catchup_task(shard_uid, &self.chain_store) { + FlatStorageReshardingTaskResult::Successful { .. } => { + // All good. + } + FlatStorageReshardingTaskResult::Failed => { + panic!("impossible to recover from a flat storage shard catchup failure!") + } + FlatStorageReshardingTaskResult::Cancelled => { + // The task has been cancelled. Nothing else to do. + } + FlatStorageReshardingTaskResult::Postponed => { + // The task must be retried later. + ctx.run_later( + "ReshardingActor FlatStorageCatchup", + Duration::milliseconds(1000), + move |act, ctx| { + act.handle_flat_storage_catchup(resharder, shard_uid, ctx); + }, + ); + } + } + } } diff --git a/chain/chain/src/resharding/types.rs b/chain/chain/src/resharding/types.rs index b042f3bab56..a6fae87d991 100644 --- a/chain/chain/src/resharding/types.rs +++ b/chain/chain/src/resharding/types.rs @@ -1,6 +1,5 @@ use crate::flat_storage_resharder::FlatStorageResharder; use near_async::messaging::Sender; -use near_primitives::hash::CryptoHash; use near_store::ShardUId; /// Represents a request to start the split of a parent shard flat storage into two children flat @@ -17,7 +16,6 @@ pub struct FlatStorageSplitShardRequest { pub struct FlatStorageShardCatchupRequest { pub resharder: FlatStorageResharder, pub shard_uid: ShardUId, - pub flat_head_block_hash: CryptoHash, } /// Represents a request to reload a Mem Trie for a shard after its Flat Storage resharding is diff --git a/chain/chain/src/state_snapshot_actor.rs b/chain/chain/src/state_snapshot_actor.rs index 62cb894c53c..02db1c085cd 100644 --- a/chain/chain/src/state_snapshot_actor.rs +++ b/chain/chain/src/state_snapshot_actor.rs @@ -1,11 +1,13 @@ -use near_async::messaging::{Actor, CanSend, Handler, Sender}; +use near_async::futures::{DelayedActionRunner, DelayedActionRunnerExt}; +use near_async::messaging::{Actor, CanSend, Handler, HandlerWithContext, Sender}; +use near_async::time::Duration; use near_async::{MultiSend, MultiSenderFrom}; use near_network::types::{NetworkRequests, PeerManagerAdapter, PeerManagerMessageRequest}; use near_performance_metrics_macros::perf; use near_primitives::block::Block; use near_primitives::hash::CryptoHash; use near_primitives::shard_layout::ShardUId; -use near_primitives::types::{EpochHeight, ShardIndex}; +use near_primitives::types::{BlockHeight, EpochHeight, ShardIndex}; use near_store::flat::FlatStorageManager; use near_store::ShardTries; use std::sync::Arc; @@ -44,13 +46,15 @@ pub struct 
DeleteAndMaybeCreateSnapshotRequest { #[derive(actix::Message)] #[rtype(result = "()")] pub struct CreateSnapshotRequest { - /// prev_hash of the last processed block. + /// equal to self.block.header().prev_hash() prev_block_hash: CryptoHash, + /// Min height of chunk.prev_block_hash() for each chunk in `block` + min_chunk_prev_height: BlockHeight, /// epoch height associated with prev_block_hash epoch_height: EpochHeight, /// Shards that need to be present in the snapshot. shard_indexes_and_uids: Vec<(ShardIndex, ShardUId)>, - /// Last block of the prev epoch. + /// prev block of the "sync_hash" block. block: Block, } @@ -59,6 +63,7 @@ impl std::fmt::Debug for CreateSnapshotRequest { f.debug_struct("CreateSnapshotRequest") .field("block_hash", self.block.hash()) .field("prev_block_hash", &self.prev_block_hash) + .field("min_chunk_prev_height", &self.min_chunk_prev_height) .field("epoch_height", &self.epoch_height) .field( "shard_uids", @@ -85,19 +90,78 @@ impl StateSnapshotActor { } } - pub fn handle_create_snapshot_request(&mut self, msg: CreateSnapshotRequest) { - tracing::debug!(target: "state_snapshot", ?msg); + /// Returns true if we shouldn't yet try to create a snapshot because a flat storage resharding + /// is in progress. + fn should_wait_for_resharding_split( + &self, + min_chunk_prev_height: BlockHeight, + shard_indexes_and_uids: &[(ShardIndex, ShardUId)], + ) -> anyhow::Result { + let shard_uids = shard_indexes_and_uids.iter().map(|(_idx, uid)| *uid); + let Some(min_height) = + self.flat_storage_manager.resharding_catchup_height_reached(shard_uids)? + else { + // No flat storage split + catchup is in progress, ok to proceed + return Ok(false); + }; + let Some(min_height) = min_height else { + // storage split + catchup is in progress and not all shards have reached the catchup phase yet. Can't proceed + return Ok(true); + }; + // Proceed if the catchup code is already reasonably close to being finished. This is not a correctness issue, + // as this line of code could just be replaced with Ok(false), and things would work. But in that case, if there are for + // some reason lots of deltas to apply (e.g. the sync hash is 1000s of blocks past the start of the epoch because of missed + // chunks), then we'll duplicate a lot of work that's being done by the resharding catchup code. So we might as well just + // come back later after most of that work has already been done. + Ok(min_height + 10 < min_chunk_prev_height) + } + + pub fn handle_create_snapshot_request( + &mut self, + msg: CreateSnapshotRequest, + ctx: &mut dyn DelayedActionRunner, + ) { + let should_wait = match self.should_wait_for_resharding_split( + msg.min_chunk_prev_height, + &msg.shard_indexes_and_uids, + ) { + Ok(s) => s, + Err(err) => { + tracing::error!(target: "state_snapshot", ?err, "State Snapshot Actor failed to check resharding status. Not making snapshot"); + return; + } + }; + // TODO: instead of resending the same message over and over, wait on a Condvar. 
+ // This would require making testloop work with Condvars that normally are meant to be woken up by another thread + if should_wait { + tracing::debug!(target: "state_snapshot", prev_block_hash=?&msg.prev_block_hash, "Postpone CreateSnapshotRequest"); + ctx.run_later( + "ReshardingActor FlatStorageSplitShard", + Duration::seconds(1), + move |act, ctx| { + act.handle_create_snapshot_request(msg, ctx); + }, + ); + return; + } - let CreateSnapshotRequest { prev_block_hash, epoch_height, shard_indexes_and_uids, block } = - msg; + tracing::debug!(target: "state_snapshot", prev_block_hash=?&msg.prev_block_hash, "Handle CreateSnapshotRequest"); + let CreateSnapshotRequest { + prev_block_hash, + epoch_height, + shard_indexes_and_uids, + block, + .. + } = msg; let res = self.tries.create_state_snapshot(prev_block_hash, &shard_indexes_and_uids, &block); // Unlocking flat state head can be done asynchronously in state_snapshot_actor. // The next flat storage update will bring flat storage to latest head. - if !self.flat_storage_manager.set_flat_state_updates_mode(true) { - tracing::error!(target: "state_snapshot", ?prev_block_hash, ?shard_indexes_and_uids, "Failed to unlock flat state updates"); - } + // TODO(resharding): check what happens if two calls to want_snapshot() are made before this point, + // which can happen with short epochs if a state snapshot takes longer than the rest of the epoch to complete. + // TODO(resharding): this can actually be called sooner, just after the rocksdb checkpoint is made. + self.flat_storage_manager.snapshot_taken(); match res { Ok(res_shard_uids) => { let Some(res_shard_uids) = res_shard_uids else { @@ -126,10 +190,10 @@ impl Handler for StateSnapshotActor { } } -impl Handler for StateSnapshotActor { +impl HandlerWithContext for StateSnapshotActor { #[perf] - fn handle(&mut self, msg: CreateSnapshotRequest) { - self.handle_create_snapshot_request(msg) + fn handle(&mut self, msg: CreateSnapshotRequest, ctx: &mut dyn DelayedActionRunner) { + self.handle_create_snapshot_request(msg, ctx) } } @@ -142,7 +206,7 @@ pub struct StateSnapshotSenderForStateSnapshot { pub struct StateSnapshotSenderForClient(Sender); type MakeSnapshotCallback = Arc< - dyn Fn(CryptoHash, EpochHeight, Vec<(ShardIndex, ShardUId)>, Block) -> () + dyn Fn(CryptoHash, BlockHeight, EpochHeight, Vec<(ShardIndex, ShardUId)>, Block) -> () + Send + Sync + 'static, @@ -156,28 +220,38 @@ pub struct SnapshotCallbacks { } /// Sends a request to make a state snapshot. +// TODO: remove the `prev_block_hash` argument. It's just block.header().prev_hash() pub fn get_make_snapshot_callback( sender: StateSnapshotSenderForClient, flat_storage_manager: FlatStorageManager, ) -> MakeSnapshotCallback { - Arc::new(move |prev_block_hash, epoch_height, shard_indexes_and_uids, block| { - tracing::info!( + Arc::new( + move |prev_block_hash, + min_chunk_prev_height, + epoch_height, + shard_indexes_and_uids, + block| { + tracing::info!( target: "state_snapshot", ?prev_block_hash, ?shard_indexes_and_uids, "make_snapshot_callback sends `DeleteAndMaybeCreateSnapshotRequest` to state_snapshot_addr"); - // We need to stop flat head updates synchronously in the client thread. 
- // Async update in state_snapshot_actor and potentially lead to flat head progressing beyond prev_block_hash - if !flat_storage_manager.set_flat_state_updates_mode(false) { - tracing::error!(target: "state_snapshot", ?prev_block_hash, ?shard_indexes_and_uids, "Failed to lock flat state updates"); - return; - } - let create_snapshot_request = - CreateSnapshotRequest { prev_block_hash, epoch_height, shard_indexes_and_uids, block }; - sender.send(DeleteAndMaybeCreateSnapshotRequest { - create_snapshot_request: Some(create_snapshot_request), - }); - }) + // We need to stop flat head updates synchronously in the client thread. + // Async update in state_snapshot_actor can potentially lead to flat head progressing beyond prev_block_hash + // This also prevents post-resharding flat storage catchup from advancing past `prev_block_hash` + flat_storage_manager.want_snapshot(min_chunk_prev_height); + let create_snapshot_request = CreateSnapshotRequest { + prev_block_hash, + min_chunk_prev_height, + epoch_height, + shard_indexes_and_uids, + block, + }; + sender.send(DeleteAndMaybeCreateSnapshotRequest { + create_snapshot_request: Some(create_snapshot_request), + }); + }, + ) } /// Sends a request to delete a state snapshot. diff --git a/chain/client/src/test_utils/test_env_builder.rs b/chain/client/src/test_utils/test_env_builder.rs index cbf0adc50c6..351b4c4a15c 100644 --- a/chain/client/src/test_utils/test_env_builder.rs +++ b/chain/client/src/test_utils/test_env_builder.rs @@ -588,7 +588,7 @@ impl TestEnvBuilder { None => TEST_SEED, }; let tries = runtime.get_tries(); - let make_snapshot_callback = Arc::new(move |prev_block_hash, _epoch_height, shard_uids: Vec<(ShardIndex, ShardUId)>, block| { + let make_snapshot_callback = Arc::new(move |prev_block_hash, _min_chunk_prev_height, _epoch_height, shard_uids: Vec<(ShardIndex, ShardUId)>, block| { tracing::info!(target: "state_snapshot", ?prev_block_hash, "make_snapshot_callback"); tries.delete_state_snapshot(); tries.create_state_snapshot(prev_block_hash, &shard_uids, &block).unwrap(); diff --git a/core/store/src/adapter/chunk_store.rs b/core/store/src/adapter/chunk_store.rs index e3b8a3cf294..f1d84bd11cc 100644 --- a/core/store/src/adapter/chunk_store.rs +++ b/core/store/src/adapter/chunk_store.rs @@ -13,8 +13,8 @@ pub struct ChunkStoreAdapter { } impl StoreAdapter for ChunkStoreAdapter { - fn store(&self) -> Store { - self.store.clone() + fn store_ref(&self) -> &Store { + &self.store } } diff --git a/core/store/src/adapter/flat_store.rs b/core/store/src/adapter/flat_store.rs index 322f4dd09cc..d5200e038f2 100644 --- a/core/store/src/adapter/flat_store.rs +++ b/core/store/src/adapter/flat_store.rs @@ -20,8 +20,8 @@ pub struct FlatStoreAdapter { } impl StoreAdapter for FlatStoreAdapter { - fn store(&self) -> Store { - self.store.clone() + fn store_ref(&self) -> &Store { + &self.store } } diff --git a/core/store/src/adapter/mod.rs b/core/store/src/adapter/mod.rs index ba31d691775..fe975dc11c9 100644 --- a/core/store/src/adapter/mod.rs +++ b/core/store/src/adapter/mod.rs @@ -85,7 +85,11 @@ impl Into for StoreUpdateHolder<'static> { /// Simple adapter wrapper on top of Store to provide a more ergonomic interface for different store types. /// We provide simple inter-convertibility between different store types like FlatStoreAdapter and TrieStoreAdapter. 
pub trait StoreAdapter { - fn store(&self) -> Store; + fn store_ref(&self) -> &Store; + + fn store(&self) -> Store { + self.store_ref().clone() + } fn trie_store(&self) -> trie_store::TrieStoreAdapter { trie_store::TrieStoreAdapter::new(self.store()) diff --git a/core/store/src/adapter/trie_store.rs b/core/store/src/adapter/trie_store.rs index e01a4232e4b..bedec4578ea 100644 --- a/core/store/src/adapter/trie_store.rs +++ b/core/store/src/adapter/trie_store.rs @@ -18,8 +18,8 @@ pub struct TrieStoreAdapter { } impl StoreAdapter for TrieStoreAdapter { - fn store(&self) -> Store { - self.store.clone() + fn store_ref(&self) -> &Store { + &self.store } } diff --git a/core/store/src/flat/chunk_view.rs b/core/store/src/flat/chunk_view.rs index d704e2a013c..edff03ba670 100644 --- a/core/store/src/flat/chunk_view.rs +++ b/core/store/src/flat/chunk_view.rs @@ -47,6 +47,8 @@ impl FlatStorageChunkView { self.flat_storage.contains_key(&self.block_hash, key) } + // TODO: this should be changed to check the values that haven't yet been applied, like in get_value() and contains_key(), + // because otherwise we're iterating over old state that might have been updated by `self.block_hash` pub fn iter_range(&self, from: Option<&[u8]>, to: Option<&[u8]>) -> FlatStateIterator { self.store.iter_range(self.flat_storage.shard_uid(), from, to) } diff --git a/core/store/src/flat/manager.rs b/core/store/src/flat/manager.rs index 47168512acb..31cb9a93e39 100644 --- a/core/store/src/flat/manager.rs +++ b/core/store/src/flat/manager.rs @@ -1,5 +1,10 @@ use crate::adapter::flat_store::{FlatStoreAdapter, FlatStoreUpdateAdapter}; -use crate::flat::{BlockInfo, FlatStorageReadyStatus, FlatStorageStatus, POISONED_LOCK_ERR}; +use crate::flat::{ + BlockInfo, FlatStorageReadyStatus, FlatStorageReshardingStatus, FlatStorageStatus, + POISONED_LOCK_ERR, +}; +use crate::{DBCol, StoreAdapter}; +use near_primitives::block_header::BlockHeader; use near_primitives::errors::StorageError; use near_primitives::hash::CryptoHash; use near_primitives::shard_layout::ShardUId; @@ -30,11 +35,18 @@ pub struct FlatStorageManagerInner { /// this epoch can share the same `head` and `tail`, similar for shards for the next epoch, /// but such overhead is negligible comparing the delta sizes, so we think it's ok. flat_storages: Mutex>, + /// Set to Some() when there's a state snapshot in progress. Used to signal to the resharding flat + /// storage catchup code that it shouldn't advance past this block height + want_snapshot: Mutex>, } impl FlatStorageManager { pub fn new(store: FlatStoreAdapter) -> Self { - Self(Arc::new(FlatStorageManagerInner { store, flat_storages: Default::default() })) + Self(Arc::new(FlatStorageManagerInner { + store, + flat_storages: Default::default(), + want_snapshot: Default::default(), + })) } /// When a node starts from an empty database, this function must be called to ensure @@ -66,8 +78,14 @@ impl FlatStorageManager { /// and resharding. 
pub fn create_flat_storage_for_shard(&self, shard_uid: ShardUId) -> Result<(), StorageError> { tracing::debug!(target: "store", ?shard_uid, "Creating flat storage for shard"); + let want_snapshot = self.0.want_snapshot.lock().expect(POISONED_LOCK_ERR); + let disable_updates = want_snapshot.is_some(); + let mut flat_storages = self.0.flat_storages.lock().expect(POISONED_LOCK_ERR); let flat_storage = FlatStorage::new(self.0.store.clone(), shard_uid)?; + if disable_updates { + flat_storage.set_flat_head_update_mode(false); + } let original_value = flat_storages.insert(shard_uid, flat_storage); if original_value.is_some() { // Generally speaking this shouldn't happen. It may only happen when @@ -81,6 +99,67 @@ impl FlatStorageManager { Ok(()) } + fn read_block_info(&self, hash: &CryptoHash) -> Result { + let header = self + .0 + .store + .store_ref() + .get_ser::(DBCol::BlockHeader, hash.as_ref()) + .map_err(|e| { + StorageError::StorageInconsistentState(format!( + "could not read block header {}: {:?}", + hash, e + )) + })? + .ok_or_else(|| { + StorageError::StorageInconsistentState(format!("block header {} not found", hash)) + })?; + Ok(BlockInfo { + hash: *header.hash(), + prev_hash: *header.prev_hash(), + height: header.height(), + }) + } + + /// Sets the status to `Ready` if it's currently `Resharding(CatchingUp)` + fn mark_flat_storage_ready(&self, shard_uid: ShardUId) -> Result<(), StorageError> { + // Don't use Self::get_flat_storage_status() because there's no need to panic if this fails, since this is used + // during state snapshotting where an error isn't critical to node operation. + let status = self.0.store.get_flat_storage_status(shard_uid)?; + let catchup_flat_head = match status { + FlatStorageStatus::Ready(_) => return Ok(()), + FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CatchingUp(flat_head)) => { + flat_head + } + _ => { + return Err(StorageError::StorageInconsistentState(format!( + "Unexpected flat storage status: {:?}", + &status + ))) + } + }; + let flat_head = self.read_block_info(&catchup_flat_head)?; + let mut store_update = self.0.store.store_update(); + store_update.set_flat_storage_status( + shard_uid, + FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head }), + ); + // TODO: Consider adding a StorageError::IO variant? + store_update.commit().map_err(|_| StorageError::StorageInternalError)?; + Ok(()) + } + + // If the flat storage status is Resharding(CatchingUp), sets it to Ready(), and then calls create_flat_storage_for_shard() + // This is used in creating state snapshots when this might be a flat storage that is in the middle of catchup, and that + // should now be considered `Ready` in the state snapshot, even if not in the main DB. + pub fn mark_ready_and_create_flat_storage( + &self, + shard_uid: ShardUId, + ) -> Result<(), StorageError> { + self.mark_flat_storage_ready(shard_uid)?; + self.create_flat_storage_for_shard(shard_uid) + } + /// Update flat storage for given processed or caught up block, which includes: /// - merge deltas from current flat storage head to new one given in /// `new_flat_head`; @@ -220,39 +299,66 @@ impl FlatStorageManager { } } - /// Updates `move_head_enabled` for all shards and returns whether it succeeded. - /// If at least one of the shards fails to update move_head_enabled, then that operation is rolled back for all shards. 
- /// - /// Rollbacks should work, because we assume that this function is the only - /// entry point to locking/unlocking flat head updates in a system with - /// multiple FlatStorages running in parallel. - pub fn set_flat_state_updates_mode(&self, enabled: bool) -> bool { + /// Returns None if there's no resharding flat storage split in progress + /// If there is, returns Some(None) if there's at least one child shard that hasn't been split and had its + /// status set to `CatchingUp`. If they've all been split already and are in the catchup phase, + /// returns the lowest height among all shards that resharding catchup has advanced to. + pub fn resharding_catchup_height_reached( + &self, + shard_uids: impl Iterator, + ) -> Result>, StorageError> { + let mut ret = None; + for shard_uid in shard_uids { + match self.0.store.get_flat_storage_status(shard_uid)? { + FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CatchingUp( + catchup_flat_head, + )) => { + let flat_head = self.read_block_info(&catchup_flat_head)?; + if let Some(Some(min_height)) = ret { + ret = Some(Some(std::cmp::min(min_height, flat_head.height))); + } else { + ret = Some(Some(flat_head.height)); + } + } + FlatStorageStatus::Resharding(_) => return Ok(Some(None)), + _ => {} + }; + } + Ok(ret) + } + + /// Should be called when we want to take a state snapshot. Disallows flat head updates, and signals to any resharding + /// flat storage code that it should not advance beyond this hash + pub fn want_snapshot(&self, min_chunk_prev_height: BlockHeight) { + { + let mut want_snapshot = self.0.want_snapshot.lock().expect(POISONED_LOCK_ERR); + *want_snapshot = Some(min_chunk_prev_height); + } let flat_storages = self.0.flat_storages.lock().expect(POISONED_LOCK_ERR); - let mut all_updated = true; - let mut updated_flat_storages = vec![]; - let mut updated_shard_uids = vec![]; - for (shard_uid, flat_storage) in flat_storages.iter() { - if flat_storage.set_flat_head_update_mode(enabled) { - updated_flat_storages.push(flat_storage); - updated_shard_uids.push(shard_uid); - } else { - all_updated = false; - tracing::error!(target: "store", rolling_back_shards = ?updated_shard_uids, enabled, ?shard_uid, "Locking/Unlocking of flat head updates failed for shard. Reverting."); - break; - } + for flat_storage in flat_storages.values() { + flat_storage.set_flat_head_update_mode(false); } - if all_updated { - tracing::debug!(target: "store", enabled, "Locking/Unlocking of flat head updates succeeded"); - true - } else { - // Do rollback. - // It does allow for a data race if somebody updates move_head_enabled on individual shards. - // The assumption is that all shards get locked/unlocked at the same time. - for flat_storage in updated_flat_storages { - flat_storage.set_flat_head_update_mode(!enabled); - } - tracing::error!(target: "store", enabled, "Locking/Unlocking of flat head updates failed"); - false + tracing::debug!(target: "store", "Locked flat head updates"); + } + + /// Should be called when we're done taking a state snapshot. Allows flat head updates, and signals to any resharding + /// flat storage code that it can advance now. 
+ pub fn snapshot_taken(&self) { + { + let mut want_snapshot = self.0.want_snapshot.lock().expect(POISONED_LOCK_ERR); + *want_snapshot = None; } + let flat_storages = self.0.flat_storages.lock().expect(POISONED_LOCK_ERR); + for flat_storage in flat_storages.values() { + flat_storage.set_flat_head_update_mode(true); + } + tracing::debug!(target: "store", "Unlocked flat head updates"); + } + + // Returns Some() if a state snapshot should be taken, and therefore any resharding flat storage code should not advance + // past the given hash + pub fn snapshot_wanted(&self) -> Option { + let want_snapshot = self.0.want_snapshot.lock().expect(POISONED_LOCK_ERR); + *want_snapshot } } diff --git a/core/store/src/flat/storage.rs b/core/store/src/flat/storage.rs index c24917782d6..8db1b72ea6f 100644 --- a/core/store/src/flat/storage.rs +++ b/core/store/src/flat/storage.rs @@ -492,14 +492,9 @@ impl FlatStorage { } /// Updates `move_head_enabled` and returns whether the change was done. - pub(crate) fn set_flat_head_update_mode(&self, enabled: bool) -> bool { + pub(crate) fn set_flat_head_update_mode(&self, enabled: bool) { let mut guard = self.0.write().expect(crate::flat::POISONED_LOCK_ERR); - if enabled != guard.move_head_enabled { - guard.move_head_enabled = enabled; - true - } else { - false - } + guard.move_head_enabled = enabled; } } diff --git a/core/store/src/lib.rs b/core/store/src/lib.rs index 7c1c288ba19..10579acda8a 100644 --- a/core/store/src/lib.rs +++ b/core/store/src/lib.rs @@ -117,8 +117,8 @@ pub struct Store { } impl StoreAdapter for Store { - fn store(&self) -> Store { - self.clone() + fn store_ref(&self) -> &Store { + self } } diff --git a/core/store/src/trie/state_snapshot.rs b/core/store/src/trie/state_snapshot.rs index edbff71b09a..77ff792aca0 100644 --- a/core/store/src/trie/state_snapshot.rs +++ b/core/store/src/trie/state_snapshot.rs @@ -90,7 +90,7 @@ impl StateSnapshot { tracing::debug!(target: "state_snapshot", ?shard_indexes_and_uids, ?prev_block_hash, "new StateSnapshot"); let mut included_shard_uids = vec![]; for &(shard_index, shard_uid) in shard_indexes_and_uids { - if let Err(err) = flat_storage_manager.create_flat_storage_for_shard(shard_uid) { + if let Err(err) = flat_storage_manager.mark_ready_and_create_flat_storage(shard_uid) { tracing::warn!(target: "state_snapshot", ?err, ?shard_uid, "Failed to create a flat storage for snapshot shard"); continue; } diff --git a/integration-tests/src/test_loop/tests/resharding_v3.rs b/integration-tests/src/test_loop/tests/resharding_v3.rs index 0bda17d80bf..c04d89ba630 100644 --- a/integration-tests/src/test_loop/tests/resharding_v3.rs +++ b/integration-tests/src/test_loop/tests/resharding_v3.rs @@ -87,7 +87,13 @@ impl TestReshardingParametersBuilder { fn build(self) -> TestReshardingParameters { let num_accounts = self.num_accounts.unwrap_or(8); let num_clients = self.num_clients.unwrap_or(3); - let epoch_length = self.epoch_length.unwrap_or(6); + // When there's a resharding task delay and single-shard tracking, the delay might be pushed out + // even further because the resharding task might have to wait for the state snapshot to be made + // before it can proceed, which might mean that flat storage won't be ready for the child shard for a whole epoch. + // So we extend the epoch length a bit in this case. + let epoch_length = self + .epoch_length + .unwrap_or_else(|| self.delay_flat_state_resharding.map_or(6, |delay| delay + 7)); // #12195 prevents number of BPs bigger than `epoch_length`. 
assert!(num_clients > 0 && num_clients <= epoch_length); @@ -968,9 +974,11 @@ fn test_resharding_v3_slower_post_processing_tasks() { } #[test] -// TODO(resharding): fix nearcore and change the ignore condition -// #[cfg_attr(not(feature = "test_features"), ignore)] -#[ignore] +// TODO(resharding): fix the fact that this test fails if the epoch length is set to 10, (and state sync +// is made to run before shard catchup) because set_state_finalize() sets flat storage state to +// ready before child catchup is done. Also fix the failure in +// check_state_shard_uid_mapping_after_resharding() if the epoch length is set to 11 +#[cfg_attr(not(feature = "test_features"), ignore)] fn test_resharding_v3_shard_shuffling_slower_post_processing_tasks() { let params = TestReshardingParametersBuilder::default() .shuffle_shard_assignment_for_chunk_producers(true)