From 88c3efc7b36881e4ad669d888ee592ad2a711030 Mon Sep 17 00:00:00 2001
From: Aleksandr Logunov
Date: Mon, 20 Nov 2023 10:37:02 +0100
Subject: [PATCH] refactor: prepare shard update impl for stateless validation
 (#10187)

This is just a refactor which shouldn't change any behaviour of the code,
but prepares it for stateless validation.

We came up with the idea that the logic contained in the
`get_apply_chunk_job_*` methods should not depend on Chain that heavily.
Here I extract all the logic that requires Chain into `get_apply_chunk_job`
only; all remaining logic is moved to `update_shard.rs`.

For better readability, I introduce further changes:

* "apply_chunk" is mostly replaced with "update_shard", because that is
  closer to what actually happens: when a new chunk exists, we indeed apply
  it, but in the two other cases listed in the `ShardUpdateReason` enum we
  only update validator accounts or process split state info.
* excessive info about blocks and chunks is replaced with the exact
  `BlockContext` and `ShardUpdateReason` contents specific to each shard
  update. Now it should be much clearer which data is necessary. Note that
  receipts are passed only when a new chunk appears.

As a bonus, chain.rs shrinks by about 250 lines.

Follow-up PRs will contain more (but less invasive) refactoring and will
introduce "shadow" jobs for stateless validation on nightly, which will in
fact call `apply_old_chunk` N times and then `apply_new_chunk` once. That
was impossible with the previous API.

Nayduck: https://nayduck.near.org/#/run/3276

---------

Co-authored-by: Longarithm
---
 chain/chain/src/chain.rs        | 596 ++++++++++----------------------
 chain/chain/src/lib.rs          |   1 +
 chain/chain/src/update_shard.rs | 345 ++++++++++++++++++
 3 files changed, 525 insertions(+), 417 deletions(-)
 create mode 100644 chain/chain/src/update_shard.rs

diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs
index a4216689100..172bc94a435 100644
--- a/chain/chain/src/chain.rs
+++ b/chain/chain/src/chain.rs
@@ -11,9 +11,13 @@ use crate::state_request_tracker::StateRequestTracker;
 use crate::state_snapshot_actor::SnapshotCallbacks;
 use crate::store::{ChainStore, ChainStoreAccess, ChainStoreUpdate, GCMode};
 use crate::types::{
-    AcceptedBlock, ApplySplitStateResult, ApplySplitStateResultOrStateChanges,
-    ApplyTransactionResult, Block, BlockEconomicsConfig, BlockHeader, BlockStatus, ChainConfig,
-    ChainGenesis, Provenance, RuntimeAdapter, RuntimeStorageConfig,
+    AcceptedBlock, ApplySplitStateResultOrStateChanges, ApplyTransactionResult, Block,
+    BlockEconomicsConfig, BlockHeader, BlockStatus, ChainConfig, ChainGenesis, Provenance,
+    RuntimeAdapter, RuntimeStorageConfig,
+};
+use crate::update_shard::{
+    process_shard_update, ApplyChunkResult, BlockContext, NewChunkResult, OldChunkResult,
+    ShardContext, ShardUpdateReason, StateSplitResult,
 };
 use crate::validate::{
     validate_challenge, validate_chunk_proofs, validate_chunk_with_chunk_extra,
@@ -63,8 +67,8 @@ use near_primitives::static_clock::StaticClock;
 use near_primitives::transaction::{ExecutionOutcomeWithIdAndProof, SignedTransaction};
 use near_primitives::types::chunk_extra::ChunkExtra;
 use near_primitives::types::{
-    AccountId, Balance, BlockExtra, BlockHeight, BlockHeightDelta, EpochId, Gas, MerkleHash,
-    NumBlocks, NumShards, ShardId, StateChangesForSplitStates, StateRoot,
+    AccountId, Balance, BlockExtra, BlockHeight, BlockHeightDelta, EpochId, MerkleHash, NumBlocks,
+    NumShards, ShardId, StateRoot,
 };
 use near_primitives::unwrap_or_return;
 use near_primitives::utils::MaybeValidated;
@@ -485,13 +489,13 @@ impl Drop for Chain {
     }
 }
 
-/// ApplyChunkJob is a closure that is responsible for applying of a single chunk.
-/// All of the chunk details and other arguments are already captured within.
-type ApplyChunkJob = Box<dyn FnOnce(&Span) -> Result<ApplyChunkResult, Error> + Send + 'static>;
+/// UpdateShardJob is a closure that is responsible for updating a shard for a single block.
+/// Execution context (latest blocks/chunks details) are already captured within.
+type UpdateShardJob = Box<dyn FnOnce(&Span) -> Result<ApplyChunkResult, Error> + Send + 'static>;
 
 /// PreprocessBlockResult is a tuple where the first element is a vector of jobs
-/// to apply chunks the second element is BlockPreprocessInfo
-type PreprocessBlockResult = (Vec<ApplyChunkJob>, BlockPreprocessInfo);
+/// to update shards, the second element is BlockPreprocessInfo
+type PreprocessBlockResult = (Vec<UpdateShardJob>, BlockPreprocessInfo);
 
 // Used only for verify_block_hash_and_signature. See that method.
 #[derive(Clone, Copy, PartialEq, Eq)]
@@ -3488,6 +3492,87 @@ impl Chain {
         Ok(())
     }
 
+    /// Validates basic correctness of array of transactions included in chunk.
+    /// Doesn't require state.
+    fn validate_chunk_transactions(
+        &self,
+        block: &Block,
+        prev_block_header: &BlockHeader,
+        chunk: &ShardChunk,
+    ) -> Result<(), Error> {
+        if !validate_transactions_order(chunk.transactions()) {
+            let merkle_paths = Block::compute_chunk_headers_root(block.chunks().iter()).1;
+            let chunk_proof = ChunkProofs {
+                block_header: borsh::to_vec(&block.header()).expect("Failed to serialize"),
+                merkle_proof: merkle_paths[chunk.shard_id() as usize].clone(),
+                chunk: MaybeEncodedShardChunk::Decoded(chunk.clone()),
+            };
+            return Err(Error::InvalidChunkProofs(Box::new(chunk_proof)));
+        }
+
+        let protocol_version =
+            self.epoch_manager.get_epoch_protocol_version(block.header().epoch_id())?;
+        if checked_feature!("stable", AccessKeyNonceRange, protocol_version) {
+            let transaction_validity_period = self.transaction_validity_period;
+            for transaction in chunk.transactions() {
+                self.store()
+                    .check_transaction_validity_period(
+                        prev_block_header,
+                        &transaction.transaction.block_hash,
+                        transaction_validity_period,
+                    )
+                    .map_err(|_| Error::from(Error::InvalidTransactions))?;
+            }
+        };
+
+        return Ok(());
+    }
+
+    /// For given pair of block headers and shard id, return information about
+    /// block necessary for processing shard update.
+    fn get_block_context_for_shard_update(
+        &self,
+        block_header: &BlockHeader,
+        prev_block_header: &BlockHeader,
+        shard_id: ShardId,
+        is_new_chunk: bool,
+    ) -> Result<BlockContext, Error> {
+        let epoch_id = block_header.epoch_id();
+        let protocol_version = self.epoch_manager.get_epoch_protocol_version(epoch_id)?;
+        // Before `FixApplyChunks` feature, gas price was taken from current
+        // block by mistake. Preserve it for backwards compatibility.
+        let gas_price = if !is_new_chunk
+            && protocol_version < ProtocolFeature::FixApplyChunks.protocol_version()
+        {
+            block_header.next_gas_price()
+        } else {
+            prev_block_header.next_gas_price()
+        };
+
+        // This variable is responsible for checking to which block we can apply receipts previously lost in apply_chunks
+        // (see https://github.com/near/nearcore/pull/4248/)
+        // We take the first block with existing chunk in the first epoch in which protocol feature
+        // RestoreReceiptsAfterFixApplyChunks was enabled, and put the restored receipts there.
+        let is_first_block_with_chunk_of_version = is_new_chunk
+            && check_if_block_is_first_with_chunk_of_version(
+                self.store(),
+                self.epoch_manager.as_ref(),
+                block_header.prev_hash(),
+                shard_id,
+            )?;
+
+        Ok(BlockContext {
+            block_hash: *block_header.hash(),
+            prev_block_hash: *block_header.prev_hash(),
+            challenges_result: block_header.challenges_result().clone(),
+            block_timestamp: block_header.raw_timestamp(),
+            gas_price,
+            height: block_header.height(),
+            random_seed: *block_header.random_value(),
+            is_first_block_with_chunk_of_version,
+        })
+    }
+
     fn block_catch_up_postprocess(
         &mut self,
         me: &Option<AccountId>,
@@ -3786,7 +3871,8 @@ impl Chain {
             .collect()
     }
 
-    /// Creates jobs that would apply chunks
+    /// Creates jobs which will update shards for the given block and incoming
+    /// receipts aggregated for it.
     fn apply_chunks_preprocessing(
         &self,
         me: &Option<AccountId>,
         block: &Block,
         prev_block: &Block,
         incoming_receipts: &HashMap<ShardId, Vec<ReceiptProof>>,
         mode: ApplyChunksMode,
         mut state_patch: SandboxStatePatch,
         invalid_chunks: &mut Vec<ShardChunkHeader>,
-    ) -> Result<Vec<ApplyChunkJob>, Error> {
+    ) -> Result<Vec<UpdateShardJob>, Error> {
         let _span = tracing::debug_span!(target: "chain", "apply_chunks_preprocessing").entered();
-        let prev_hash = block.header().prev_hash();
-        let will_shard_layout_change = self.epoch_manager.will_shard_layout_change(prev_hash)?;
         let prev_chunk_headers =
             Chain::get_prev_chunk_headers(self.epoch_manager.as_ref(), prev_block)?;
         block
@@ -3811,21 +3895,18 @@ impl Chain {
                 // XXX: This is a bit questionable -- sandbox state patching works
                 // only for a single shard. This so far has been enough.
                 let state_patch = state_patch.take();
-
-                let apply_chunk_job = self.get_apply_chunk_job(
+                let update_shard_job = self.get_update_shard_job(
                     me,
                     block,
                     prev_block,
                     chunk_header,
                     prev_chunk_header,
-                    shard_id,
+                    shard_id as ShardId,
                     mode,
-                    will_shard_layout_change,
                     incoming_receipts,
                     state_patch,
                 );
-
-                match apply_chunk_job {
+                match update_shard_job {
                     Ok(Some(processor)) => Some(Ok(processor)),
                     Ok(None) => None,
                     Err(err) => {
@@ -3839,26 +3920,25 @@ impl Chain {
             .collect()
     }
 
-    /// This method returns the closure that is responsible for applying of a single chunk.
-    fn get_apply_chunk_job(
+    /// This method returns the closure that is responsible for updating a shard.
+    fn get_update_shard_job(
         &self,
         me: &Option<AccountId>,
         block: &Block,
         prev_block: &Block,
         chunk_header: &ShardChunkHeader,
         prev_chunk_header: &ShardChunkHeader,
-        shard_id: usize,
+        shard_id: ShardId,
         mode: ApplyChunksMode,
-        will_shard_layout_change: bool,
         incoming_receipts: &HashMap<ShardId, Vec<ReceiptProof>>,
         state_patch: SandboxStatePatch,
-    ) -> Result<Option<ApplyChunkJob>, Error> {
-        let shard_id = shard_id as ShardId;
+    ) -> Result<Option<UpdateShardJob>, Error> {
         let prev_hash = block.header().prev_hash();
         let cares_about_shard_this_epoch =
             self.shard_tracker.care_about_shard(me.as_ref(), prev_hash, shard_id, true);
         let cares_about_shard_next_epoch =
             self.shard_tracker.will_care_about_shard(me.as_ref(), prev_hash, shard_id, true);
+        let will_shard_layout_change = self.epoch_manager.will_shard_layout_change(prev_hash)?;
         let should_apply_transactions = get_should_apply_transactions(
             mode,
             cares_about_shard_this_epoch,
@@ -3888,343 +3968,93 @@ impl Chain {
         let shard_uid = self.epoch_manager.shard_id_to_uid(shard_id, block.header().epoch_id())?;
         let is_new_chunk = chunk_header.height_included() == block.header().height();
-        let epoch_manager = self.epoch_manager.clone();
-        let runtime = self.runtime_adapter.clone();
-        if should_apply_transactions {
+        let shard_update_reason = if should_apply_transactions {
             if is_new_chunk {
-                self.get_apply_chunk_job_new_chunk(
-                    block,
-                    prev_block,
-                    chunk_header,
-                    prev_chunk_header,
-                    shard_uid,
-                    will_shard_layout_change,
-                    incoming_receipts,
-                    state_patch,
-                    runtime,
-                    epoch_manager,
-                    split_state_roots,
+                // Validate new chunk and collect incoming receipts for it.
+
+                let prev_chunk_extra = self.get_chunk_extra(prev_hash, &shard_uid)?;
+                let chunk = self.get_chunk_clone_from_header(&chunk_header)?;
+                let prev_chunk_height_included = prev_chunk_header.height_included();
+
+                // Validate that all next chunk information matches previous chunk extra.
+                validate_chunk_with_chunk_extra(
+                    // It's safe here to use ChainStore instead of ChainStoreUpdate
+                    // because we're asking prev_chunk_header for already committed block
+                    self.store(),
+                    self.epoch_manager.as_ref(),
+                    prev_hash,
+                    prev_chunk_extra.as_ref(),
+                    prev_chunk_height_included,
+                    &chunk_header,
                 )
+                .map_err(|err| {
+                    warn!(
+                        target: "chain",
+                        ?err,
+                        prev_block_hash=?prev_hash,
+                        block_hash=?block.header().hash(),
+                        shard_id,
+                        prev_chunk_height_included,
+                        ?prev_chunk_extra,
+                        ?chunk_header,
+                        "Failed to validate chunk extra"
+                    );
+                    byzantine_assert!(false);
+                    match self.create_chunk_state_challenge(prev_block, block, &chunk_header) {
+                        Ok(chunk_state) => Error::InvalidChunkState(Box::new(chunk_state)),
+                        Err(err) => err,
+                    }
+                })?;
+
+                self.validate_chunk_transactions(&block, prev_block.header(), &chunk)?;
+
+                // we can't use hash from the current block here yet because the incoming receipts
+                // for this block is not stored yet
+                let new_receipts = collect_receipts(incoming_receipts.get(&shard_id).unwrap());
+                let old_receipts = &self.store().get_incoming_receipts_for_shard(
+                    self.epoch_manager.as_ref(),
+                    shard_id,
+                    *prev_hash,
+                    prev_chunk_height_included,
+                )?;
+                let old_receipts = collect_receipts_from_response(old_receipts);
+                let receipts = [new_receipts, old_receipts].concat();
+
+                ShardUpdateReason::NewChunk(chunk, receipts, split_state_roots)
             } else {
-                self.get_apply_chunk_job_old_chunk(
-                    block,
-                    prev_block,
-                    shard_uid,
-                    will_shard_layout_change,
-                    state_patch,
-                    runtime,
-                    epoch_manager,
+                ShardUpdateReason::OldChunk(
+                    ChunkExtra::clone(self.get_chunk_extra(prev_hash, &shard_uid)?.as_ref()),
                     split_state_roots,
                 )
             }
         } else if let Some(split_state_roots) = split_state_roots {
-            // Case 3), split state are ready. Read the state changes from the
-            // database and apply them to the split states.
             assert!(mode == ApplyChunksMode::CatchingUp && cares_about_shard_this_epoch);
-            self.get_apply_chunk_job_split_state(
-                block,
-                shard_uid,
-                runtime,
-                epoch_manager,
+            ShardUpdateReason::StateSplit(
                 split_state_roots,
+                self.store().get_state_changes_for_split_states(block.hash(), shard_id)?,
             )
         } else {
-            Ok(None)
-        }
-    }
-
-    /// Returns the apply chunk job when applying a new chunk and applying transactions.
-    fn get_apply_chunk_job_new_chunk(
-        &self,
-        block: &Block,
-        prev_block: &Block,
-        chunk_header: &ShardChunkHeader,
-        prev_chunk_header: &ShardChunkHeader,
-        shard_uid: ShardUId,
-        will_shard_layout_change: bool,
-        incoming_receipts: &HashMap<ShardId, Vec<ReceiptProof>>,
-        state_patch: SandboxStatePatch,
-        runtime: Arc<dyn RuntimeAdapter>,
-        epoch_manager: Arc<dyn EpochManagerAdapter>,
-        split_state_roots: Option<HashMap<ShardUId, StateRoot>>,
-    ) -> Result<Option<ApplyChunkJob>, Error> {
-        let prev_hash = block.header().prev_hash();
-        let shard_id = shard_uid.shard_id();
-
-        let prev_chunk_height_included = prev_chunk_header.height_included();
-        // Validate state root.
-        let prev_chunk_extra = self.get_chunk_extra(prev_hash, &shard_uid)?;
-
-        // Validate that all next chunk information matches previous chunk extra.
-        validate_chunk_with_chunk_extra(
-            // It's safe here to use ChainStore instead of ChainStoreUpdate
-            // because we're asking prev_chunk_header for already committed block
-            self.store(),
-            self.epoch_manager.as_ref(),
-            prev_hash,
-            &prev_chunk_extra,
-            prev_chunk_height_included,
-            chunk_header,
-        )
-        .map_err(|err| {
-            warn!(
-                target: "chain",
-                ?err,
-                prev_block_hash=?prev_hash,
-                block_hash=?block.header().hash(),
-                shard_id,
-                prev_chunk_height_included,
-                ?prev_chunk_extra,
-                ?chunk_header,
-                "Failed to validate chunk extra");
-            byzantine_assert!(false);
-            match self.create_chunk_state_challenge(prev_block, block, chunk_header) {
-                Ok(chunk_state) => Error::InvalidChunkState(Box::new(chunk_state)),
-                Err(err) => err,
-            }
-        })?;
-        // we can't use hash from the current block here yet because the incoming receipts
-        // for this block is not stored yet
-        let new_receipts = collect_receipts(incoming_receipts.get(&shard_id).unwrap());
-        let old_receipts = &self.store().get_incoming_receipts_for_shard(
-            self.epoch_manager.as_ref(),
-            shard_id,
-            *prev_hash,
-            prev_chunk_height_included,
-        )?;
-        let old_receipts = collect_receipts_from_response(old_receipts);
-        let receipts = [new_receipts, old_receipts].concat();
-
-        let chunk = self.get_chunk_clone_from_header(&chunk_header.clone())?;
-
-        let transactions = chunk.transactions();
-        if !validate_transactions_order(transactions) {
-            let merkle_paths = Block::compute_chunk_headers_root(block.chunks().iter()).1;
-            let chunk_proof = ChunkProofs {
-                block_header: borsh::to_vec(&block.header()).expect("Failed to serialize"),
-                merkle_proof: merkle_paths[shard_id as usize].clone(),
-                chunk: MaybeEncodedShardChunk::Decoded(chunk),
-            };
-            return Err(Error::InvalidChunkProofs(Box::new(chunk_proof)));
-        }
-
-        let protocol_version =
-            self.epoch_manager.get_epoch_protocol_version(block.header().epoch_id())?;
-
-        if checked_feature!("stable", AccessKeyNonceRange, protocol_version) {
-            let transaction_validity_period = self.transaction_validity_period;
-            for transaction in transactions {
-                self.store()
-                    .check_transaction_validity_period(
-                        prev_block.header(),
-                        &transaction.transaction.block_hash,
-                        transaction_validity_period,
-                    )
-                    .map_err(|_| Error::from(Error::InvalidTransactions))?;
-            }
+            return Ok(None);
         };
-        let chunk_inner = chunk.cloned_header().take_inner();
-        let gas_limit = chunk_inner.gas_limit();
-
-        // This variable is responsible for checking to which block we can apply receipts previously lost in apply_chunks
-        // (see https://github.com/near/nearcore/pull/4248/)
-        // We take the first block with existing chunk in the first epoch in which protocol feature
-        // RestoreReceiptsAfterFixApplyChunks was enabled, and put the restored receipts there.
-        let is_first_block_with_chunk_of_version = check_if_block_is_first_with_chunk_of_version(
-            self.store(),
-            epoch_manager.as_ref(),
-            prev_block.hash(),
+        let runtime = self.runtime_adapter.clone();
+        let epoch_manager = self.epoch_manager.clone();
+        let block_context = self.get_block_context_for_shard_update(
+            block.header(),
+            prev_block.header(),
             shard_id,
+            is_new_chunk,
         )?;
-
-        let block_hash = *block.hash();
-        let block_height = block.header().height();
-        let challenges_result = block.header().challenges_result().clone();
-        let block_timestamp = block.header().raw_timestamp();
-        let next_gas_price = prev_block.header().next_gas_price();
-        let random_seed = *block.header().random_value();
-        let height = chunk_header.height_included();
-        let prev_block_hash = *chunk_header.prev_block_hash();
         Ok(Some(Box::new(move |parent_span| -> Result<ApplyChunkResult, Error> {
-            let _span = tracing::debug_span!(
-                target: "chain",
-                parent: parent_span,
-                "new_chunk",
-                shard_id)
-            .entered();
-            let _timer = CryptoHashTimer::new(chunk.chunk_hash().0);
-            let storage_config = RuntimeStorageConfig {
-                state_root: *chunk_inner.prev_state_root(),
-                use_flat_storage: true,
-                source: crate::types::StorageDataSource::Db,
-                state_patch,
-                record_storage: false,
-            };
-            match runtime.apply_transactions(
-                shard_id,
-                storage_config,
-                height,
-                block_timestamp,
-                &prev_block_hash,
-                &block_hash,
-                &receipts,
-                chunk.transactions(),
-                chunk_inner.prev_validator_proposals(),
-                next_gas_price,
-                gas_limit,
-                &challenges_result,
-                random_seed,
-                true,
-                is_first_block_with_chunk_of_version,
-            ) {
-                Ok(apply_result) => {
-                    let apply_split_result_or_state_changes = if will_shard_layout_change {
-                        Some(ChainUpdate::apply_split_state_changes(
-                            epoch_manager.as_ref(),
-                            runtime.as_ref(),
-                            &block_hash,
-                            block_height,
-                            &prev_block_hash,
-                            &apply_result,
-                            split_state_roots,
-                        )?)
-                    } else {
-                        None
-                    };
-                    Ok(ApplyChunkResult::SameHeight(SameHeightResult {
-                        gas_limit,
-                        shard_uid,
-                        apply_result,
-                        apply_split_result_or_state_changes,
-                    }))
-                }
-                Err(err) => Err(err),
-            }
-        })))
-    }
-
-    /// Returns the apply chunk job when applying an old chunk and applying transactions.
-    fn get_apply_chunk_job_old_chunk(
-        &self,
-        block: &Block,
-        prev_block: &Block,
-        shard_uid: ShardUId,
-        will_shard_layout_change: bool,
-        state_patch: SandboxStatePatch,
-        runtime: Arc<dyn RuntimeAdapter>,
-        epoch_manager: Arc<dyn EpochManagerAdapter>,
-        split_state_roots: Option<HashMap<ShardUId, StateRoot>>,
-    ) -> Result<Option<ApplyChunkJob>, Error> {
-        let shard_id = shard_uid.shard_id();
-        let prev_block_hash = *prev_block.hash();
-        let new_extra = self.get_chunk_extra(&prev_block_hash, &shard_uid)?;
-
-        let block_hash = *block.hash();
-        let block_height = block.header().height();
-        let challenges_result = block.header().challenges_result().clone();
-        let block_timestamp = block.header().raw_timestamp();
-        let epoch_id = self.epoch_manager.get_epoch_id_from_prev_block(&prev_block_hash)?;
-        let protocol_version = self.epoch_manager.get_epoch_protocol_version(&epoch_id)?;
-
-        let next_gas_price =
-            if protocol_version >= ProtocolFeature::FixApplyChunks.protocol_version() {
-                prev_block.header().next_gas_price()
-            } else {
-                block.header().next_gas_price()
-            };
-        let random_seed = *block.header().random_value();
-        let height = block.header().height();
-
-        Ok(Some(Box::new(move |parent_span| -> Result<ApplyChunkResult, Error> {
-            let _span = tracing::debug_span!(
-                target: "chain",
-                parent: parent_span,
-                "existing_chunk",
-                shard_id)
-            .entered();
-            let storage_config = RuntimeStorageConfig {
-                state_root: *new_extra.state_root(),
-                use_flat_storage: true,
-                source: crate::types::StorageDataSource::Db,
+            Ok(process_shard_update(
+                parent_span,
+                runtime.as_ref(),
+                epoch_manager.as_ref(),
+                shard_update_reason,
+                block_context,
+                ShardContext { shard_uid, will_shard_layout_change },
                 state_patch,
-                record_storage: false,
-            };
-            match runtime.apply_transactions(
-                shard_id,
-                storage_config,
-                height,
-                block_timestamp,
-                &prev_block_hash,
-                &block_hash,
-                &[],
-                &[],
-                new_extra.validator_proposals(),
-                next_gas_price,
-                new_extra.gas_limit(),
-                &challenges_result,
-                random_seed,
-                false,
-                false,
-            ) {
-                Ok(apply_result) => {
-                    let apply_split_result_or_state_changes = if will_shard_layout_change {
-                        Some(ChainUpdate::apply_split_state_changes(
-                            epoch_manager.as_ref(),
-                            runtime.as_ref(),
-                            &block_hash,
-                            block_height,
-                            &prev_block_hash,
-                            &apply_result,
-                            split_state_roots,
-                        )?)
-                    } else {
-                        None
-                    };
-                    Ok(ApplyChunkResult::DifferentHeight(DifferentHeightResult {
-                        shard_uid,
-                        apply_result,
-                        apply_split_result_or_state_changes,
-                    }))
-                }
-                Err(err) => Err(err),
-            }
-        })))
-    }
-
-    /// Returns the apply chunk job when just splitting state but not applying transactions.
-    fn get_apply_chunk_job_split_state(
-        &self,
-        block: &Block,
-        shard_uid: ShardUId,
-        runtime: Arc<dyn RuntimeAdapter>,
-        epoch_manager: Arc<dyn EpochManagerAdapter>,
-        split_state_roots: HashMap<ShardUId, StateRoot>,
-    ) -> Result<Option<ApplyChunkJob>, Error> {
-        let shard_id = shard_uid.shard_id();
-        let next_epoch_shard_layout =
-            epoch_manager.get_shard_layout(block.header().next_epoch_id())?;
-        let state_changes =
-            self.store().get_state_changes_for_split_states(block.hash(), shard_id)?;
-        let block_hash = *block.hash();
-        let block_height = block.header().height();
-        Ok(Some(Box::new(move |parent_span| -> Result<ApplyChunkResult, Error> {
-            let _span = tracing::debug_span!(
-                target: "chain",
-                parent: parent_span,
-                "split_state",
-                shard_id,
-                ?shard_uid)
-            .entered();
-            let results = runtime.apply_update_to_split_states(
-                &block_hash,
-                block_height,
-                split_state_roots,
-                &next_epoch_shard_layout,
-                state_changes,
-            )?;
-            Ok(ApplyChunkResult::SplitState(SplitStateResult { shard_uid, results }))
+            )?)
         })))
     }
 
@@ -4965,35 +4795,6 @@ pub struct ChainUpdate<'a> {
     transaction_validity_period: BlockHeightDelta,
 }
 
-#[derive(Debug)]
-pub struct SameHeightResult {
-    shard_uid: ShardUId,
-    gas_limit: Gas,
-    apply_result: ApplyTransactionResult,
-    apply_split_result_or_state_changes: Option<ApplySplitStateResultOrStateChanges>,
-}
-
-#[derive(Debug)]
-pub struct DifferentHeightResult {
-    shard_uid: ShardUId,
-    apply_result: ApplyTransactionResult,
-    apply_split_result_or_state_changes: Option<ApplySplitStateResultOrStateChanges>,
-}
-
-#[derive(Debug)]
-pub struct SplitStateResult {
-    // parent shard of the split states
-    shard_uid: ShardUId,
-    results: Vec<ApplySplitStateResult>,
-}
-
-#[derive(Debug)]
-pub enum ApplyChunkResult {
-    SameHeight(SameHeightResult),
-    DifferentHeight(DifferentHeightResult),
-    SplitState(SplitStateResult),
-}
-
 impl<'a> ChainUpdate<'a> {
     pub fn new(
         store: &'a mut ChainStore,
@@ -5105,45 +4906,6 @@ impl<'a> ChainUpdate<'a> {
         Ok(())
     }
 
-    /// Process ApplyTransactionResult to apply changes to split states
-    /// When shards will change next epoch,
-    /// if `split_state_roots` is not None, that means states for the split shards are ready
-    /// this function updates these states and return apply results for these states
-    /// otherwise, this function returns state changes needed to be applied to split
-    /// states. These state changes will be stored in the database by `process_split_state`
-    fn apply_split_state_changes(
-        epoch_manager: &dyn EpochManagerAdapter,
-        runtime_adapter: &dyn RuntimeAdapter,
-        block_hash: &CryptoHash,
-        block_height: BlockHeight,
-        prev_block_hash: &CryptoHash,
-        apply_result: &ApplyTransactionResult,
-        split_state_roots: Option<HashMap<ShardUId, StateRoot>>,
-    ) -> Result<ApplySplitStateResultOrStateChanges, Error> {
-        let state_changes = StateChangesForSplitStates::from_raw_state_changes(
-            apply_result.trie_changes.state_changes(),
-            apply_result.processed_delayed_receipts.clone(),
-        );
-        let next_epoch_shard_layout = {
-            let next_epoch_id = epoch_manager.get_next_epoch_id_from_prev_block(prev_block_hash)?;
-            epoch_manager.get_shard_layout(&next_epoch_id)?
-        };
-        // split states are ready, apply update to them now
-        if let Some(state_roots) = split_state_roots {
-            let split_state_results = runtime_adapter.apply_update_to_split_states(
-                block_hash,
-                block_height,
-                state_roots,
-                &next_epoch_shard_layout,
-                state_changes,
-            )?;
-            Ok(ApplySplitStateResultOrStateChanges::ApplySplitStateResults(split_state_results))
-        } else {
-            // split states are not ready yet, store state changes in consolidated_state_changes
-            Ok(ApplySplitStateResultOrStateChanges::StateChangesForSplitStates(state_changes))
-        }
-    }
-
     /// Postprocess split state results or state changes, do the necessary update on chain
     /// for split state results: store the chunk extras and trie changes for the split states
     /// for state changes, store the state changes for splitting states
@@ -5294,7 +5056,7 @@ impl<'a> ChainUpdate<'a> {
         let prev_hash = block.header().prev_hash();
         let height = block.header().height();
         match result {
-            ApplyChunkResult::SameHeight(SameHeightResult {
+            ApplyChunkResult::NewChunk(NewChunkResult {
                 gas_limit,
                 shard_uid,
                 apply_result,
@@ -5345,7 +5107,7 @@ impl<'a> ChainUpdate<'a> {
                     self.process_split_state(block, &shard_uid, apply_results_or_state_changes)?;
                 }
             }
-            ApplyChunkResult::DifferentHeight(DifferentHeightResult {
+            ApplyChunkResult::OldChunk(OldChunkResult {
                shard_uid,
                apply_result,
                apply_split_result_or_state_changes,
@@ -5372,7 +5134,7 @@ impl<'a> ChainUpdate<'a> {
                     self.process_split_state(block, &shard_uid, apply_results_or_state_changes)?;
                 }
             }
-            ApplyChunkResult::SplitState(SplitStateResult { shard_uid, results }) => {
+            ApplyChunkResult::StateSplit(StateSplitResult { shard_uid, results }) => {
                 self.chain_store_update
                     .remove_state_changes_for_split_states(*block.hash(), shard_uid.shard_id());
                 self.process_split_state(
diff --git a/chain/chain/src/lib.rs b/chain/chain/src/lib.rs
index dca14596187..bcf3be21320 100644
--- a/chain/chain/src/lib.rs
+++ b/chain/chain/src/lib.rs
@@ -30,6 +30,7 @@ pub mod validate;
 
 #[cfg(test)]
 mod tests;
+mod update_shard;
 
 #[cfg(feature = "byzantine_asserts")]
 #[macro_export]
diff --git a/chain/chain/src/update_shard.rs b/chain/chain/src/update_shard.rs
new file mode 100644
index 00000000000..8e119033206
--- /dev/null
+++ b/chain/chain/src/update_shard.rs
@@ -0,0 +1,345 @@
+use crate::crypto_hash_timer::CryptoHashTimer;
+use crate::types::{
+    ApplySplitStateResult, ApplySplitStateResultOrStateChanges, ApplyTransactionResult,
+    RuntimeAdapter, RuntimeStorageConfig,
+};
+use near_chain_primitives::Error;
+use near_epoch_manager::EpochManagerAdapter;
+use near_primitives::challenge::ChallengesResult;
+use near_primitives::hash::CryptoHash;
+use near_primitives::receipt::Receipt;
+use near_primitives::sandbox::state_patch::SandboxStatePatch;
+use near_primitives::shard_layout::ShardUId;
+use near_primitives::sharding::ShardChunk;
+use near_primitives::types::chunk_extra::ChunkExtra;
+use near_primitives::types::{Balance, BlockHeight, Gas, StateChangesForSplitStates, StateRoot};
+use std::collections::HashMap;
+
+/// Information about block for which shard is updated.
+/// Use cases include:
+/// - queries to epoch manager
+/// - allowing contracts to get current chain data
+#[derive(Clone, Debug)]
+pub(crate) struct BlockContext {
+    pub block_hash: CryptoHash,
+    pub prev_block_hash: CryptoHash,
+    pub challenges_result: ChallengesResult,
+    pub block_timestamp: u64,
+    pub gas_price: Balance,
+    pub height: BlockHeight,
+    pub random_seed: CryptoHash,
+    pub is_first_block_with_chunk_of_version: bool,
+}
+
+/// Result of updating a shard for some block when it has a new chunk for this
+/// shard.
+#[derive(Debug)]
+pub struct NewChunkResult {
+    pub(crate) shard_uid: ShardUId,
+    pub(crate) gas_limit: Gas,
+    pub(crate) apply_result: ApplyTransactionResult,
+    pub(crate) apply_split_result_or_state_changes: Option<ApplySplitStateResultOrStateChanges>,
+}
+
+/// Result of updating a shard for some block when it doesn't have a new chunk
+/// for this shard, so previous chunk header is copied.
+#[derive(Debug)]
+pub struct OldChunkResult {
+    pub(crate) shard_uid: ShardUId,
+    /// Note that despite the naming, no transactions are applied in this case.
+    /// TODO(logunov): exclude receipts/txs context from all related types.
+    pub(crate) apply_result: ApplyTransactionResult,
+    pub(crate) apply_split_result_or_state_changes: Option<ApplySplitStateResultOrStateChanges>,
+}
+
+/// Result of updating a shard for some block when we apply only split state
+/// changes due to resharding.
+#[derive(Debug)]
+pub struct StateSplitResult {
+    // parent shard of the split states
+    pub(crate) shard_uid: ShardUId,
+    pub(crate) results: Vec<ApplySplitStateResult>,
+}
+
+#[derive(Debug)]
+pub enum ApplyChunkResult {
+    NewChunk(NewChunkResult),
+    OldChunk(OldChunkResult),
+    StateSplit(StateSplitResult),
+}
+
+/// State roots of split shards which are ready.
+type SplitStateRoots = HashMap<ShardUId, StateRoot>;
+
+/// Reason to update a shard when new block appears on chain.
+/// All types include state roots for split shards in case of resharding.
+pub(crate) enum ShardUpdateReason {
+    /// Block has a new chunk for the shard.
+    /// Contains chunk itself and all new incoming receipts to the shard.
+    NewChunk(ShardChunk, Vec<Receipt>, Option<SplitStateRoots>),
+    /// Block doesn't have a new chunk for the shard.
+    /// Instead, previous chunk header is copied.
+    /// Contains result of shard update for previous block.
+    OldChunk(ChunkExtra, Option<SplitStateRoots>),
+    /// See comment to `split_state_roots` in `Chain::get_update_shard_job`.
+    /// Process only state changes caused by resharding.
+    StateSplit(SplitStateRoots, StateChangesForSplitStates),
+}
+
+/// Information about shard to update.
+pub(crate) struct ShardContext {
+    pub shard_uid: ShardUId,
+    /// Whether shard layout changes in the next epoch.
+    pub will_shard_layout_change: bool,
+}
+
+/// Processes shard update with given block and shard.
+/// Doesn't modify chain, only produces result to be applied later.
+pub(crate) fn process_shard_update(
+    parent_span: &tracing::Span,
+    runtime: &dyn RuntimeAdapter,
+    epoch_manager: &dyn EpochManagerAdapter,
+    shard_update_reason: ShardUpdateReason,
+    block_context: BlockContext,
+    shard_context: ShardContext,
+    state_patch: SandboxStatePatch,
+) -> Result<ApplyChunkResult, Error> {
+    match shard_update_reason {
+        ShardUpdateReason::NewChunk(chunk, receipts, split_state_roots) => apply_new_chunk(
+            parent_span,
+            block_context,
+            chunk,
+            shard_context,
+            receipts,
+            state_patch,
+            runtime,
+            epoch_manager,
+            split_state_roots,
+        ),
+        ShardUpdateReason::OldChunk(prev_chunk_extra, split_state_roots) => apply_old_chunk(
+            parent_span,
+            block_context,
+            &prev_chunk_extra,
+            shard_context,
+            state_patch,
+            runtime,
+            epoch_manager,
+            split_state_roots,
+        ),
+        ShardUpdateReason::StateSplit(split_state_roots, state_changes) => apply_state_split(
+            parent_span,
+            block_context,
+            shard_context.shard_uid,
+            runtime,
+            epoch_manager,
+            split_state_roots,
+            state_changes,
+        ),
+    }
+}
+
+/// Applies new chunk, which includes applying transactions from chunk and
+/// receipts filtered from outgoing receipts from previous chunks.
+fn apply_new_chunk(
+    parent_span: &tracing::Span,
+    block_context: BlockContext,
+    chunk: ShardChunk,
+    shard_info: ShardContext,
+    receipts: Vec<Receipt>,
+    state_patch: SandboxStatePatch,
+    runtime: &dyn RuntimeAdapter,
+    epoch_manager: &dyn EpochManagerAdapter,
+    split_state_roots: Option<SplitStateRoots>,
+) -> Result<ApplyChunkResult, Error> {
+    let shard_id = shard_info.shard_uid.shard_id();
+    let _span = tracing::debug_span!(
+        target: "chain",
+        parent: parent_span,
+        "new_chunk",
+        shard_id)
+    .entered();
+    let chunk_inner = chunk.cloned_header().take_inner();
+    let gas_limit = chunk_inner.gas_limit();
+
+    let _timer = CryptoHashTimer::new(chunk.chunk_hash().0);
+    let storage_config = RuntimeStorageConfig {
+        state_root: *chunk_inner.prev_state_root(),
+        use_flat_storage: true,
+        source: crate::types::StorageDataSource::Db,
+        state_patch,
+        record_storage: false,
+    };
+    match runtime.apply_transactions(
+        shard_id,
+        storage_config,
+        block_context.height,
+        block_context.block_timestamp,
+        &block_context.prev_block_hash,
+        &block_context.block_hash,
+        &receipts,
+        chunk.transactions(),
+        chunk_inner.prev_validator_proposals(),
+        block_context.gas_price,
+        gas_limit,
+        &block_context.challenges_result,
+        block_context.random_seed,
+        true,
+        block_context.is_first_block_with_chunk_of_version,
+    ) {
+        Ok(apply_result) => {
+            let apply_split_result_or_state_changes = if shard_info.will_shard_layout_change {
+                Some(apply_split_state_changes(
+                    epoch_manager,
+                    runtime,
+                    block_context,
+                    &apply_result,
+                    split_state_roots,
+                )?)
+            } else {
+                None
+            };
+            Ok(ApplyChunkResult::NewChunk(NewChunkResult {
+                gas_limit,
+                shard_uid: shard_info.shard_uid,
+                apply_result,
+                apply_split_result_or_state_changes,
+            }))
+        }
+        Err(err) => Err(err),
+    }
+}
+
+/// Applies shard update corresponding to missing chunk.
+/// (logunov) From what I know, the state update may include only validator
+/// accounts update on epoch start.
+fn apply_old_chunk(
+    parent_span: &tracing::Span,
+    block_context: BlockContext,
+    prev_chunk_extra: &ChunkExtra,
+    shard_info: ShardContext,
+    state_patch: SandboxStatePatch,
+    runtime: &dyn RuntimeAdapter,
+    epoch_manager: &dyn EpochManagerAdapter,
+    split_state_roots: Option<SplitStateRoots>,
+) -> Result<ApplyChunkResult, Error> {
+    let shard_id = shard_info.shard_uid.shard_id();
+    let _span = tracing::debug_span!(
+        target: "chain",
+        parent: parent_span,
+        "existing_chunk",
+        shard_id)
+    .entered();
+
+    let storage_config = RuntimeStorageConfig {
+        state_root: *prev_chunk_extra.state_root(),
+        use_flat_storage: true,
+        source: crate::types::StorageDataSource::Db,
+        state_patch,
+        record_storage: false,
+    };
+    match runtime.apply_transactions(
+        shard_id,
+        storage_config,
+        block_context.height,
+        block_context.block_timestamp,
+        &block_context.prev_block_hash,
+        &block_context.block_hash,
+        &[],
+        &[],
+        prev_chunk_extra.validator_proposals(),
+        block_context.gas_price,
+        prev_chunk_extra.gas_limit(),
+        &block_context.challenges_result,
+        block_context.random_seed,
+        false,
+        false,
+    ) {
+        Ok(apply_result) => {
+            let apply_split_result_or_state_changes = if shard_info.will_shard_layout_change {
+                Some(apply_split_state_changes(
+                    epoch_manager,
+                    runtime,
+                    block_context,
+                    &apply_result,
+                    split_state_roots,
+                )?)
+            } else {
+                None
+            };
+            Ok(ApplyChunkResult::OldChunk(OldChunkResult {
+                shard_uid: shard_info.shard_uid,
+                apply_result,
+                apply_split_result_or_state_changes,
+            }))
+        }
+        Err(err) => Err(err),
+    }
+}
+
+/// Applies only split state changes but not applies any transactions.
+fn apply_state_split(
+    parent_span: &tracing::Span,
+    block_context: BlockContext,
+    shard_uid: ShardUId,
+    runtime: &dyn RuntimeAdapter,
+    epoch_manager: &dyn EpochManagerAdapter,
+    split_state_roots: SplitStateRoots,
+    state_changes: StateChangesForSplitStates,
+) -> Result<ApplyChunkResult, Error> {
+    let shard_id = shard_uid.shard_id();
+    let _span = tracing::debug_span!(
+        target: "chain",
+        parent: parent_span,
+        "split_state",
+        shard_id,
+        ?shard_uid)
+    .entered();
+    let next_epoch_id = epoch_manager.get_next_epoch_id(&block_context.block_hash)?;
+    let next_epoch_shard_layout = epoch_manager.get_shard_layout(&next_epoch_id)?;
+    let results = runtime.apply_update_to_split_states(
+        &block_context.block_hash,
+        block_context.height,
+        split_state_roots,
+        &next_epoch_shard_layout,
+        state_changes,
+    )?;
+    Ok(ApplyChunkResult::StateSplit(StateSplitResult { shard_uid, results }))
+}
+
+/// Process ApplyTransactionResult to apply changes to split states
+/// When shards will change next epoch,
+/// if `split_state_roots` is not None, that means states for the split shards are ready
+/// this function updates these states and return apply results for these states
+/// otherwise, this function returns state changes needed to be applied to split
+/// states. These state changes will be stored in the database by `process_split_state`
+fn apply_split_state_changes(
+    epoch_manager: &dyn EpochManagerAdapter,
+    runtime_adapter: &dyn RuntimeAdapter,
+    block_context: BlockContext,
+    apply_result: &ApplyTransactionResult,
+    split_state_roots: Option<SplitStateRoots>,
+) -> Result<ApplySplitStateResultOrStateChanges, Error> {
+    let state_changes = StateChangesForSplitStates::from_raw_state_changes(
+        apply_result.trie_changes.state_changes(),
+        apply_result.processed_delayed_receipts.clone(),
+    );
+    let next_epoch_shard_layout = {
+        let next_epoch_id =
+            epoch_manager.get_next_epoch_id_from_prev_block(&block_context.prev_block_hash)?;
+        epoch_manager.get_shard_layout(&next_epoch_id)?
+    };
+    // split states are ready, apply update to them now
+    if let Some(state_roots) = split_state_roots {
+        let split_state_results = runtime_adapter.apply_update_to_split_states(
+            &block_context.block_hash,
+            block_context.height,
+            state_roots,
+            &next_epoch_shard_layout,
+            state_changes,
+        )?;
+        Ok(ApplySplitStateResultOrStateChanges::ApplySplitStateResults(split_state_results))
+    } else {
+        // split states are not ready yet, store state changes in consolidated_state_changes
+        Ok(ApplySplitStateResultOrStateChanges::StateChangesForSplitStates(state_changes))
+    }
+}
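For reviewers who want the new control flow at a glance, here is a sketch (illustrative only, not part of the patch; the driver name `drive_update_shard_jobs` is invented) of how the `UpdateShardJob` closures returned by `apply_chunks_preprocessing` are intended to be driven, and how their `ApplyChunkResult` is consumed, mirroring the dispatch that `ChainUpdate::process_apply_chunk_result` performs in chain.rs:

    // Sketch under the assumption that we are inside chain/chain/src/chain.rs,
    // so UpdateShardJob, ApplyChunkResult and Error are all in scope.
    fn drive_update_shard_jobs(
        jobs: Vec<UpdateShardJob>,
        parent_span: &tracing::Span,
    ) -> Vec<Result<ApplyChunkResult, Error>> {
        jobs.into_iter()
            .map(|job| {
                // Each closure has already captured its BlockContext and
                // ShardUpdateReason, so it only needs a span to parent the
                // "new_chunk" / "existing_chunk" / "split_state" debug spans.
                let result = job(parent_span)?;
                // Postprocessing then dispatches on the reason-specific variant.
                match &result {
                    ApplyChunkResult::NewChunk(_) => { /* save ChunkExtra, outcomes, receipts */ }
                    ApplyChunkResult::OldChunk(_) => { /* carry the previous ChunkExtra forward */ }
                    ApplyChunkResult::StateSplit(_) => { /* persist split state results */ }
                }
                Ok(result)
            })
            .collect()
    }

Because each job captures everything it needs up front, the same closures can later be scheduled N+1 times for the "shadow" stateless-validation jobs mentioned in the description, without consulting Chain again.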