diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs index a4216689100..172bc94a435 100644 --- a/chain/chain/src/chain.rs +++ b/chain/chain/src/chain.rs @@ -11,9 +11,13 @@ use crate::state_request_tracker::StateRequestTracker; use crate::state_snapshot_actor::SnapshotCallbacks; use crate::store::{ChainStore, ChainStoreAccess, ChainStoreUpdate, GCMode}; use crate::types::{ - AcceptedBlock, ApplySplitStateResult, ApplySplitStateResultOrStateChanges, - ApplyTransactionResult, Block, BlockEconomicsConfig, BlockHeader, BlockStatus, ChainConfig, - ChainGenesis, Provenance, RuntimeAdapter, RuntimeStorageConfig, + AcceptedBlock, ApplySplitStateResultOrStateChanges, ApplyTransactionResult, Block, + BlockEconomicsConfig, BlockHeader, BlockStatus, ChainConfig, ChainGenesis, Provenance, + RuntimeAdapter, RuntimeStorageConfig, +}; +use crate::update_shard::{ + process_shard_update, ApplyChunkResult, BlockContext, NewChunkResult, OldChunkResult, + ShardContext, ShardUpdateReason, StateSplitResult, }; use crate::validate::{ validate_challenge, validate_chunk_proofs, validate_chunk_with_chunk_extra, @@ -63,8 +67,8 @@ use near_primitives::static_clock::StaticClock; use near_primitives::transaction::{ExecutionOutcomeWithIdAndProof, SignedTransaction}; use near_primitives::types::chunk_extra::ChunkExtra; use near_primitives::types::{ - AccountId, Balance, BlockExtra, BlockHeight, BlockHeightDelta, EpochId, Gas, MerkleHash, - NumBlocks, NumShards, ShardId, StateChangesForSplitStates, StateRoot, + AccountId, Balance, BlockExtra, BlockHeight, BlockHeightDelta, EpochId, MerkleHash, NumBlocks, + NumShards, ShardId, StateRoot, }; use near_primitives::unwrap_or_return; use near_primitives::utils::MaybeValidated; @@ -485,13 +489,13 @@ impl Drop for Chain { } } -/// ApplyChunkJob is a closure that is responsible for applying of a single chunk. -/// All of the chunk details and other arguments are already captured within. -type ApplyChunkJob = Box Result + Send + 'static>; +/// UpdateShardJob is a closure that is responsible for updating a shard for a single block. +/// Execution context (latest blocks/chunks details) are already captured within. +type UpdateShardJob = Box Result + Send + 'static>; /// PreprocessBlockResult is a tuple where the first element is a vector of jobs -/// to apply chunks the second element is BlockPreprocessInfo -type PreprocessBlockResult = (Vec, BlockPreprocessInfo); +/// to update shards, the second element is BlockPreprocessInfo +type PreprocessBlockResult = (Vec, BlockPreprocessInfo); // Used only for verify_block_hash_and_signature. See that method. #[derive(Clone, Copy, PartialEq, Eq)] @@ -3488,6 +3492,87 @@ impl Chain { Ok(()) } + /// Validates basic correctness of array of transactions included in chunk. + /// Doesn't require state. + fn validate_chunk_transactions( + &self, + block: &Block, + prev_block_header: &BlockHeader, + chunk: &ShardChunk, + ) -> Result<(), Error> { + if !validate_transactions_order(chunk.transactions()) { + let merkle_paths = Block::compute_chunk_headers_root(block.chunks().iter()).1; + let chunk_proof = ChunkProofs { + block_header: borsh::to_vec(&block.header()).expect("Failed to serialize"), + merkle_proof: merkle_paths[chunk.shard_id() as usize].clone(), + chunk: MaybeEncodedShardChunk::Decoded(chunk.clone()), + }; + return Err(Error::InvalidChunkProofs(Box::new(chunk_proof))); + } + + let protocol_version = + self.epoch_manager.get_epoch_protocol_version(block.header().epoch_id())?; + if checked_feature!("stable", AccessKeyNonceRange, protocol_version) { + let transaction_validity_period = self.transaction_validity_period; + for transaction in chunk.transactions() { + self.store() + .check_transaction_validity_period( + prev_block_header, + &transaction.transaction.block_hash, + transaction_validity_period, + ) + .map_err(|_| Error::from(Error::InvalidTransactions))?; + } + }; + + return Ok(()); + } + + /// For given pair of block headers and shard id, return information about + /// block necessary for processing shard update. + fn get_block_context_for_shard_update( + &self, + block_header: &BlockHeader, + prev_block_header: &BlockHeader, + shard_id: ShardId, + is_new_chunk: bool, + ) -> Result { + let epoch_id = block_header.epoch_id(); + let protocol_version = self.epoch_manager.get_epoch_protocol_version(epoch_id)?; + // Before `FixApplyChunks` feature, gas price was taken from current + // block by mistake. Preserve it for backwards compatibility. + let gas_price = if !is_new_chunk + && protocol_version < ProtocolFeature::FixApplyChunks.protocol_version() + { + block_header.next_gas_price() + } else { + prev_block_header.next_gas_price() + }; + + // This variable is responsible for checking to which block we can apply receipts previously lost in apply_chunks + // (see https://github.com/near/nearcore/pull/4248/) + // We take the first block with existing chunk in the first epoch in which protocol feature + // RestoreReceiptsAfterFixApplyChunks was enabled, and put the restored receipts there. + let is_first_block_with_chunk_of_version = is_new_chunk + && check_if_block_is_first_with_chunk_of_version( + self.store(), + self.epoch_manager.as_ref(), + block_header.prev_hash(), + shard_id, + )?; + + Ok(BlockContext { + block_hash: *block_header.hash(), + prev_block_hash: *block_header.prev_hash(), + challenges_result: block_header.challenges_result().clone(), + block_timestamp: block_header.raw_timestamp(), + gas_price, + height: block_header.height(), + random_seed: *block_header.random_value(), + is_first_block_with_chunk_of_version, + }) + } + fn block_catch_up_postprocess( &mut self, me: &Option, @@ -3786,7 +3871,8 @@ impl Chain { .collect() } - /// Creates jobs that would apply chunks + /// Creates jobs which will update shards for the given block and incoming + /// receipts aggregated for it. fn apply_chunks_preprocessing( &self, me: &Option, @@ -3796,10 +3882,8 @@ impl Chain { mode: ApplyChunksMode, mut state_patch: SandboxStatePatch, invalid_chunks: &mut Vec, - ) -> Result, Error> { + ) -> Result, Error> { let _span = tracing::debug_span!(target: "chain", "apply_chunks_preprocessing").entered(); - let prev_hash = block.header().prev_hash(); - let will_shard_layout_change = self.epoch_manager.will_shard_layout_change(prev_hash)?; let prev_chunk_headers = Chain::get_prev_chunk_headers(self.epoch_manager.as_ref(), prev_block)?; block @@ -3811,21 +3895,18 @@ impl Chain { // XXX: This is a bit questionable -- sandbox state patching works // only for a single shard. This so far has been enough. let state_patch = state_patch.take(); - - let apply_chunk_job = self.get_apply_chunk_job( + let update_shard_job = self.get_update_shard_job( me, block, prev_block, chunk_header, prev_chunk_header, - shard_id, + shard_id as ShardId, mode, - will_shard_layout_change, incoming_receipts, state_patch, ); - - match apply_chunk_job { + match update_shard_job { Ok(Some(processor)) => Some(Ok(processor)), Ok(None) => None, Err(err) => { @@ -3839,26 +3920,25 @@ impl Chain { .collect() } - /// This method returns the closure that is responsible for applying of a single chunk. - fn get_apply_chunk_job( + /// This method returns the closure that is responsible for updating a shard. + fn get_update_shard_job( &self, me: &Option, block: &Block, prev_block: &Block, chunk_header: &ShardChunkHeader, prev_chunk_header: &ShardChunkHeader, - shard_id: usize, + shard_id: ShardId, mode: ApplyChunksMode, - will_shard_layout_change: bool, incoming_receipts: &HashMap>, state_patch: SandboxStatePatch, - ) -> Result, Error> { - let shard_id = shard_id as ShardId; + ) -> Result, Error> { let prev_hash = block.header().prev_hash(); let cares_about_shard_this_epoch = self.shard_tracker.care_about_shard(me.as_ref(), prev_hash, shard_id, true); let cares_about_shard_next_epoch = self.shard_tracker.will_care_about_shard(me.as_ref(), prev_hash, shard_id, true); + let will_shard_layout_change = self.epoch_manager.will_shard_layout_change(prev_hash)?; let should_apply_transactions = get_should_apply_transactions( mode, cares_about_shard_this_epoch, @@ -3888,343 +3968,93 @@ impl Chain { let shard_uid = self.epoch_manager.shard_id_to_uid(shard_id, block.header().epoch_id())?; let is_new_chunk = chunk_header.height_included() == block.header().height(); - let epoch_manager = self.epoch_manager.clone(); - let runtime = self.runtime_adapter.clone(); - if should_apply_transactions { + let shard_update_reason = if should_apply_transactions { if is_new_chunk { - self.get_apply_chunk_job_new_chunk( - block, - prev_block, - chunk_header, - prev_chunk_header, - shard_uid, - will_shard_layout_change, - incoming_receipts, - state_patch, - runtime, - epoch_manager, - split_state_roots, + // Validate new chunk and collect incoming receipts for it. + + let prev_chunk_extra = self.get_chunk_extra(prev_hash, &shard_uid)?; + let chunk = self.get_chunk_clone_from_header(&chunk_header)?; + let prev_chunk_height_included = prev_chunk_header.height_included(); + + // Validate that all next chunk information matches previous chunk extra. + validate_chunk_with_chunk_extra( + // It's safe here to use ChainStore instead of ChainStoreUpdate + // because we're asking prev_chunk_header for already committed block + self.store(), + self.epoch_manager.as_ref(), + prev_hash, + prev_chunk_extra.as_ref(), + prev_chunk_height_included, + &chunk_header, ) + .map_err(|err| { + warn!( + target: "chain", + ?err, + prev_block_hash=?prev_hash, + block_hash=?block.header().hash(), + shard_id, + prev_chunk_height_included, + ?prev_chunk_extra, + ?chunk_header, + "Failed to validate chunk extra" + ); + byzantine_assert!(false); + match self.create_chunk_state_challenge(prev_block, block, &chunk_header) { + Ok(chunk_state) => Error::InvalidChunkState(Box::new(chunk_state)), + Err(err) => err, + } + })?; + + self.validate_chunk_transactions(&block, prev_block.header(), &chunk)?; + + // we can't use hash from the current block here yet because the incoming receipts + // for this block is not stored yet + let new_receipts = collect_receipts(incoming_receipts.get(&shard_id).unwrap()); + let old_receipts = &self.store().get_incoming_receipts_for_shard( + self.epoch_manager.as_ref(), + shard_id, + *prev_hash, + prev_chunk_height_included, + )?; + let old_receipts = collect_receipts_from_response(old_receipts); + let receipts = [new_receipts, old_receipts].concat(); + + ShardUpdateReason::NewChunk(chunk, receipts, split_state_roots) } else { - self.get_apply_chunk_job_old_chunk( - block, - prev_block, - shard_uid, - will_shard_layout_change, - state_patch, - runtime, - epoch_manager, + ShardUpdateReason::OldChunk( + ChunkExtra::clone(self.get_chunk_extra(prev_hash, &shard_uid)?.as_ref()), split_state_roots, ) } } else if let Some(split_state_roots) = split_state_roots { - // Case 3), split state are ready. Read the state changes from the - // database and apply them to the split states. assert!(mode == ApplyChunksMode::CatchingUp && cares_about_shard_this_epoch); - self.get_apply_chunk_job_split_state( - block, - shard_uid, - runtime, - epoch_manager, + ShardUpdateReason::StateSplit( split_state_roots, + self.store().get_state_changes_for_split_states(block.hash(), shard_id)?, ) } else { - Ok(None) - } - } - - /// Returns the apply chunk job when applying a new chunk and applying transactions. - fn get_apply_chunk_job_new_chunk( - &self, - block: &Block, - prev_block: &Block, - chunk_header: &ShardChunkHeader, - prev_chunk_header: &ShardChunkHeader, - shard_uid: ShardUId, - will_shard_layout_change: bool, - incoming_receipts: &HashMap>, - state_patch: SandboxStatePatch, - runtime: Arc, - epoch_manager: Arc, - split_state_roots: Option>, - ) -> Result, Error> { - let prev_hash = block.header().prev_hash(); - let shard_id = shard_uid.shard_id(); - - let prev_chunk_height_included = prev_chunk_header.height_included(); - // Validate state root. - let prev_chunk_extra = self.get_chunk_extra(prev_hash, &shard_uid)?; - - // Validate that all next chunk information matches previous chunk extra. - validate_chunk_with_chunk_extra( - // It's safe here to use ChainStore instead of ChainStoreUpdate - // because we're asking prev_chunk_header for already committed block - self.store(), - self.epoch_manager.as_ref(), - prev_hash, - &prev_chunk_extra, - prev_chunk_height_included, - chunk_header, - ) - .map_err(|err| { - warn!( - target: "chain", - ?err, - prev_block_hash=?prev_hash, - block_hash=?block.header().hash(), - shard_id, - prev_chunk_height_included, - ?prev_chunk_extra, - ?chunk_header, - "Failed to validate chunk extra"); - byzantine_assert!(false); - match self.create_chunk_state_challenge(prev_block, block, chunk_header) { - Ok(chunk_state) => Error::InvalidChunkState(Box::new(chunk_state)), - Err(err) => err, - } - })?; - // we can't use hash from the current block here yet because the incoming receipts - // for this block is not stored yet - let new_receipts = collect_receipts(incoming_receipts.get(&shard_id).unwrap()); - let old_receipts = &self.store().get_incoming_receipts_for_shard( - self.epoch_manager.as_ref(), - shard_id, - *prev_hash, - prev_chunk_height_included, - )?; - let old_receipts = collect_receipts_from_response(old_receipts); - let receipts = [new_receipts, old_receipts].concat(); - - let chunk = self.get_chunk_clone_from_header(&chunk_header.clone())?; - - let transactions = chunk.transactions(); - if !validate_transactions_order(transactions) { - let merkle_paths = Block::compute_chunk_headers_root(block.chunks().iter()).1; - let chunk_proof = ChunkProofs { - block_header: borsh::to_vec(&block.header()).expect("Failed to serialize"), - merkle_proof: merkle_paths[shard_id as usize].clone(), - chunk: MaybeEncodedShardChunk::Decoded(chunk), - }; - return Err(Error::InvalidChunkProofs(Box::new(chunk_proof))); - } - - let protocol_version = - self.epoch_manager.get_epoch_protocol_version(block.header().epoch_id())?; - - if checked_feature!("stable", AccessKeyNonceRange, protocol_version) { - let transaction_validity_period = self.transaction_validity_period; - for transaction in transactions { - self.store() - .check_transaction_validity_period( - prev_block.header(), - &transaction.transaction.block_hash, - transaction_validity_period, - ) - .map_err(|_| Error::from(Error::InvalidTransactions))?; - } + return Ok(None); }; - let chunk_inner = chunk.cloned_header().take_inner(); - let gas_limit = chunk_inner.gas_limit(); - - // This variable is responsible for checking to which block we can apply receipts previously lost in apply_chunks - // (see https://github.com/near/nearcore/pull/4248/) - // We take the first block with existing chunk in the first epoch in which protocol feature - // RestoreReceiptsAfterFixApplyChunks was enabled, and put the restored receipts there. - let is_first_block_with_chunk_of_version = check_if_block_is_first_with_chunk_of_version( - self.store(), - epoch_manager.as_ref(), - prev_block.hash(), + let runtime = self.runtime_adapter.clone(); + let epoch_manager = self.epoch_manager.clone(); + let block_context = self.get_block_context_for_shard_update( + block.header(), + prev_block.header(), shard_id, + is_new_chunk, )?; - - let block_hash = *block.hash(); - let block_height = block.header().height(); - let challenges_result = block.header().challenges_result().clone(); - let block_timestamp = block.header().raw_timestamp(); - let next_gas_price = prev_block.header().next_gas_price(); - let random_seed = *block.header().random_value(); - let height = chunk_header.height_included(); - let prev_block_hash = *chunk_header.prev_block_hash(); - Ok(Some(Box::new(move |parent_span| -> Result { - let _span = tracing::debug_span!( - target: "chain", - parent: parent_span, - "new_chunk", - shard_id) - .entered(); - let _timer = CryptoHashTimer::new(chunk.chunk_hash().0); - let storage_config = RuntimeStorageConfig { - state_root: *chunk_inner.prev_state_root(), - use_flat_storage: true, - source: crate::types::StorageDataSource::Db, - state_patch, - record_storage: false, - }; - match runtime.apply_transactions( - shard_id, - storage_config, - height, - block_timestamp, - &prev_block_hash, - &block_hash, - &receipts, - chunk.transactions(), - chunk_inner.prev_validator_proposals(), - next_gas_price, - gas_limit, - &challenges_result, - random_seed, - true, - is_first_block_with_chunk_of_version, - ) { - Ok(apply_result) => { - let apply_split_result_or_state_changes = if will_shard_layout_change { - Some(ChainUpdate::apply_split_state_changes( - epoch_manager.as_ref(), - runtime.as_ref(), - &block_hash, - block_height, - &prev_block_hash, - &apply_result, - split_state_roots, - )?) - } else { - None - }; - Ok(ApplyChunkResult::SameHeight(SameHeightResult { - gas_limit, - shard_uid, - apply_result, - apply_split_result_or_state_changes, - })) - } - Err(err) => Err(err), - } - }))) - } - - /// Returns the apply chunk job when applying an old chunk and applying transactions. - fn get_apply_chunk_job_old_chunk( - &self, - block: &Block, - prev_block: &Block, - shard_uid: ShardUId, - will_shard_layout_change: bool, - state_patch: SandboxStatePatch, - runtime: Arc, - epoch_manager: Arc, - split_state_roots: Option>, - ) -> Result, Error> { - let shard_id = shard_uid.shard_id(); - let prev_block_hash = *prev_block.hash(); - let new_extra = self.get_chunk_extra(&prev_block_hash, &shard_uid)?; - - let block_hash = *block.hash(); - let block_height = block.header().height(); - let challenges_result = block.header().challenges_result().clone(); - let block_timestamp = block.header().raw_timestamp(); - let epoch_id = self.epoch_manager.get_epoch_id_from_prev_block(&prev_block_hash)?; - let protocol_version = self.epoch_manager.get_epoch_protocol_version(&epoch_id)?; - - let next_gas_price = - if protocol_version >= ProtocolFeature::FixApplyChunks.protocol_version() { - prev_block.header().next_gas_price() - } else { - block.header().next_gas_price() - }; - let random_seed = *block.header().random_value(); - let height = block.header().height(); - - Ok(Some(Box::new(move |parent_span| -> Result { - let _span = tracing::debug_span!( - target: "chain", - parent: parent_span, - "existing_chunk", - shard_id) - .entered(); - let storage_config = RuntimeStorageConfig { - state_root: *new_extra.state_root(), - use_flat_storage: true, - source: crate::types::StorageDataSource::Db, + Ok(process_shard_update( + parent_span, + runtime.as_ref(), + epoch_manager.as_ref(), + shard_update_reason, + block_context, + ShardContext { shard_uid, will_shard_layout_change }, state_patch, - record_storage: false, - }; - match runtime.apply_transactions( - shard_id, - storage_config, - height, - block_timestamp, - &prev_block_hash, - &block_hash, - &[], - &[], - new_extra.validator_proposals(), - next_gas_price, - new_extra.gas_limit(), - &challenges_result, - random_seed, - false, - false, - ) { - Ok(apply_result) => { - let apply_split_result_or_state_changes = if will_shard_layout_change { - Some(ChainUpdate::apply_split_state_changes( - epoch_manager.as_ref(), - runtime.as_ref(), - &block_hash, - block_height, - &prev_block_hash, - &apply_result, - split_state_roots, - )?) - } else { - None - }; - Ok(ApplyChunkResult::DifferentHeight(DifferentHeightResult { - shard_uid, - apply_result, - apply_split_result_or_state_changes, - })) - } - Err(err) => Err(err), - } - }))) - } - - /// Returns the apply chunk job when just splitting state but not applying transactions. - fn get_apply_chunk_job_split_state( - &self, - block: &Block, - shard_uid: ShardUId, - runtime: Arc, - epoch_manager: Arc, - split_state_roots: HashMap, - ) -> Result, Error> { - let shard_id = shard_uid.shard_id(); - let next_epoch_shard_layout = - epoch_manager.get_shard_layout(block.header().next_epoch_id())?; - let state_changes = - self.store().get_state_changes_for_split_states(block.hash(), shard_id)?; - let block_hash = *block.hash(); - let block_height = block.header().height(); - Ok(Some(Box::new(move |parent_span| -> Result { - let _span = tracing::debug_span!( - target: "chain", - parent: parent_span, - "split_state", - shard_id, - ?shard_uid) - .entered(); - let results = runtime.apply_update_to_split_states( - &block_hash, - block_height, - split_state_roots, - &next_epoch_shard_layout, - state_changes, - )?; - Ok(ApplyChunkResult::SplitState(SplitStateResult { shard_uid, results })) + )?) }))) } @@ -4965,35 +4795,6 @@ pub struct ChainUpdate<'a> { transaction_validity_period: BlockHeightDelta, } -#[derive(Debug)] -pub struct SameHeightResult { - shard_uid: ShardUId, - gas_limit: Gas, - apply_result: ApplyTransactionResult, - apply_split_result_or_state_changes: Option, -} - -#[derive(Debug)] -pub struct DifferentHeightResult { - shard_uid: ShardUId, - apply_result: ApplyTransactionResult, - apply_split_result_or_state_changes: Option, -} - -#[derive(Debug)] -pub struct SplitStateResult { - // parent shard of the split states - shard_uid: ShardUId, - results: Vec, -} - -#[derive(Debug)] -pub enum ApplyChunkResult { - SameHeight(SameHeightResult), - DifferentHeight(DifferentHeightResult), - SplitState(SplitStateResult), -} - impl<'a> ChainUpdate<'a> { pub fn new( store: &'a mut ChainStore, @@ -5105,45 +4906,6 @@ impl<'a> ChainUpdate<'a> { Ok(()) } - /// Process ApplyTransactionResult to apply changes to split states - /// When shards will change next epoch, - /// if `split_state_roots` is not None, that means states for the split shards are ready - /// this function updates these states and return apply results for these states - /// otherwise, this function returns state changes needed to be applied to split - /// states. These state changes will be stored in the database by `process_split_state` - fn apply_split_state_changes( - epoch_manager: &dyn EpochManagerAdapter, - runtime_adapter: &dyn RuntimeAdapter, - block_hash: &CryptoHash, - block_height: BlockHeight, - prev_block_hash: &CryptoHash, - apply_result: &ApplyTransactionResult, - split_state_roots: Option>, - ) -> Result { - let state_changes = StateChangesForSplitStates::from_raw_state_changes( - apply_result.trie_changes.state_changes(), - apply_result.processed_delayed_receipts.clone(), - ); - let next_epoch_shard_layout = { - let next_epoch_id = epoch_manager.get_next_epoch_id_from_prev_block(prev_block_hash)?; - epoch_manager.get_shard_layout(&next_epoch_id)? - }; - // split states are ready, apply update to them now - if let Some(state_roots) = split_state_roots { - let split_state_results = runtime_adapter.apply_update_to_split_states( - block_hash, - block_height, - state_roots, - &next_epoch_shard_layout, - state_changes, - )?; - Ok(ApplySplitStateResultOrStateChanges::ApplySplitStateResults(split_state_results)) - } else { - // split states are not ready yet, store state changes in consolidated_state_changes - Ok(ApplySplitStateResultOrStateChanges::StateChangesForSplitStates(state_changes)) - } - } - /// Postprocess split state results or state changes, do the necessary update on chain /// for split state results: store the chunk extras and trie changes for the split states /// for state changes, store the state changes for splitting states @@ -5294,7 +5056,7 @@ impl<'a> ChainUpdate<'a> { let prev_hash = block.header().prev_hash(); let height = block.header().height(); match result { - ApplyChunkResult::SameHeight(SameHeightResult { + ApplyChunkResult::NewChunk(NewChunkResult { gas_limit, shard_uid, apply_result, @@ -5345,7 +5107,7 @@ impl<'a> ChainUpdate<'a> { self.process_split_state(block, &shard_uid, apply_results_or_state_changes)?; } } - ApplyChunkResult::DifferentHeight(DifferentHeightResult { + ApplyChunkResult::OldChunk(OldChunkResult { shard_uid, apply_result, apply_split_result_or_state_changes, @@ -5372,7 +5134,7 @@ impl<'a> ChainUpdate<'a> { self.process_split_state(block, &shard_uid, apply_results_or_state_changes)?; } } - ApplyChunkResult::SplitState(SplitStateResult { shard_uid, results }) => { + ApplyChunkResult::StateSplit(StateSplitResult { shard_uid, results }) => { self.chain_store_update .remove_state_changes_for_split_states(*block.hash(), shard_uid.shard_id()); self.process_split_state( diff --git a/chain/chain/src/lib.rs b/chain/chain/src/lib.rs index dca14596187..bcf3be21320 100644 --- a/chain/chain/src/lib.rs +++ b/chain/chain/src/lib.rs @@ -30,6 +30,7 @@ pub mod validate; #[cfg(test)] mod tests; +mod update_shard; #[cfg(feature = "byzantine_asserts")] #[macro_export] diff --git a/chain/chain/src/update_shard.rs b/chain/chain/src/update_shard.rs new file mode 100644 index 00000000000..8e119033206 --- /dev/null +++ b/chain/chain/src/update_shard.rs @@ -0,0 +1,345 @@ +use crate::crypto_hash_timer::CryptoHashTimer; +use crate::types::{ + ApplySplitStateResult, ApplySplitStateResultOrStateChanges, ApplyTransactionResult, + RuntimeAdapter, RuntimeStorageConfig, +}; +use near_chain_primitives::Error; +use near_epoch_manager::EpochManagerAdapter; +use near_primitives::challenge::ChallengesResult; +use near_primitives::hash::CryptoHash; +use near_primitives::receipt::Receipt; +use near_primitives::sandbox::state_patch::SandboxStatePatch; +use near_primitives::shard_layout::ShardUId; +use near_primitives::sharding::ShardChunk; +use near_primitives::types::chunk_extra::ChunkExtra; +use near_primitives::types::{Balance, BlockHeight, Gas, StateChangesForSplitStates, StateRoot}; +use std::collections::HashMap; + +/// Information about block for which shard is updated. +/// Use cases include: +/// - queries to epoch manager +/// - allowing contracts to get current chain data +#[derive(Clone, Debug)] +pub(crate) struct BlockContext { + pub block_hash: CryptoHash, + pub prev_block_hash: CryptoHash, + pub challenges_result: ChallengesResult, + pub block_timestamp: u64, + pub gas_price: Balance, + pub height: BlockHeight, + pub random_seed: CryptoHash, + pub is_first_block_with_chunk_of_version: bool, +} + +/// Result of updating a shard for some block when it has a new chunk for this +/// shard. +#[derive(Debug)] +pub struct NewChunkResult { + pub(crate) shard_uid: ShardUId, + pub(crate) gas_limit: Gas, + pub(crate) apply_result: ApplyTransactionResult, + pub(crate) apply_split_result_or_state_changes: Option, +} + +/// Result of updating a shard for some block when it doesn't have a new chunk +/// for this shard, so previous chunk header is copied. +#[derive(Debug)] +pub struct OldChunkResult { + pub(crate) shard_uid: ShardUId, + /// Note that despite the naming, no transactions are applied in this case. + /// TODO(logunov): exclude receipts/txs context from all related types. + pub(crate) apply_result: ApplyTransactionResult, + pub(crate) apply_split_result_or_state_changes: Option, +} + +/// Result of updating a shard for some block when we apply only split state +/// changes due to resharding. +#[derive(Debug)] +pub struct StateSplitResult { + // parent shard of the split states + pub(crate) shard_uid: ShardUId, + pub(crate) results: Vec, +} + +#[derive(Debug)] +pub enum ApplyChunkResult { + NewChunk(NewChunkResult), + OldChunk(OldChunkResult), + StateSplit(StateSplitResult), +} + +/// State roots of split shards which are ready. +type SplitStateRoots = HashMap; + +/// Reason to update a shard when new block appears on chain. +/// All types include state roots for split shards in case of resharding. +pub(crate) enum ShardUpdateReason { + /// Block has a new chunk for the shard. + /// Contains chunk itself and all new incoming receipts to the shard. + NewChunk(ShardChunk, Vec, Option), + /// Block doesn't have a new chunk for the shard. + /// Instead, previous chunk header is copied. + /// Contains result of shard update for previous block. + OldChunk(ChunkExtra, Option), + /// See comment to `split_state_roots` in `Chain::get_update_shard_job`. + /// Process only state changes caused by resharding. + StateSplit(SplitStateRoots, StateChangesForSplitStates), +} + +/// Information about shard to update. +pub(crate) struct ShardContext { + pub shard_uid: ShardUId, + /// Whether shard layout changes in the next epoch. + pub will_shard_layout_change: bool, +} + +/// Processes shard update with given block and shard. +/// Doesn't modify chain, only produces result to be applied later. +pub(crate) fn process_shard_update( + parent_span: &tracing::Span, + runtime: &dyn RuntimeAdapter, + epoch_manager: &dyn EpochManagerAdapter, + shard_update_reason: ShardUpdateReason, + block_context: BlockContext, + shard_context: ShardContext, + state_patch: SandboxStatePatch, +) -> Result { + match shard_update_reason { + ShardUpdateReason::NewChunk(chunk, receipts, split_state_roots) => apply_new_chunk( + parent_span, + block_context, + chunk, + shard_context, + receipts, + state_patch, + runtime, + epoch_manager, + split_state_roots, + ), + ShardUpdateReason::OldChunk(prev_chunk_extra, split_state_roots) => apply_old_chunk( + parent_span, + block_context, + &prev_chunk_extra, + shard_context, + state_patch, + runtime, + epoch_manager, + split_state_roots, + ), + ShardUpdateReason::StateSplit(split_state_roots, state_changes) => apply_state_split( + parent_span, + block_context, + shard_context.shard_uid, + runtime, + epoch_manager, + split_state_roots, + state_changes, + ), + } +} + +/// Applies new chunk, which includes applying transactions from chunk and +/// receipts filtered from outgoing receipts from previous chunks. +fn apply_new_chunk( + parent_span: &tracing::Span, + block_context: BlockContext, + chunk: ShardChunk, + shard_info: ShardContext, + receipts: Vec, + state_patch: SandboxStatePatch, + runtime: &dyn RuntimeAdapter, + epoch_manager: &dyn EpochManagerAdapter, + split_state_roots: Option, +) -> Result { + let shard_id = shard_info.shard_uid.shard_id(); + let _span = tracing::debug_span!( + target: "chain", + parent: parent_span, + "new_chunk", + shard_id) + .entered(); + let chunk_inner = chunk.cloned_header().take_inner(); + let gas_limit = chunk_inner.gas_limit(); + + let _timer = CryptoHashTimer::new(chunk.chunk_hash().0); + let storage_config = RuntimeStorageConfig { + state_root: *chunk_inner.prev_state_root(), + use_flat_storage: true, + source: crate::types::StorageDataSource::Db, + state_patch, + record_storage: false, + }; + match runtime.apply_transactions( + shard_id, + storage_config, + block_context.height, + block_context.block_timestamp, + &block_context.prev_block_hash, + &block_context.block_hash, + &receipts, + chunk.transactions(), + chunk_inner.prev_validator_proposals(), + block_context.gas_price, + gas_limit, + &block_context.challenges_result, + block_context.random_seed, + true, + block_context.is_first_block_with_chunk_of_version, + ) { + Ok(apply_result) => { + let apply_split_result_or_state_changes = if shard_info.will_shard_layout_change { + Some(apply_split_state_changes( + epoch_manager, + runtime, + block_context, + &apply_result, + split_state_roots, + )?) + } else { + None + }; + Ok(ApplyChunkResult::NewChunk(NewChunkResult { + gas_limit, + shard_uid: shard_info.shard_uid, + apply_result, + apply_split_result_or_state_changes, + })) + } + Err(err) => Err(err), + } +} + +/// Applies shard update corresponding to missing chunk. +/// (logunov) From what I know, the state update may include only validator +/// accounts update on epoch start. +fn apply_old_chunk( + parent_span: &tracing::Span, + block_context: BlockContext, + prev_chunk_extra: &ChunkExtra, + shard_info: ShardContext, + state_patch: SandboxStatePatch, + runtime: &dyn RuntimeAdapter, + epoch_manager: &dyn EpochManagerAdapter, + split_state_roots: Option, +) -> Result { + let shard_id = shard_info.shard_uid.shard_id(); + let _span = tracing::debug_span!( + target: "chain", + parent: parent_span, + "existing_chunk", + shard_id) + .entered(); + + let storage_config = RuntimeStorageConfig { + state_root: *prev_chunk_extra.state_root(), + use_flat_storage: true, + source: crate::types::StorageDataSource::Db, + state_patch, + record_storage: false, + }; + match runtime.apply_transactions( + shard_id, + storage_config, + block_context.height, + block_context.block_timestamp, + &block_context.prev_block_hash, + &block_context.block_hash, + &[], + &[], + prev_chunk_extra.validator_proposals(), + block_context.gas_price, + prev_chunk_extra.gas_limit(), + &block_context.challenges_result, + block_context.random_seed, + false, + false, + ) { + Ok(apply_result) => { + let apply_split_result_or_state_changes = if shard_info.will_shard_layout_change { + Some(apply_split_state_changes( + epoch_manager, + runtime, + block_context, + &apply_result, + split_state_roots, + )?) + } else { + None + }; + Ok(ApplyChunkResult::OldChunk(OldChunkResult { + shard_uid: shard_info.shard_uid, + apply_result, + apply_split_result_or_state_changes, + })) + } + Err(err) => Err(err), + } +} + +/// Applies only split state changes but not applies any transactions. +fn apply_state_split( + parent_span: &tracing::Span, + block_context: BlockContext, + shard_uid: ShardUId, + runtime: &dyn RuntimeAdapter, + epoch_manager: &dyn EpochManagerAdapter, + split_state_roots: SplitStateRoots, + state_changes: StateChangesForSplitStates, +) -> Result { + let shard_id = shard_uid.shard_id(); + let _span = tracing::debug_span!( + target: "chain", + parent: parent_span, + "split_state", + shard_id, + ?shard_uid) + .entered(); + let next_epoch_id = epoch_manager.get_next_epoch_id(&block_context.block_hash)?; + let next_epoch_shard_layout = epoch_manager.get_shard_layout(&next_epoch_id)?; + let results = runtime.apply_update_to_split_states( + &block_context.block_hash, + block_context.height, + split_state_roots, + &next_epoch_shard_layout, + state_changes, + )?; + Ok(ApplyChunkResult::StateSplit(StateSplitResult { shard_uid, results })) +} + +/// Process ApplyTransactionResult to apply changes to split states +/// When shards will change next epoch, +/// if `split_state_roots` is not None, that means states for the split shards are ready +/// this function updates these states and return apply results for these states +/// otherwise, this function returns state changes needed to be applied to split +/// states. These state changes will be stored in the database by `process_split_state` +fn apply_split_state_changes( + epoch_manager: &dyn EpochManagerAdapter, + runtime_adapter: &dyn RuntimeAdapter, + block_context: BlockContext, + apply_result: &ApplyTransactionResult, + split_state_roots: Option, +) -> Result { + let state_changes = StateChangesForSplitStates::from_raw_state_changes( + apply_result.trie_changes.state_changes(), + apply_result.processed_delayed_receipts.clone(), + ); + let next_epoch_shard_layout = { + let next_epoch_id = + epoch_manager.get_next_epoch_id_from_prev_block(&block_context.prev_block_hash)?; + epoch_manager.get_shard_layout(&next_epoch_id)? + }; + // split states are ready, apply update to them now + if let Some(state_roots) = split_state_roots { + let split_state_results = runtime_adapter.apply_update_to_split_states( + &block_context.block_hash, + block_context.height, + state_roots, + &next_epoch_shard_layout, + state_changes, + )?; + Ok(ApplySplitStateResultOrStateChanges::ApplySplitStateResults(split_state_results)) + } else { + // split states are not ready yet, store state changes in consolidated_state_changes + Ok(ApplySplitStateResultOrStateChanges::StateChangesForSplitStates(state_changes)) + } +}