From db8f248d96988fbd1d02fb2d94d5e8bdb249167b Mon Sep 17 00:00:00 2001 From: Stefan Neamtu Date: Mon, 2 Dec 2024 11:35:40 +0200 Subject: [PATCH 01/17] big state witness --- .../stateless_validation/chunk_validation.rs | 56 +++++++++---------- .../chain/src/stateless_validation/metrics.rs | 10 ++-- chain/chain/src/store/latest_witnesses.rs | 12 ++-- .../chunk_validator/mod.rs | 24 ++++---- .../orphan_witness_handling.rs | 4 +- .../chunk_validator/orphan_witness_pool.rs | 12 ++-- .../partial_witness/partial_witness_actor.rs | 10 ++-- .../partial_witness_tracker.rs | 5 +- .../state_witness_tracker.rs | 14 ++++- chain/client/src/test_utils/test_env.rs | 6 +- .../src/stateless_validation/state_witness.rs | 53 +++++++++++++++++- .../features/orphan_chunk_state_witness.rs | 6 +- tools/state-viewer/src/latest_witnesses.rs | 12 ++-- 13 files changed, 141 insertions(+), 83 deletions(-) diff --git a/chain/chain/src/stateless_validation/chunk_validation.rs b/chain/chain/src/stateless_validation/chunk_validation.rs index 4dfca1c3cb5..30288d508ea 100644 --- a/chain/chain/src/stateless_validation/chunk_validation.rs +++ b/chain/chain/src/stateless_validation/chunk_validation.rs @@ -174,11 +174,11 @@ fn get_state_witness_block_range( last_chunk_shard_index: usize, } - let initial_prev_hash = *state_witness.chunk_header.prev_block_hash(); + let initial_prev_hash = *state_witness.inner.chunk_header.prev_block_hash(); let initial_prev_block = store.get_block(&initial_prev_hash)?; let initial_shard_layout = epoch_manager.get_shard_layout_from_prev_block(&initial_prev_hash)?; - let initial_shard_id = state_witness.chunk_header.shard_id(); + let initial_shard_id = state_witness.inner.chunk_header.shard_id(); let initial_shard_index = initial_shard_layout.get_shard_index(initial_shard_id)?; let mut position = TraversalPosition { @@ -322,8 +322,8 @@ pub fn pre_validate_chunk_state_witness( // Ensure that the chunk header version is supported in this protocol version let protocol_version = - epoch_manager.get_epoch_info(&state_witness.epoch_id)?.protocol_version(); - state_witness.chunk_header.validate_version(protocol_version)?; + epoch_manager.get_epoch_info(&state_witness.inner.epoch_id)?.protocol_version(); + state_witness.inner.chunk_header.validate_version(protocol_version)?; // First, go back through the blockchain history to locate the last new chunk // and last last new chunk for the shard. @@ -336,18 +336,18 @@ pub fn pre_validate_chunk_state_witness( let receipts_to_apply = validate_source_receipt_proofs( epoch_manager, - &state_witness.source_receipt_proofs, + &state_witness.inner.source_receipt_proofs, &blocks_after_last_last_chunk, last_chunk_shard_id, )?; let applied_receipts_hash = hash(&borsh::to_vec(receipts_to_apply.as_slice()).unwrap()); - if applied_receipts_hash != state_witness.applied_receipts_hash { + if applied_receipts_hash != state_witness.inner.applied_receipts_hash { return Err(Error::InvalidChunkStateWitness(format!( "Receipts hash {:?} does not match expected receipts hash {:?}", - applied_receipts_hash, state_witness.applied_receipts_hash + applied_receipts_hash, state_witness.inner.applied_receipts_hash ))); } - let (tx_root_from_state_witness, _) = merklize(&state_witness.transactions); + let (tx_root_from_state_witness, _) = merklize(&state_witness.inner.transactions); let last_chunk_block = blocks_after_last_last_chunk.first().ok_or_else(|| { Error::Other("blocks_after_last_last_chunk is empty, this should be impossible!".into()) })?; @@ -361,15 +361,15 @@ pub fn pre_validate_chunk_state_witness( } let current_protocol_version = - epoch_manager.get_epoch_protocol_version(&state_witness.epoch_id)?; + epoch_manager.get_epoch_protocol_version(&state_witness.inner.epoch_id)?; if !checked_feature!( "protocol_feature_relaxed_chunk_validation", RelaxedChunkValidation, current_protocol_version ) { - let new_transactions = &state_witness.new_transactions; + let new_transactions = &state_witness.inner.new_transactions; let (new_tx_root_from_state_witness, _) = merklize(&new_transactions); - let chunk_tx_root = state_witness.chunk_header.tx_root(); + let chunk_tx_root = state_witness.inner.chunk_header.tx_root(); if new_tx_root_from_state_witness != chunk_tx_root { return Err(Error::InvalidChunkStateWitness(format!( "Witness new transactions root {:?} does not match chunk {:?}", @@ -379,10 +379,10 @@ pub fn pre_validate_chunk_state_witness( // Verify that all proposed transactions are valid. if !new_transactions.is_empty() { let transactions_validation_storage_config = RuntimeStorageConfig { - state_root: state_witness.chunk_header.prev_state_root(), + state_root: state_witness.inner.chunk_header.prev_state_root(), use_flat_storage: true, source: StorageDataSource::Recorded(PartialStorage { - nodes: state_witness.new_transactions_validation_state.clone(), + nodes: state_witness.inner.new_transactions_validation_state.clone(), }), state_patch: Default::default(), }; @@ -390,10 +390,10 @@ pub fn pre_validate_chunk_state_witness( match validate_prepared_transactions( chain, runtime_adapter, - &state_witness.chunk_header, + &state_witness.inner.chunk_header, transactions_validation_storage_config, &new_transactions, - &state_witness.transactions, + &state_witness.inner.transactions, ) { Ok(result) => { if result.transactions.len() != new_transactions.len() { @@ -437,7 +437,7 @@ pub fn pre_validate_chunk_state_witness( } else { MainTransition::NewChunk(NewChunkData { chunk_header: last_chunk_block.chunks().get(last_chunk_shard_index).unwrap().clone(), - transactions: state_witness.transactions.clone(), + transactions: state_witness.inner.transactions.clone(), receipts: receipts_to_apply, block: Chain::get_apply_chunk_block_context( epoch_manager, @@ -448,7 +448,7 @@ pub fn pre_validate_chunk_state_witness( is_first_block_with_chunk_of_version: false, storage_context: StorageContext { storage_data_source: StorageDataSource::Recorded(PartialStorage { - nodes: state_witness.main_state_transition.base_state.clone(), + nodes: state_witness.inner.main_state_transition.base_state.clone(), }), state_patch: Default::default(), }, @@ -577,7 +577,7 @@ pub fn validate_chunk_state_witness( main_state_transition_cache: &MainStateTransitionCache, ) -> Result<(), Error> { let _timer = crate::stateless_validation::metrics::CHUNK_STATE_WITNESS_VALIDATION_TIME - .with_label_values(&[&state_witness.chunk_header.shard_id().to_string()]) + .with_label_values(&[&state_witness.inner.chunk_header.shard_id().to_string()]) .start_timer(); let span = tracing::debug_span!(target: "client", "validate_chunk_state_witness").entered(); let block_hash = pre_validation_output.main_transition_params.block_hash(); @@ -609,21 +609,21 @@ pub fn validate_chunk_state_witness( } (_, Some(result)) => (result.chunk_extra, result.outgoing_receipts), }; - if chunk_extra.state_root() != &state_witness.main_state_transition.post_state_root { + if chunk_extra.state_root() != &state_witness.inner.main_state_transition.post_state_root { // This is an early check, it's not for correctness, only for better // error reporting in case of an invalid state witness due to a bug. // Only the final state root check against the chunk header is required. return Err(Error::InvalidChunkStateWitness(format!( "Post state root {:?} for main transition does not match expected post state root {:?}", chunk_extra.state_root(), - state_witness.main_state_transition.post_state_root, + state_witness.inner.main_state_transition.post_state_root, ))); } // Compute receipt hashes here to avoid copying receipts let outgoing_receipts_hashes = { let shard_layout = epoch_manager - .get_shard_layout_from_prev_block(state_witness.chunk_header.prev_block_hash())?; + .get_shard_layout_from_prev_block(state_witness.inner.chunk_header.prev_block_hash())?; Chain::build_receipts_hashes(&outgoing_receipts, &shard_layout) }; // Save main state transition result to cache. @@ -642,19 +642,19 @@ pub fn validate_chunk_state_witness( } if pre_validation_output.implicit_transition_params.len() - != state_witness.implicit_transitions.len() + != state_witness.inner.implicit_transitions.len() { return Err(Error::InvalidChunkStateWitness(format!( "Implicit transitions count mismatch. Expected {}, found {}", pre_validation_output.implicit_transition_params.len(), - state_witness.implicit_transitions.len(), + state_witness.inner.implicit_transitions.len(), ))); } for (implicit_transition_params, transition) in pre_validation_output .implicit_transition_params .into_iter() - .zip(state_witness.implicit_transitions.into_iter()) + .zip(state_witness.inner.implicit_transitions.into_iter()) { let (shard_uid, new_state_root) = match implicit_transition_params { ImplicitTransitionParams::ApplyOldChunk(block, shard_uid) => { @@ -710,7 +710,7 @@ pub fn validate_chunk_state_witness( let (outgoing_receipts_root, _) = merklize(&outgoing_receipts_hashes); validate_chunk_with_chunk_extra_and_receipts_root( &chunk_extra, - &state_witness.chunk_header, + &state_witness.inner.chunk_header, &outgoing_receipts_root, )?; @@ -744,9 +744,9 @@ impl Chain { runtime_adapter: &dyn RuntimeAdapter, processing_done_tracker: Option, ) -> Result<(), Error> { - let shard_id = witness.chunk_header.shard_id(); - let height_created = witness.chunk_header.height_created(); - let chunk_hash = witness.chunk_header.chunk_hash(); + let shard_id = witness.inner.chunk_header.shard_id(); + let height_created = witness.inner.chunk_header.height_created(); + let chunk_hash = witness.inner.chunk_header.chunk_hash(); let parent_span = tracing::debug_span!( target: "chain", "shadow_validate", ?shard_id, height_created); let (encoded_witness, raw_witness_size) = { diff --git a/chain/chain/src/stateless_validation/metrics.rs b/chain/chain/src/stateless_validation/metrics.rs index 7afb60db795..98264f5838b 100644 --- a/chain/chain/src/stateless_validation/metrics.rs +++ b/chain/chain/src/stateless_validation/metrics.rs @@ -179,7 +179,7 @@ fn record_witness_size_metrics_fallible( encoded_size: usize, witness: &ChunkStateWitness, ) -> Result<(), std::io::Error> { - let shard_id = witness.chunk_header.shard_id().to_string(); + let shard_id = witness.inner.chunk_header.shard_id().to_string(); CHUNK_STATE_WITNESS_RAW_SIZE .with_label_values(&[shard_id.as_str()]) .observe(decoded_size as f64); @@ -188,16 +188,16 @@ fn record_witness_size_metrics_fallible( .observe(encoded_size as f64); CHUNK_STATE_WITNESS_MAIN_STATE_TRANSISTION_SIZE .with_label_values(&[shard_id.as_str()]) - .observe(borsh::to_vec(&witness.main_state_transition)?.len() as f64); + .observe(borsh::to_vec(&witness.inner.main_state_transition)?.len() as f64); CHUNK_STATE_WITNESS_NEW_TRANSACTIONS_SIZE .with_label_values(&[&shard_id.as_str()]) - .observe(borsh::to_vec(&witness.new_transactions)?.len() as f64); + .observe(borsh::to_vec(&witness.inner.new_transactions)?.len() as f64); CHUNK_STATE_WITNESS_NEW_TRANSACTIONS_STATE_SIZE .with_label_values(&[&shard_id.as_str()]) - .observe(borsh::to_vec(&witness.new_transactions_validation_state)?.len() as f64); + .observe(borsh::to_vec(&witness.inner.new_transactions_validation_state)?.len() as f64); CHUNK_STATE_WITNESS_SOURCE_RECEIPT_PROOFS_SIZE .with_label_values(&[&shard_id.as_str()]) - .observe(borsh::to_vec(&witness.source_receipt_proofs)?.len() as f64); + .observe(borsh::to_vec(&witness.inner.source_receipt_proofs)?.len() as f64); Ok(()) } diff --git a/chain/chain/src/store/latest_witnesses.rs b/chain/chain/src/store/latest_witnesses.rs index 2b61626b749..3757c7b70f0 100644 --- a/chain/chain/src/store/latest_witnesses.rs +++ b/chain/chain/src/store/latest_witnesses.rs @@ -112,8 +112,8 @@ impl ChainStore { let _span = tracing::info_span!( target: "client", "save_latest_chunk_state_witness", - witness_height = witness.chunk_header.height_created(), - witness_shard = ?witness.chunk_header.shard_id(), + witness_height = witness.inner.chunk_header.height_created(), + witness_shard = ?witness.inner.chunk_header.shard_id(), ) .entered(); @@ -172,9 +172,9 @@ impl ChainStore { let mut random_uuid = [0u8; 16]; OsRng.fill_bytes(&mut random_uuid); let key = LatestWitnessesKey { - height: witness.chunk_header.height_created(), - shard_id: witness.chunk_header.shard_id().into(), - epoch_id: witness.epoch_id, + height: witness.inner.chunk_header.height_created(), + shard_id: witness.inner.chunk_header.shard_id().into(), + epoch_id: witness.inner.epoch_id, witness_size: serialized_witness_size, random_uuid, }; @@ -195,7 +195,7 @@ impl ChainStore { let store_commit_time = start_time.elapsed().saturating_sub(store_update_time); - let shard_id_str = witness.chunk_header.shard_id().to_string(); + let shard_id_str = witness.inner.chunk_header.shard_id().to_string(); stateless_validation::metrics::SAVE_LATEST_WITNESS_GENERATE_UPDATE_TIME .with_label_values(&[shard_id_str.as_str()]) .observe(store_update_time.as_secs_f64()); diff --git a/chain/client/src/stateless_validation/chunk_validator/mod.rs b/chain/client/src/stateless_validation/chunk_validator/mod.rs index 4cad6d7ef8a..4e860456946 100644 --- a/chain/client/src/stateless_validation/chunk_validator/mod.rs +++ b/chain/client/src/stateless_validation/chunk_validator/mod.rs @@ -75,13 +75,13 @@ impl ChunkValidator { processing_done_tracker: Option, signer: &Arc, ) -> Result<(), Error> { - let prev_block_hash = state_witness.chunk_header.prev_block_hash(); - let shard_id = state_witness.chunk_header.shard_id(); + let prev_block_hash = state_witness.inner.chunk_header.prev_block_hash(); + let shard_id = state_witness.inner.chunk_header.shard_id(); let epoch_id = self.epoch_manager.get_epoch_id_from_prev_block(prev_block_hash)?; - if epoch_id != state_witness.epoch_id { + if epoch_id != state_witness.inner.epoch_id { return Err(Error::InvalidChunkStateWitness(format!( "Invalid EpochId {:?} for previous block {}, expected {:?}", - state_witness.epoch_id, prev_block_hash, epoch_id + state_witness.inner.epoch_id, prev_block_hash, epoch_id ))); } @@ -92,7 +92,7 @@ impl ChunkValidator { self.runtime_adapter.as_ref(), )?; - let chunk_header = state_witness.chunk_header.clone(); + let chunk_header = state_witness.inner.chunk_header.clone(); let network_sender = self.network_sender.clone(); let epoch_manager = self.epoch_manager.clone(); @@ -230,8 +230,8 @@ impl Client { ) -> Result<(), Error> { tracing::debug!( target: "client", - chunk_hash=?witness.chunk_header.chunk_hash(), - shard_id=?witness.chunk_header.shard_id(), + chunk_hash=?witness.inner.chunk_header.chunk_hash(), + shard_id=?witness.inner.chunk_header.shard_id(), "process_chunk_state_witness", ); @@ -252,7 +252,7 @@ impl Client { self.chain.chain_store.save_latest_chunk_state_witness(&witness)?; } - match self.chain.get_block(witness.chunk_header.prev_block_hash()) { + match self.chain.get_block(witness.inner.chunk_header.prev_block_hash()) { Ok(block) => self.process_chunk_state_witness_with_prev_block( witness, &block, @@ -273,7 +273,7 @@ impl Client { // produced the witness. However some tests bypass PartialWitnessActor, thus when a chunk producer // receives its own state witness, we log a warning instead of panicking. // TODO: Make sure all tests run with "test_features" and panic for non-test builds. - if signer.validator_id() == &witness.chunk_producer { + if signer.validator_id() == &witness.inner.chunk_producer { tracing::warn!( "Validator {:?} received state witness from itself. Witness={:?}", signer.validator_id(), @@ -283,7 +283,7 @@ impl Client { } self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests( NetworkRequests::ChunkStateWitnessAck( - witness.chunk_producer.clone(), + witness.inner.chunk_producer.clone(), ChunkStateWitnessAck::new(witness), ), )); @@ -296,10 +296,10 @@ impl Client { processing_done_tracker: Option, signer: &Arc, ) -> Result<(), Error> { - if witness.chunk_header.prev_block_hash() != prev_block.hash() { + if witness.inner.chunk_header.prev_block_hash() != prev_block.hash() { return Err(Error::Other(format!( "process_chunk_state_witness_with_prev_block - prev_block doesn't match ({} != {})", - witness.chunk_header.prev_block_hash(), + witness.inner.chunk_header.prev_block_hash(), prev_block.hash() ))); } diff --git a/chain/client/src/stateless_validation/chunk_validator/orphan_witness_handling.rs b/chain/client/src/stateless_validation/chunk_validator/orphan_witness_handling.rs index 96a55131d2b..2ca4689f987 100644 --- a/chain/client/src/stateless_validation/chunk_validator/orphan_witness_handling.rs +++ b/chain/client/src/stateless_validation/chunk_validator/orphan_witness_handling.rs @@ -28,7 +28,7 @@ impl Client { witness: ChunkStateWitness, witness_size: usize, ) -> Result { - let chunk_header = &witness.chunk_header; + let chunk_header = &witness.inner.chunk_header; let witness_height = chunk_header.height_created(); let witness_shard = chunk_header.shard_id(); @@ -83,7 +83,7 @@ impl Client { .orphan_witness_pool .take_state_witnesses_waiting_for_block(new_block.hash()); for witness in ready_witnesses { - let header = &witness.chunk_header; + let header = &witness.inner.chunk_header; tracing::debug!( target: "client", witness_height = header.height_created(), diff --git a/chain/client/src/stateless_validation/chunk_validator/orphan_witness_pool.rs b/chain/client/src/stateless_validation/chunk_validator/orphan_witness_pool.rs index 58cd381645f..dc77cc36794 100644 --- a/chain/client/src/stateless_validation/chunk_validator/orphan_witness_pool.rs +++ b/chain/client/src/stateless_validation/chunk_validator/orphan_witness_pool.rs @@ -52,7 +52,7 @@ impl OrphanStateWitnessPool { let cache_entry = CacheEntry { witness, _metrics_tracker: metrics_tracker }; if let Some((_, ejected_entry)) = self.witness_cache.push(cache_key, cache_entry) { // Another witness has been ejected from the cache due to capacity limit - let header = &ejected_entry.witness.chunk_header; + let header = &ejected_entry.witness.inner.chunk_header; tracing::debug!( target: "client", ejected_witness_height = header.height_created(), @@ -72,7 +72,7 @@ impl OrphanStateWitnessPool { ) -> Vec { let mut to_remove: Vec = Vec::new(); for (cache_key, cache_entry) in self.witness_cache.iter() { - if cache_entry.witness.chunk_header.prev_block_hash() == prev_block { + if cache_entry.witness.inner.chunk_header.prev_block_hash() == prev_block { to_remove.push(cache_key.clone()); } } @@ -96,7 +96,7 @@ impl OrphanStateWitnessPool { let witness_height = cache_key.height_created; if witness_height <= final_height { to_remove.push(cache_key.clone()); - let header = &cache_entry.witness.chunk_header; + let header = &cache_entry.witness.inner.chunk_header; tracing::debug!( target: "client", final_height, @@ -141,7 +141,7 @@ mod metrics_tracker { witness: &ChunkStateWitness, witness_size: usize, ) -> OrphanWitnessMetricsTracker { - let shard_id = witness.chunk_header.shard_id().to_string(); + let shard_id = witness.inner.chunk_header.shard_id().to_string(); metrics::ORPHAN_CHUNK_STATE_WITNESSES_TOTAL_COUNT .with_label_values(&[shard_id.as_str()]) .inc(); @@ -193,7 +193,7 @@ mod tests { encoded_length: u64, ) -> ChunkStateWitness { let mut witness = ChunkStateWitness::new_dummy(height, shard_id, prev_block_hash); - match &mut witness.chunk_header { + match &mut witness.inner.chunk_header { ShardChunkHeader::V3(header) => match &mut header.inner { ShardChunkHeaderInner::V1(_) => unimplemented!(), ShardChunkHeaderInner::V2(inner) => inner.encoded_length = encoded_length, @@ -221,7 +221,7 @@ mod tests { expected.sort_by(sort_comparator); if observed != expected { let print_witness_info = |witness: &ChunkStateWitness| { - let header = &witness.chunk_header; + let header = &witness.inner.chunk_header; eprintln!( "- height = {}, shard_id = {}, encoded_length: {} prev_block: {}", header.height_created(), diff --git a/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs b/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs index 34f9d0278f6..4fcd894ab31 100644 --- a/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs +++ b/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs @@ -200,7 +200,7 @@ impl PartialWitnessActor { tracing::debug!( target: "client", - chunk_hash=?state_witness.chunk_header.chunk_hash(), + chunk_hash=?state_witness.inner.chunk_header.chunk_hash(), "distribute_chunk_state_witness", ); @@ -224,7 +224,7 @@ impl PartialWitnessActor { key.clone(), contract_accesses, MainTransitionKey { - block_hash: state_witness.main_state_transition.block_hash, + block_hash: state_witness.inner.main_state_transition.block_hash, shard_id: main_transition_shard_id, }, &chunk_validators, @@ -235,7 +235,7 @@ impl PartialWitnessActor { let witness_bytes = compress_witness(&state_witness)?; self.send_state_witness_parts( key.epoch_id, - &state_witness.chunk_header, + &state_witness.inner.chunk_header, witness_bytes, &chunk_validators, &signer, @@ -598,7 +598,7 @@ impl PartialWitnessActor { /// Sends the contract accesses to the same chunk validators /// (except for the chunk producers that track the same shard), - /// which will receive the state witness for the new chunk. + /// which will receive the state witness for the new chunk. fn send_contract_accesses_to_chunk_validators( &self, key: ChunkProductionKey, @@ -776,7 +776,7 @@ impl PartialWitnessActor { } fn compress_witness(witness: &ChunkStateWitness) -> Result { - let shard_id_label = witness.chunk_header.shard_id().to_string(); + let shard_id_label = witness.inner.chunk_header.shard_id().to_string(); let encode_timer = near_chain::stateless_validation::metrics::CHUNK_STATE_WITNESS_ENCODE_TIME .with_label_values(&[shard_id_label.as_str()]) .start_timer(); diff --git a/chain/client/src/stateless_validation/partial_witness/partial_witness_tracker.rs b/chain/client/src/stateless_validation/partial_witness/partial_witness_tracker.rs index 4beb1b1ff3d..a860eb32d16 100644 --- a/chain/client/src/stateless_validation/partial_witness/partial_witness_tracker.rs +++ b/chain/client/src/stateless_validation/partial_witness/partial_witness_tracker.rs @@ -423,7 +423,8 @@ impl PartialEncodedStateWitnessTracker { } // Merge accessed contracts into the main transition's partial state. - let PartialState::TrieValues(values) = &mut witness.main_state_transition.base_state; + let PartialState::TrieValues(values) = + &mut witness.inner.main_state_transition.base_state; values.extend(accessed_contracts.into_iter().map(|code| code.0.into())); tracing::debug!(target: "client", ?key, "Sending encoded witness to client."); @@ -472,7 +473,7 @@ impl PartialEncodedStateWitnessTracker { let decode_start = std::time::Instant::now(); let (witness, raw_witness_size) = encoded_witness.decode()?; let decode_elapsed_seconds = decode_start.elapsed().as_secs_f64(); - let witness_shard = witness.chunk_header.shard_id(); + let witness_shard = witness.inner.chunk_header.shard_id(); // Record metrics after validating the witness near_chain::stateless_validation::metrics::CHUNK_STATE_WITNESS_DECODE_TIME diff --git a/chain/client/src/stateless_validation/state_witness_tracker.rs b/chain/client/src/stateless_validation/state_witness_tracker.rs index 8ad0ed414d6..420e25793ef 100644 --- a/chain/client/src/stateless_validation/state_witness_tracker.rs +++ b/chain/client/src/stateless_validation/state_witness_tracker.rs @@ -118,7 +118,7 @@ impl ChunkStateWitnessTracker { &mut self, witness: &near_primitives::stateless_validation::state_witness::ChunkStateWitness, ) -> Option<&ChunkStateWitnessRecord> { - let key = ChunkStateWitnessKey::new(witness.chunk_header.chunk_hash()); + let key = ChunkStateWitnessKey::new(witness.inner.chunk_header.chunk_hash()); self.witnesses.get(&key) } } @@ -163,7 +163,11 @@ mod state_witness_tracker_tests { let clock = dummy_clock(); let mut tracker = ChunkStateWitnessTracker::new(clock.clock()); - tracker.record_witness_sent(witness.chunk_header.compute_hash(), 4321, NUM_VALIDATORS); + tracker.record_witness_sent( + witness.inner.chunk_header.compute_hash(), + 4321, + NUM_VALIDATORS, + ); clock.advance(Duration::milliseconds(3444)); // Ack received from all "except for one". @@ -182,7 +186,11 @@ mod state_witness_tracker_tests { let clock = dummy_clock(); let mut tracker = ChunkStateWitnessTracker::new(clock.clock()); - tracker.record_witness_sent(witness.chunk_header.compute_hash(), 4321, NUM_VALIDATORS); + tracker.record_witness_sent( + witness.inner.chunk_header.compute_hash(), + 4321, + NUM_VALIDATORS, + ); clock.advance(Duration::milliseconds(3444)); // Ack received from all. diff --git a/chain/client/src/test_utils/test_env.rs b/chain/client/src/test_utils/test_env.rs index 6ff11e3aa1f..b5340aa0aab 100644 --- a/chain/client/src/test_utils/test_env.rs +++ b/chain/client/src/test_utils/test_env.rs @@ -360,8 +360,10 @@ impl TestEnv { fn found_differing_post_state_root_due_to_state_transitions( witness: &ChunkStateWitness, ) -> bool { - let mut post_state_roots = HashSet::from([witness.main_state_transition.post_state_root]); - post_state_roots.extend(witness.implicit_transitions.iter().map(|t| t.post_state_root)); + let mut post_state_roots = + HashSet::from([witness.inner.main_state_transition.post_state_root]); + post_state_roots + .extend(witness.inner.implicit_transitions.iter().map(|t| t.post_state_root)); post_state_roots.len() >= 2 } diff --git a/core/primitives/src/stateless_validation/state_witness.rs b/core/primitives/src/stateless_validation/state_witness.rs index f6c1aff510b..366ab5c0b72 100644 --- a/core/primitives/src/stateless_validation/state_witness.rs +++ b/core/primitives/src/stateless_validation/state_witness.rs @@ -81,14 +81,61 @@ pub struct ChunkStateWitnessAck { impl ChunkStateWitnessAck { pub fn new(witness: &ChunkStateWitness) -> Self { - Self { chunk_hash: witness.chunk_header.chunk_hash() } + Self { chunk_hash: witness.inner.chunk_header.chunk_hash() } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, BorshSerialize, BorshDeserialize, ProtocolSchema)] +pub struct ChunkStateWitness { + pub inner: ChunkStateWitnessInner, + padding: Vec, +} + +impl ChunkStateWitness { + pub fn new( + chunk_producer: AccountId, + epoch_id: EpochId, + chunk_header: ShardChunkHeader, + main_state_transition: ChunkStateTransition, + source_receipt_proofs: HashMap, + applied_receipts_hash: CryptoHash, + transactions: Vec, + implicit_transitions: Vec, + new_transactions: Vec, + new_transactions_validation_state: PartialState, + ) -> Self { + let inner = ChunkStateWitnessInner::new( + chunk_producer, + epoch_id, + chunk_header, + main_state_transition, + source_receipt_proofs, + applied_receipts_hash, + transactions, + implicit_transitions, + new_transactions, + new_transactions_validation_state, + ); + let inner_serialized = borsh::to_vec(&inner).unwrap(); + const WANTED_SIZE: usize = 30 * 1024 * 1024; // 30 MB + let padding = vec![0; WANTED_SIZE - inner_serialized.len()]; + Self { inner, padding } + } + + pub fn chunk_production_key(&self) -> ChunkProductionKey { + self.inner.chunk_production_key() + } + + pub fn new_dummy(height: BlockHeight, shard_id: ShardId, prev_block_hash: CryptoHash) -> Self { + let inner = ChunkStateWitnessInner::new_dummy(height, shard_id, prev_block_hash); + Self { inner, padding: vec![] } } } /// The state witness for a chunk; proves the state transition that the /// chunk attests to. #[derive(Debug, Clone, PartialEq, Eq, BorshSerialize, BorshDeserialize, ProtocolSchema)] -pub struct ChunkStateWitness { +pub struct ChunkStateWitnessInner { // TODO(stateless_validation): Deprecate this field in the next version of the state witness. pub chunk_producer: AccountId, /// EpochId corresponds to the next block after chunk's previous block. @@ -162,7 +209,7 @@ pub struct ChunkStateWitness { signature_differentiator: SignatureDifferentiator, } -impl ChunkStateWitness { +impl ChunkStateWitnessInner { pub fn new( chunk_producer: AccountId, epoch_id: EpochId, diff --git a/integration-tests/src/tests/client/features/orphan_chunk_state_witness.rs b/integration-tests/src/tests/client/features/orphan_chunk_state_witness.rs index cdda3ba38e3..2401c0a073b 100644 --- a/integration-tests/src/tests/client/features/orphan_chunk_state_witness.rs +++ b/integration-tests/src/tests/client/features/orphan_chunk_state_witness.rs @@ -175,7 +175,7 @@ fn setup_orphan_witness_test() -> OrphanWitnessTestEnv { "There should be no missing chunks." ); let witness = witness_opt.unwrap(); - assert_eq!(witness.chunk_header.chunk_hash(), block2.chunks()[0].chunk_hash()); + assert_eq!(witness.inner.chunk_header.chunk_hash(), block2.chunks()[0].chunk_hash()); for client_idx in clients_without_excluded { let blocks_processed = env.clients[client_idx] @@ -290,7 +290,7 @@ fn test_orphan_witness_not_fully_validated() { setup_orphan_witness_test(); // Make the witness invalid in a way that won't be detected during orphan witness validation - witness.source_receipt_proofs.insert( + witness.inner.source_receipt_proofs.insert( ChunkHash::default(), ReceiptProof( vec![], @@ -316,7 +316,7 @@ fn modify_witness_header_inner( witness: &mut ChunkStateWitness, f: impl FnOnce(&mut ShardChunkHeaderV3), ) { - match &mut witness.chunk_header { + match &mut witness.inner.chunk_header { ShardChunkHeader::V3(header) => { f(header); } diff --git a/tools/state-viewer/src/latest_witnesses.rs b/tools/state-viewer/src/latest_witnesses.rs index 8d5a3aee1af..9fb1e846bd1 100644 --- a/tools/state-viewer/src/latest_witnesses.rs +++ b/tools/state-viewer/src/latest_witnesses.rs @@ -72,9 +72,9 @@ impl DumpWitnessesCmd { println!( "#{} (height: {}, shard_id: {}, epoch_id: {:?})", i, - witness.chunk_header.height_created(), - witness.chunk_header.shard_id(), - witness.epoch_id + witness.inner.chunk_header.height_created(), + witness.inner.chunk_header.shard_id(), + witness.inner.epoch_id ); match self.mode { DumpWitnessesMode::Pretty => { @@ -84,9 +84,9 @@ impl DumpWitnessesCmd { DumpWitnessesMode::Binary { ref output_dir } => { let file_name = format!( "witness_{}_{}_{}_{}.bin", - witness.chunk_header.height_created(), - witness.chunk_header.shard_id(), - witness.epoch_id.0, + witness.inner.chunk_header.height_created(), + witness.inner.chunk_header.shard_id(), + witness.inner.epoch_id.0, i ); let file_path = output_dir.join(file_name); From f3b6eece99185a3d880186c0dfa869f0c1d9c1b4 Mon Sep 17 00:00:00 2001 From: Stefan Neamtu Date: Mon, 2 Dec 2024 13:45:26 +0200 Subject: [PATCH 02/17] . --- core/primitives/src/stateless_validation/state_witness.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/primitives/src/stateless_validation/state_witness.rs b/core/primitives/src/stateless_validation/state_witness.rs index 366ab5c0b72..72ae1d89240 100644 --- a/core/primitives/src/stateless_validation/state_witness.rs +++ b/core/primitives/src/stateless_validation/state_witness.rs @@ -118,7 +118,10 @@ impl ChunkStateWitness { ); let inner_serialized = borsh::to_vec(&inner).unwrap(); const WANTED_SIZE: usize = 30 * 1024 * 1024; // 30 MB - let padding = vec![0; WANTED_SIZE - inner_serialized.len()]; + let padding_size = WANTED_SIZE - inner_serialized.len(); + let padding: Vec = + (0..padding_size).map(|i| if i % 2 == 0 { 0xAA } else { 0x55 }).collect(); + // let padding = vec![0; WANTED_SIZE - inner_serialized.len()]; Self { inner, padding } } From 7f7182a30620506ecd4f23ea9a8aad76de27656e Mon Sep 17 00:00:00 2001 From: Stefan Neamtu Date: Mon, 2 Dec 2024 14:57:34 +0200 Subject: [PATCH 03/17] disable compression --- core/primitives/src/utils/compression.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/primitives/src/utils/compression.rs b/core/primitives/src/utils/compression.rs index 0b6fcce1103..e79499ed1ea 100644 --- a/core/primitives/src/utils/compression.rs +++ b/core/primitives/src/utils/compression.rs @@ -22,12 +22,11 @@ where fn encode(uncompressed: &T) -> std::io::Result<(Self, usize)> { // Flow of data: Original --> Borsh serialization --> Counting write --> zstd compression --> Bytes. // CountingWrite will count the number of bytes for the Borsh-serialized data, before compression. - let mut counting_write = - CountingWrite::new(zstd::stream::Encoder::new(Vec::new().writer(), COMPRESSION_LEVEL)?); + let mut counting_write = CountingWrite::new(Vec::new().writer()); borsh::to_writer(&mut counting_write, uncompressed)?; let borsh_bytes_len = counting_write.bytes_written(); - let encoded_bytes = counting_write.into_inner().finish()?.into_inner(); + let encoded_bytes = counting_write.into_inner().into_inner(); Ok((Self::from(encoded_bytes.into()), borsh_bytes_len.as_u64() as usize)) } From c5a43857172b33a04d27c0f91761ef5e697c05e1 Mon Sep 17 00:00:00 2001 From: Stefan Neamtu Date: Mon, 2 Dec 2024 16:39:04 +0200 Subject: [PATCH 04/17] remove decompression --- core/primitives/src/utils/compression.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/core/primitives/src/utils/compression.rs b/core/primitives/src/utils/compression.rs index e79499ed1ea..ec69d2bef16 100644 --- a/core/primitives/src/utils/compression.rs +++ b/core/primitives/src/utils/compression.rs @@ -43,10 +43,7 @@ where fn decode_with_limit(&self, limit: ByteSize) -> std::io::Result<(T, usize)> { // Flow of data: Bytes --> zstd decompression --> Counting read --> Borsh deserialization --> Original. // CountingRead will count the number of bytes for the Borsh-deserialized data, after decompression. - let mut counting_read = CountingRead::new_with_limit( - zstd::stream::Decoder::new(self.as_ref().reader())?, - limit, - ); + let mut counting_read = CountingRead::new_with_limit(self.as_ref().reader(), limit); match borsh::from_reader(&mut counting_read) { Err(err) => { From c86de699125e135a0521ff390de72e2582c2737d Mon Sep 17 00:00:00 2001 From: Stefan Neamtu Date: Mon, 2 Dec 2024 18:55:14 +0200 Subject: [PATCH 05/17] padding with 0 --- core/primitives/src/stateless_validation/state_witness.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/core/primitives/src/stateless_validation/state_witness.rs b/core/primitives/src/stateless_validation/state_witness.rs index 72ae1d89240..71f56b79521 100644 --- a/core/primitives/src/stateless_validation/state_witness.rs +++ b/core/primitives/src/stateless_validation/state_witness.rs @@ -116,12 +116,10 @@ impl ChunkStateWitness { new_transactions, new_transactions_validation_state, ); - let inner_serialized = borsh::to_vec(&inner).unwrap(); + let inner_len = borsh::to_vec(&inner).unwrap().len(); const WANTED_SIZE: usize = 30 * 1024 * 1024; // 30 MB - let padding_size = WANTED_SIZE - inner_serialized.len(); - let padding: Vec = - (0..padding_size).map(|i| if i % 2 == 0 { 0xAA } else { 0x55 }).collect(); - // let padding = vec![0; WANTED_SIZE - inner_serialized.len()]; + let padding_size = WANTED_SIZE - inner_len; + let padding = vec![0; padding_size]; Self { inner, padding } } From 2d3e5141f9b40fe0a69e683efd4bb2da22d1b601 Mon Sep 17 00:00:00 2001 From: Stefan Neamtu Date: Thu, 5 Dec 2024 15:39:59 +0200 Subject: [PATCH 06/17] 0 padding but still create 30mb --- core/primitives/src/stateless_validation/state_witness.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/primitives/src/stateless_validation/state_witness.rs b/core/primitives/src/stateless_validation/state_witness.rs index 71f56b79521..5e42fa7f002 100644 --- a/core/primitives/src/stateless_validation/state_witness.rs +++ b/core/primitives/src/stateless_validation/state_witness.rs @@ -119,8 +119,8 @@ impl ChunkStateWitness { let inner_len = borsh::to_vec(&inner).unwrap().len(); const WANTED_SIZE: usize = 30 * 1024 * 1024; // 30 MB let padding_size = WANTED_SIZE - inner_len; - let padding = vec![0; padding_size]; - Self { inner, padding } + let _padding = vec![0; padding_size]; + Self { inner, padding: vec![0; 0] } } pub fn chunk_production_key(&self) -> ChunkProductionKey { From 17a0d389b6512a26e5d5667f30c956881e205a40 Mon Sep 17 00:00:00 2001 From: Stefan Neamtu Date: Mon, 9 Dec 2024 11:40:53 +0200 Subject: [PATCH 07/17] 3mb --- core/primitives/src/stateless_validation/state_witness.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/primitives/src/stateless_validation/state_witness.rs b/core/primitives/src/stateless_validation/state_witness.rs index 5e42fa7f002..f2c66638a72 100644 --- a/core/primitives/src/stateless_validation/state_witness.rs +++ b/core/primitives/src/stateless_validation/state_witness.rs @@ -117,10 +117,10 @@ impl ChunkStateWitness { new_transactions_validation_state, ); let inner_len = borsh::to_vec(&inner).unwrap().len(); - const WANTED_SIZE: usize = 30 * 1024 * 1024; // 30 MB + const WANTED_SIZE: usize = 3 * 1024 * 1024; // 30 MB let padding_size = WANTED_SIZE - inner_len; - let _padding = vec![0; padding_size]; - Self { inner, padding: vec![0; 0] } + let padding = vec![0; padding_size]; + Self { inner, padding } } pub fn chunk_production_key(&self) -> ChunkProductionKey { From 6cfc22dffc9ac630b1242ef254e0028b109c4cec Mon Sep 17 00:00:00 2001 From: Stefan Neamtu Date: Tue, 10 Dec 2024 13:12:52 +0200 Subject: [PATCH 08/17] 30 mb and don't copy state witness parts --- .../partial_witness/partial_witness_actor.rs | 2 +- core/primitives/src/stateless_validation/state_witness.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs b/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs index 4fcd894ab31..d8e144c1af9 100644 --- a/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs +++ b/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs @@ -279,7 +279,7 @@ impl PartialWitnessActor { epoch_id, chunk_header.clone(), part_ord, - part.unwrap().to_vec(), + part.unwrap().into_vec(), encoded_length, signer, ); diff --git a/core/primitives/src/stateless_validation/state_witness.rs b/core/primitives/src/stateless_validation/state_witness.rs index f2c66638a72..71f56b79521 100644 --- a/core/primitives/src/stateless_validation/state_witness.rs +++ b/core/primitives/src/stateless_validation/state_witness.rs @@ -117,7 +117,7 @@ impl ChunkStateWitness { new_transactions_validation_state, ); let inner_len = borsh::to_vec(&inner).unwrap().len(); - const WANTED_SIZE: usize = 3 * 1024 * 1024; // 30 MB + const WANTED_SIZE: usize = 30 * 1024 * 1024; // 30 MB let padding_size = WANTED_SIZE - inner_len; let padding = vec![0; padding_size]; Self { inner, padding } From e77eeeafcbe2dc90075b8e42071cce17be6ca9cd Mon Sep 17 00:00:00 2001 From: Stefan Neamtu Date: Tue, 10 Dec 2024 14:18:31 +0200 Subject: [PATCH 09/17] revert back to copying the parts --- .../partial_witness/partial_witness_actor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs b/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs index d8e144c1af9..4fcd894ab31 100644 --- a/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs +++ b/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs @@ -279,7 +279,7 @@ impl PartialWitnessActor { epoch_id, chunk_header.clone(), part_ord, - part.unwrap().into_vec(), + part.unwrap().to_vec(), encoded_length, signer, ); From 8bbc5e74fc67067c8a569fafcccfdc423c2b4f3e Mon Sep 17 00:00:00 2001 From: Stefan Neamtu Date: Tue, 10 Dec 2024 16:54:03 +0200 Subject: [PATCH 10/17] remove copying --- .../partial_witness/partial_witness_actor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs b/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs index 4fcd894ab31..d8e144c1af9 100644 --- a/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs +++ b/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs @@ -279,7 +279,7 @@ impl PartialWitnessActor { epoch_id, chunk_header.clone(), part_ord, - part.unwrap().to_vec(), + part.unwrap().into_vec(), encoded_length, signer, ); From d54505ce7a36e9b0f753f6fa6e1bc464fa789200 Mon Sep 17 00:00:00 2001 From: Stefan Neamtu Date: Wed, 11 Dec 2024 11:24:36 +0200 Subject: [PATCH 11/17] add decoding metric --- chain/client/src/metrics.rs | 10 ++++++++++ .../partial_witness/partial_deploys_tracker.rs | 2 +- .../partial_witness/partial_witness_tracker.rs | 5 ++++- core/primitives/src/reed_solomon.rs | 7 +++++-- 4 files changed, 20 insertions(+), 4 deletions(-) diff --git a/chain/client/src/metrics.rs b/chain/client/src/metrics.rs index c774a3eab8c..0138787b43b 100644 --- a/chain/client/src/metrics.rs +++ b/chain/client/src/metrics.rs @@ -648,6 +648,16 @@ pub(crate) static PARTIAL_WITNESS_ENCODE_TIME: LazyLock = LazyLock .unwrap() }); +pub(crate) static PARTIAL_WITNESS_DECODE_TIME: LazyLock = LazyLock::new(|| { + try_create_histogram_vec( + "near_partial_witness_decode_time", + "State witness decoding time from the partial state witness parts in seconds", + &["shard_id"], + Some(linear_buckets(0.0, 0.005, 20).unwrap()), + ) + .unwrap() +}); + pub(crate) static PARTIAL_WITNESS_TIME_TO_LAST_PART: LazyLock = LazyLock::new(|| { try_create_histogram_vec( "near_partial_witness_time_to_last_part", diff --git a/chain/client/src/stateless_validation/partial_witness/partial_deploys_tracker.rs b/chain/client/src/stateless_validation/partial_witness/partial_deploys_tracker.rs index 3b2c748a487..dfa47bab959 100644 --- a/chain/client/src/stateless_validation/partial_witness/partial_deploys_tracker.rs +++ b/chain/client/src/stateless_validation/partial_witness/partial_deploys_tracker.rs @@ -66,7 +66,7 @@ impl CacheEntry { ); None } - InsertPartResult::Decoded(decode_result) => Some(decode_result), + InsertPartResult::Decoded(decode_result, _) => Some(decode_result), } } } diff --git a/chain/client/src/stateless_validation/partial_witness/partial_witness_tracker.rs b/chain/client/src/stateless_validation/partial_witness/partial_witness_tracker.rs index a860eb32d16..5e82c436999 100644 --- a/chain/client/src/stateless_validation/partial_witness/partial_witness_tracker.rs +++ b/chain/client/src/stateless_validation/partial_witness/partial_witness_tracker.rs @@ -188,7 +188,10 @@ impl CacheEntry { "Received invalid partial witness part ord" ); } - InsertPartResult::Decoded(decode_result) => { + InsertPartResult::Decoded(decode_result, decoding_time) => { + metrics::PARTIAL_WITNESS_DECODE_TIME + .with_label_values(&[&self.shard_id.to_string()]) + .observe(decoding_time); self.witness_parts = WitnessPartsState::Decoded { decode_result, decoded_at: Instant::now() }; metrics::DECODE_PARTIAL_WITNESS_ACCESSED_CONTRACTS_STATE_COUNT diff --git a/core/primitives/src/reed_solomon.rs b/core/primitives/src/reed_solomon.rs index f6830a4f1d3..f960fdba809 100644 --- a/core/primitives/src/reed_solomon.rs +++ b/core/primitives/src/reed_solomon.rs @@ -187,7 +187,7 @@ pub enum InsertPartResult { Accepted, PartAlreadyAvailable, InvalidPartOrd, - Decoded(std::io::Result), + Decoded(std::io::Result, f64), } impl ReedSolomonPartsTracker { @@ -239,7 +239,10 @@ impl ReedSolomonPartsTracker { self.parts[part_ord] = Some(part); if self.has_enough_parts() { - InsertPartResult::Decoded(self.encoder.decode(&mut self.parts, self.encoded_length)) + let decode_start = std::time::Instant::now(); + let decoded = self.encoder.decode(&mut self.parts, self.encoded_length); + let decode_elapsed_seconds = decode_start.elapsed().as_secs_f64(); + InsertPartResult::Decoded(decoded, decode_elapsed_seconds) } else { InsertPartResult::Accepted } From 5fe820213bb0936a66867e0c8896ab5773ceb514 Mon Sep 17 00:00:00 2001 From: Stefan Neamtu Date: Fri, 13 Dec 2024 16:48:16 +0200 Subject: [PATCH 12/17] . --- .../partial_witness/partial_witness_actor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs b/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs index d8e144c1af9..4fcd894ab31 100644 --- a/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs +++ b/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs @@ -279,7 +279,7 @@ impl PartialWitnessActor { epoch_id, chunk_header.clone(), part_ord, - part.unwrap().into_vec(), + part.unwrap().to_vec(), encoded_length, signer, ); From efc25d57fd7a6a122131fdb28a986da8b175df51 Mon Sep 17 00:00:00 2001 From: Stefan Neamtu Date: Fri, 13 Dec 2024 18:15:18 +0200 Subject: [PATCH 13/17] enable simd --- Cargo.lock | 2 ++ Cargo.toml | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index cc140563ee6..8993fc0f4f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6793,6 +6793,8 @@ version = "6.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7263373d500d4d4f505d43a2a662d475a894aa94503a1ee28e9188b5f3960d4f" dependencies = [ + "cc", + "libc", "libm", "lru 0.7.8", "parking_lot 0.11.2", diff --git a/Cargo.toml b/Cargo.toml index de169f5038f..05e063e0363 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -321,7 +321,7 @@ rand_hc = "0.3.1" rand_xorshift = "0.3" rayon = "1.5" redis = "0.23.0" -reed-solomon-erasure = "6.0.0" +reed-solomon-erasure = { version = "6.0.0", features = ["simd-accel"] } regex = "1.7.1" region = "3.0" reqwest = { version = "0.11.14", features = ["blocking"] } From 80a6dfff634835a47dbc52929cd1fe0298de67e6 Mon Sep 17 00:00:00 2001 From: Stefan Neamtu Date: Wed, 18 Dec 2024 12:47:35 +0200 Subject: [PATCH 14/17] spawn thread in handle_partial_encoded_state_witness --- .../partial_witness/partial_witness_actor.rs | 73 +- .../partial_witness_actor_v2.rs | 805 ++++++++++++++++++ chain/client/src/test_utils/setup.rs | 1 + integration-tests/src/test_loop/builder.rs | 1 + integration-tests/src/tests/network/runner.rs | 1 + nearcore/src/lib.rs | 1 + 6 files changed, 870 insertions(+), 12 deletions(-) create mode 100644 chain/client/src/stateless_validation/partial_witness/partial_witness_actor_v2.rs diff --git a/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs b/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs index 34f9d0278f6..0f2762e5391 100644 --- a/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs +++ b/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs @@ -75,6 +75,7 @@ pub struct PartialWitnessActor { /// Same as above for contract deploys. contract_deploys_encoders: ReedSolomonEncoderCache, compile_contracts_spawner: Arc, + partial_witness_spawner: Arc, /// AccountId in the key corresponds to the requester (chunk validator). processed_contract_code_requests: LruCache<(ChunkProductionKey, AccountId), ()>, } @@ -166,6 +167,7 @@ impl PartialWitnessActor { epoch_manager: Arc, runtime: Arc, compile_contracts_spawner: Arc, + partial_witness_spawner: Arc, ) -> Self { let partial_witness_tracker = PartialEncodedStateWitnessTracker::new(client_sender, epoch_manager.clone()); @@ -182,6 +184,7 @@ impl PartialWitnessActor { CONTRACT_DEPLOYS_RATIO_DATA_PARTS, ), compile_contracts_spawner, + partial_witness_spawner, processed_contract_code_requests: LruCache::new( NonZeroUsize::new(PROCESSED_CONTRACT_CODE_REQUESTS_CACHE_SIZE).unwrap(), ), @@ -402,18 +405,45 @@ impl PartialWitnessActor { &mut self, partial_witness: PartialEncodedStateWitness, ) -> Result<(), Error> { - tracing::debug!(target: "client", ?partial_witness, "Receive PartialEncodedStateWitnessMessage"); - let signer = self.my_validator_signer()?; - // Validate the partial encoded state witness and forward the part to all the chunk validators. - if validate_partial_encoded_state_witness( - self.epoch_manager.as_ref(), - &partial_witness, - &signer, - self.runtime.store(), - )? { - self.forward_state_witness_part(partial_witness)?; - } + let epoch_manager = self.epoch_manager.clone(); + let runtime_adapter = self.runtime.clone(); + + let ChunkProductionKey { shard_id, epoch_id, height_created } = + partial_witness.chunk_production_key(); + + let chunk_producer = self + .epoch_manager + .get_chunk_producer_info(&ChunkProductionKey { epoch_id, height_created, shard_id })? + .take_account_id(); + + // Forward witness part to chunk validators except the validator that produced the chunk and witness. + let target_chunk_validators = self + .epoch_manager + .get_chunk_validator_assignments(&epoch_id, shard_id, height_created)? + .ordered_chunk_validators() + .into_iter() + .filter(|validator| validator != &chunk_producer) + .collect(); + + let network_adapter = self.network_adapter.clone(); + + self.partial_witness_spawner.spawn("handle_partial_encoded_state_witness", move || { + tracing::debug!(target: "client", ?partial_witness, "Receive PartialEncodedStateWitnessMessage"); + + // Validate the partial encoded state witness and forward the part to all the chunk validators. + if validate_partial_encoded_state_witness( + epoch_manager.as_ref(), + &partial_witness, + &signer, + runtime_adapter.store(), + ).unwrap() { + forward_state_witness_part_v2(partial_witness, + chunk_producer, + target_chunk_validators, + network_adapter).unwrap(); + } + }); Ok(()) } @@ -598,7 +628,7 @@ impl PartialWitnessActor { /// Sends the contract accesses to the same chunk validators /// (except for the chunk producers that track the same shard), - /// which will receive the state witness for the new chunk. + /// which will receive the state witness for the new chunk. fn send_contract_accesses_to_chunk_validators( &self, key: ChunkProductionKey, @@ -799,3 +829,22 @@ fn contracts_cache_contains_contract( let cache_key = get_contract_cache_key(contract_hash.0, &runtime_config.wasm_config); cache.memory_cache().contains(cache_key) || cache.has(&cache_key).is_ok_and(|has| has) } + +/// Sends the witness part to the chunk validators, except the chunk producer that generated the witness part. +fn forward_state_witness_part_v2( + partial_witness: PartialEncodedStateWitness, + chunk_producer: AccountId, + target_chunk_validators: Vec, + network_adapter: PeerManagerAdapter, +) -> Result<(), Error> { + let ChunkProductionKey { shard_id, epoch_id, height_created } = + partial_witness.chunk_production_key(); + + network_adapter.send(PeerManagerMessageRequest::NetworkRequests( + NetworkRequests::PartialEncodedStateWitnessForward( + target_chunk_validators, + partial_witness, + ), + )); + Ok(()) +} diff --git a/chain/client/src/stateless_validation/partial_witness/partial_witness_actor_v2.rs b/chain/client/src/stateless_validation/partial_witness/partial_witness_actor_v2.rs new file mode 100644 index 00000000000..02c50312445 --- /dev/null +++ b/chain/client/src/stateless_validation/partial_witness/partial_witness_actor_v2.rs @@ -0,0 +1,805 @@ +use std::collections::HashSet; +use std::num::NonZeroUsize; +use std::sync::Arc; + +use itertools::Itertools; +use lru::LruCache; +use near_async::futures::{AsyncComputationSpawner, AsyncComputationSpawnerExt}; +use near_async::messaging::{Actor, CanSend, Handler, Sender}; +use near_async::time::Clock; +use near_async::{MultiSend, MultiSenderFrom}; +use near_chain::types::RuntimeAdapter; +use near_chain::Error; +use near_chain_configs::MutableValidatorSigner; +use near_epoch_manager::EpochManagerAdapter; +use near_network::state_witness::{ + ChunkContractAccessesMessage, ChunkStateWitnessAckMessage, ContractCodeRequestMessage, + ContractCodeResponseMessage, PartialEncodedContractDeploysMessage, + PartialEncodedStateWitnessForwardMessage, PartialEncodedStateWitnessMessage, +}; +use near_network::types::{NetworkRequests, PeerManagerAdapter, PeerManagerMessageRequest}; +use near_parameters::RuntimeConfig; +use near_performance_metrics_macros::perf; +use near_primitives::reed_solomon::{ReedSolomonEncoder, ReedSolomonEncoderCache}; +use near_primitives::sharding::ShardChunkHeader; +use near_primitives::stateless_validation::contract_distribution::{ + ChunkContractAccesses, ChunkContractDeploys, CodeBytes, CodeHash, ContractCodeRequest, + ContractCodeResponse, ContractUpdates, MainTransitionKey, PartialEncodedContractDeploys, + PartialEncodedContractDeploysPart, +}; +use near_primitives::stateless_validation::partial_witness::PartialEncodedStateWitness; +use near_primitives::stateless_validation::state_witness::{ + ChunkStateWitness, ChunkStateWitnessAck, EncodedChunkStateWitness, +}; +use near_primitives::stateless_validation::stored_chunk_state_transition_data::StoredChunkStateTransitionData; +use near_primitives::stateless_validation::ChunkProductionKey; +use near_primitives::types::{AccountId, EpochId, ShardId}; +use near_primitives::validator_signer::ValidatorSigner; +use near_store::adapter::trie_store::TrieStoreAdapter; +use near_store::{DBCol, StorageError, TrieDBStorage, TrieStorage}; +use near_vm_runner::{get_contract_cache_key, ContractCode, ContractRuntimeCache}; +use rand::Rng; + +use crate::client_actor::ClientSenderForPartialWitness; +use crate::metrics; +use crate::stateless_validation::state_witness_tracker::ChunkStateWitnessTracker; +use crate::stateless_validation::validate::{ + validate_chunk_contract_accesses, validate_contract_code_request, + validate_partial_encoded_contract_deploys, validate_partial_encoded_state_witness, +}; + +use super::encoding::{CONTRACT_DEPLOYS_RATIO_DATA_PARTS, WITNESS_RATIO_DATA_PARTS}; +use super::partial_deploys_tracker::PartialEncodedContractDeploysTracker; +use super::partial_witness_tracker::PartialEncodedStateWitnessTracker; +use near_primitives::utils::compression::CompressedData; + +const PROCESSED_CONTRACT_CODE_REQUESTS_CACHE_SIZE: usize = 30; + +pub struct PartialWitnessActorState { + /// Validator signer to sign the state witness. This field is mutable and optional. Use with caution! + /// Lock the value of mutable validator signer for the duration of a request to ensure consistency. + /// Please note that the locked value should not be stored anywhere or passed through the thread boundary. + my_signer: MutableValidatorSigner, + /// Tracks the parts of the state witness sent from chunk producers to chunk validators. + partial_witness_tracker: PartialEncodedStateWitnessTracker, + partial_deploys_tracker: PartialEncodedContractDeploysTracker, + /// Tracks a collection of state witnesses sent from chunk producers to chunk validators. + state_witness_tracker: ChunkStateWitnessTracker, + /// AccountId in the key corresponds to the requester (chunk validator). + processed_contract_code_requests: LruCache<(ChunkProductionKey, AccountId), ()>, +} + +pub struct PartialWitnessActor { + /// Adapter to send messages to the network. + network_adapter: PeerManagerAdapter, + epoch_manager: Arc, + runtime: Arc, + /// Reed Solomon encoder for encoding state witness parts. + /// We keep one wrapper for each length of chunk_validators to avoid re-creating the encoder. + witness_encoders: ReedSolomonEncoderCache, + /// Same as above for contract deploys. + contract_deploys_encoders: ReedSolomonEncoderCache, + compile_contracts_spawner: Arc, + state: PartialWitnessActorState, +} + +impl Actor for PartialWitnessActor {} + +#[derive(actix::Message, Debug)] +#[rtype(result = "()")] +pub struct DistributeStateWitnessRequest { + pub state_witness: ChunkStateWitness, + pub contract_updates: ContractUpdates, + pub main_transition_shard_id: ShardId, +} + +#[derive(Clone, MultiSend, MultiSenderFrom)] +pub struct PartialWitnessSenderForClient { + pub distribute_chunk_state_witness: Sender, +} + +impl Handler for PartialWitnessActor { + #[perf] + fn handle(&mut self, msg: DistributeStateWitnessRequest) { + if let Err(err) = self.handle_distribute_state_witness_request(msg) { + tracing::error!(target: "client", ?err, "Failed to handle distribute chunk state witness request"); + } + } +} + +impl Handler for PartialWitnessActor { + fn handle(&mut self, msg: ChunkStateWitnessAckMessage) { + self.handle_chunk_state_witness_ack(msg.0); + } +} + +impl Handler for PartialWitnessActor { + fn handle(&mut self, msg: PartialEncodedStateWitnessMessage) { + if let Err(err) = self.handle_partial_encoded_state_witness(msg.0) { + tracing::error!(target: "client", ?err, "Failed to handle PartialEncodedStateWitnessMessage"); + } + } +} + +impl Handler for PartialWitnessActor { + fn handle(&mut self, msg: PartialEncodedStateWitnessForwardMessage) { + if let Err(err) = self.handle_partial_encoded_state_witness_forward(msg.0) { + tracing::error!(target: "client", ?err, "Failed to handle PartialEncodedStateWitnessForwardMessage"); + } + } +} + +impl Handler for PartialWitnessActor { + fn handle(&mut self, msg: ChunkContractAccessesMessage) { + if let Err(err) = self.handle_chunk_contract_accesses(msg.0) { + tracing::error!(target: "client", ?err, "Failed to handle ChunkContractAccessesMessage"); + } + } +} + +impl Handler for PartialWitnessActor { + fn handle(&mut self, msg: PartialEncodedContractDeploysMessage) { + if let Err(err) = self.handle_partial_encoded_contract_deploys(msg.0) { + tracing::error!(target: "client", ?err, "Failed to handle PartialEncodedContractDeploysMessage"); + } + } +} + +impl Handler for PartialWitnessActor { + fn handle(&mut self, msg: ContractCodeRequestMessage) { + if let Err(err) = self.handle_contract_code_request(msg.0) { + tracing::error!(target: "client", ?err, "Failed to handle ContractCodeRequestMessage"); + } + } +} + +impl Handler for PartialWitnessActor { + fn handle(&mut self, msg: ContractCodeResponseMessage) { + if let Err(err) = self.handle_contract_code_response(msg.0) { + tracing::error!(target: "client", ?err, "Failed to handle ContractCodeResponseMessage"); + } + } +} + +impl PartialWitnessActor { + pub fn new( + clock: Clock, + network_adapter: PeerManagerAdapter, + client_sender: ClientSenderForPartialWitness, + my_signer: MutableValidatorSigner, + epoch_manager: Arc, + runtime: Arc, + compile_contracts_spawner: Arc, + ) -> Self { + let partial_witness_tracker = + PartialEncodedStateWitnessTracker::new(client_sender, epoch_manager.clone()); + Self { + network_adapter, + my_signer, + epoch_manager, + partial_witness_tracker, + partial_deploys_tracker: PartialEncodedContractDeploysTracker::new(), + state_witness_tracker: ChunkStateWitnessTracker::new(clock), + runtime, + witness_encoders: ReedSolomonEncoderCache::new(WITNESS_RATIO_DATA_PARTS), + contract_deploys_encoders: ReedSolomonEncoderCache::new( + CONTRACT_DEPLOYS_RATIO_DATA_PARTS, + ), + compile_contracts_spawner, + processed_contract_code_requests: LruCache::new( + NonZeroUsize::new(PROCESSED_CONTRACT_CODE_REQUESTS_CACHE_SIZE).unwrap(), + ), + } + } + + fn handle_distribute_state_witness_request( + &mut self, + msg: DistributeStateWitnessRequest, + ) -> Result<(), Error> { + let DistributeStateWitnessRequest { + state_witness, + contract_updates: ContractUpdates { contract_accesses, contract_deploys }, + main_transition_shard_id, + } = msg; + + tracing::debug!( + target: "client", + chunk_hash=?state_witness.chunk_header.chunk_hash(), + "distribute_chunk_state_witness", + ); + + // We send the state-witness and contract-updates in the following order: + // 1. We send the hashes of the contract code accessed (if contract code is excluded from witness and any contracts are called) + // before the state witness in order to allow validators to check and request missing contract code, while waiting for witness parts. + // 2. We send the state witness parts to witness-part owners. + // 3. We send the contract deploys parts to other validators (that do not validate the witness in this turn). This is lower priority + // since the newly-deployed contracts will be needed by other validators in later turns. + + let signer = self.my_validator_signer()?; + let key = state_witness.chunk_production_key(); + let chunk_validators = self + .epoch_manager + .get_chunk_validator_assignments(&key.epoch_id, key.shard_id, key.height_created) + .expect("Chunk validators must be defined") + .ordered_chunk_validators(); + + if !contract_accesses.is_empty() { + self.send_contract_accesses_to_chunk_validators( + key.clone(), + contract_accesses, + MainTransitionKey { + block_hash: state_witness.main_state_transition.block_hash, + shard_id: main_transition_shard_id, + }, + &chunk_validators, + &signer, + ); + } + + let witness_bytes = compress_witness(&state_witness)?; + self.send_state_witness_parts( + key.epoch_id, + &state_witness.chunk_header, + witness_bytes, + &chunk_validators, + &signer, + )?; + + if !contract_deploys.is_empty() { + self.send_chunk_contract_deploys_parts(key, contract_deploys)?; + } + + Ok(()) + } + + // Function to generate the parts of the state witness and return them as a tuple of chunk_validator and part. + fn generate_state_witness_parts( + &mut self, + epoch_id: EpochId, + chunk_header: &ShardChunkHeader, + witness_bytes: EncodedChunkStateWitness, + chunk_validators: &[AccountId], + signer: &ValidatorSigner, + ) -> Result, Error> { + tracing::debug!( + target: "client", + chunk_hash=?chunk_header.chunk_hash(), + ?chunk_validators, + "generate_state_witness_parts", + ); + + // Break the state witness into parts using Reed Solomon encoding. + let encoder = self.witness_encoders.entry(chunk_validators.len()); + let (parts, encoded_length) = encoder.encode(&witness_bytes); + + Ok(chunk_validators + .iter() + .zip_eq(parts) + .enumerate() + .map(|(part_ord, (chunk_validator, part))| { + // It's fine to unwrap part here as we just constructed the parts above and we expect + // all of them to be present. + let partial_witness = PartialEncodedStateWitness::new( + epoch_id, + chunk_header.clone(), + part_ord, + part.unwrap().to_vec(), + encoded_length, + signer, + ); + (chunk_validator.clone(), partial_witness) + }) + .collect_vec()) + } + + fn generate_contract_deploys_parts( + &mut self, + key: &ChunkProductionKey, + deploys: ChunkContractDeploys, + ) -> Result, Error> { + let validators = self.ordered_contract_deploys_validators(key)?; + // Note that target validators do not include the chunk producers, and thus in some case + // (eg. tests or small networks) there may be no other validators to send the new contracts to. + if validators.is_empty() { + return Ok(vec![]); + } + + let encoder = self.contract_deploys_encoder(validators.len()); + let (parts, encoded_length) = encoder.encode(&deploys); + let signer = self.my_validator_signer()?; + + Ok(validators + .into_iter() + .zip_eq(parts) + .enumerate() + .map(|(part_ord, (validator, part))| { + let partial_deploys = PartialEncodedContractDeploys::new( + key.clone(), + PartialEncodedContractDeploysPart { + part_ord, + data: part.unwrap().to_vec().into_boxed_slice(), + encoded_length, + }, + &signer, + ); + (validator, partial_deploys) + }) + .collect_vec()) + } + + // Break the state witness into parts and send each part to the corresponding chunk validator owner. + // The chunk validator owner will then forward the part to all other chunk validators. + // Each chunk validator would collect the parts and reconstruct the state witness. + fn send_state_witness_parts( + &mut self, + epoch_id: EpochId, + chunk_header: &ShardChunkHeader, + witness_bytes: EncodedChunkStateWitness, + chunk_validators: &[AccountId], + signer: &ValidatorSigner, + ) -> Result<(), Error> { + // Capture these values first, as the sources are consumed before calling record_witness_sent. + let chunk_hash = chunk_header.chunk_hash(); + let witness_size_in_bytes = witness_bytes.size_bytes(); + + // Record time taken to encode the state witness parts. + let shard_id_label = chunk_header.shard_id().to_string(); + let encode_timer = metrics::PARTIAL_WITNESS_ENCODE_TIME + .with_label_values(&[shard_id_label.as_str()]) + .start_timer(); + let validator_witness_tuple = self.generate_state_witness_parts( + epoch_id, + chunk_header, + witness_bytes, + chunk_validators, + signer, + )?; + encode_timer.observe_duration(); + + // Record the witness in order to match the incoming acks for measuring round-trip times. + // See process_chunk_state_witness_ack for the handling of the ack messages. + self.state_witness_tracker.record_witness_sent( + chunk_hash, + witness_size_in_bytes, + validator_witness_tuple.len(), + ); + + // Send the parts to the corresponding chunk validator owners. + self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests( + NetworkRequests::PartialEncodedStateWitness(validator_witness_tuple), + )); + Ok(()) + } + + /// Sends the witness part to the chunk validators, except the chunk producer that generated the witness part. + fn forward_state_witness_part( + &self, + partial_witness: PartialEncodedStateWitness, + ) -> Result<(), Error> { + let ChunkProductionKey { shard_id, epoch_id, height_created } = + partial_witness.chunk_production_key(); + let chunk_producer = self + .epoch_manager + .get_chunk_producer_info(&ChunkProductionKey { epoch_id, height_created, shard_id })? + .take_account_id(); + + // Forward witness part to chunk validators except the validator that produced the chunk and witness. + let target_chunk_validators = self + .epoch_manager + .get_chunk_validator_assignments(&epoch_id, shard_id, height_created)? + .ordered_chunk_validators() + .into_iter() + .filter(|validator| validator != &chunk_producer) + .collect(); + + self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests( + NetworkRequests::PartialEncodedStateWitnessForward( + target_chunk_validators, + partial_witness, + ), + )); + Ok(()) + } + + /// Function to handle receiving partial_encoded_state_witness message from chunk producer. + fn handle_partial_encoded_state_witness( + &mut self, + partial_witness: PartialEncodedStateWitness, + ) -> Result<(), Error> { + tracing::debug!(target: "client", ?partial_witness, "Receive PartialEncodedStateWitnessMessage"); + + let signer = self.my_validator_signer()?; + // Validate the partial encoded state witness and forward the part to all the chunk validators. + if validate_partial_encoded_state_witness( + self.epoch_manager.as_ref(), + &partial_witness, + &signer, + self.runtime.store(), + )? { + self.forward_state_witness_part(partial_witness)?; + } + + Ok(()) + } + + /// Function to handle receiving partial_encoded_state_witness_forward message from chunk producer. + fn handle_partial_encoded_state_witness_forward( + &mut self, + partial_witness: PartialEncodedStateWitness, + ) -> Result<(), Error> { + tracing::debug!(target: "client", ?partial_witness, "Receive PartialEncodedStateWitnessForwardMessage"); + + let signer = self.my_validator_signer()?; + // Validate the partial encoded state witness and store the partial encoded state witness. + if validate_partial_encoded_state_witness( + self.epoch_manager.as_ref(), + &partial_witness, + &signer, + self.runtime.store(), + )? { + self.partial_witness_tracker.store_partial_encoded_state_witness(partial_witness)?; + } + + Ok(()) + } + + /// Handles partial contract deploy message received from a peer. + /// + /// This message may belong to one of two steps of distributing contract code. In the first step the code is compressed + /// and encoded into parts using Reed Solomon encoding and each part is sent to one of the validators (part owner). + /// See `send_chunk_contract_deploys_parts` for the code implementing this. In the second step each validator (part-owner) + /// forwards the part it receives to other validators. + fn handle_partial_encoded_contract_deploys( + &mut self, + partial_deploys: PartialEncodedContractDeploys, + ) -> Result<(), Error> { + tracing::debug!(target: "client", ?partial_deploys, "Receive PartialEncodedContractDeploys"); + if !validate_partial_encoded_contract_deploys( + self.epoch_manager.as_ref(), + &partial_deploys, + self.runtime.store(), + )? { + return Ok(()); + } + if self.partial_deploys_tracker.already_processed(&partial_deploys) { + return Ok(()); + } + let key = partial_deploys.chunk_production_key().clone(); + let validators = self.ordered_contract_deploys_validators(&key)?; + if validators.is_empty() { + // Note that target validators do not include the chunk producers, and thus in some case + // (eg. tests or small networks) there may be no other validators to send the new contracts to. + // In such case, the message we are handling here should not be sent in the first place, + // unless there is a bug or adversarial behavior that sends the message. + debug_assert!(false, "No target validators, we must not receive this message"); + return Ok(()); + } + + // Forward to other validators if the part received is my part + let signer = self.my_validator_signer()?; + let my_account_id = signer.validator_id(); + let Some(my_part_ord) = validators.iter().position(|validator| validator == my_account_id) + else { + tracing::warn!( + target: "client", + ?key, + "Validator is not a part of contract deploys distribution" + ); + return Ok(()); + }; + if partial_deploys.part().part_ord == my_part_ord { + let other_validators = validators + .iter() + .filter(|&validator| validator != my_account_id) + .cloned() + .collect_vec(); + if !other_validators.is_empty() { + self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests( + NetworkRequests::PartialEncodedContractDeploys( + other_validators, + partial_deploys.clone(), + ), + )); + } + } + + // Store part + let encoder = self.contract_deploys_encoder(validators.len()); + if let Some(deploys) = self + .partial_deploys_tracker + .store_partial_encoded_contract_deploys(partial_deploys, encoder)? + { + let contracts = match deploys.decompress_contracts() { + Ok(contracts) => contracts, + Err(err) => { + tracing::warn!( + target: "client", + ?err, + ?key, + "Failed to decompress deployed contracts." + ); + return Ok(()); + } + }; + let contract_codes = contracts.into_iter().map(|contract| contract.into()).collect(); + let runtime = self.runtime.clone(); + self.compile_contracts_spawner.spawn("precompile_deployed_contracts", move || { + if let Err(err) = runtime.precompile_contracts(&key.epoch_id, contract_codes) { + tracing::error!( + target: "client", + ?err, + ?key, + "Failed to precompile deployed contracts." + ); + } + }); + } + + Ok(()) + } + + /// Handles the state witness ack message from the chunk validator. + /// It computes the round-trip time between sending the state witness and receiving + /// the ack message and updates the corresponding metric with it. + /// Currently we do not raise an error for handling of witness-ack messages, + /// as it is used only for tracking some networking metrics. + fn handle_chunk_state_witness_ack(&mut self, witness_ack: ChunkStateWitnessAck) { + self.state_witness_tracker.on_witness_ack_received(witness_ack); + } + + /// Handles contract code accesses message from chunk producer. + /// This is sent in parallel to a chunk state witness and contains the hashes + /// of the contract code accessed when applying the previous chunk of the witness. + fn handle_chunk_contract_accesses( + &mut self, + accesses: ChunkContractAccesses, + ) -> Result<(), Error> { + let signer = self.my_validator_signer()?; + if !validate_chunk_contract_accesses( + self.epoch_manager.as_ref(), + &accesses, + &signer, + self.runtime.store(), + )? { + return Ok(()); + } + let key = accesses.chunk_production_key(); + let contracts_cache = self.runtime.compiled_contract_cache(); + let runtime_config = self + .runtime + .get_runtime_config(self.epoch_manager.get_epoch_protocol_version(&key.epoch_id)?)?; + let missing_contract_hashes = HashSet::from_iter( + accesses + .contracts() + .iter() + .filter(|&hash| { + !contracts_cache_contains_contract(contracts_cache, hash, &runtime_config) + }) + .cloned(), + ); + if missing_contract_hashes.is_empty() { + return Ok(()); + } + self.partial_witness_tracker + .store_accessed_contract_hashes(key.clone(), missing_contract_hashes.clone())?; + let random_chunk_producer = { + let mut chunk_producers = self + .epoch_manager + .get_epoch_chunk_producers_for_shard(&key.epoch_id, key.shard_id)?; + chunk_producers.swap_remove(rand::thread_rng().gen_range(0..chunk_producers.len())) + }; + let request = ContractCodeRequest::new( + key.clone(), + missing_contract_hashes, + accesses.main_transition().clone(), + &signer, + ); + self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests( + NetworkRequests::ContractCodeRequest(random_chunk_producer, request), + )); + Ok(()) + } + + /// Sends the contract accesses to the same chunk validators + /// (except for the chunk producers that track the same shard), + /// which will receive the state witness for the new chunk. + fn send_contract_accesses_to_chunk_validators( + &self, + key: ChunkProductionKey, + contract_accesses: HashSet, + main_transition: MainTransitionKey, + chunk_validators: &[AccountId], + my_signer: &ValidatorSigner, + ) { + let chunk_producers: HashSet = self + .epoch_manager + .get_epoch_chunk_producers_for_shard(&key.epoch_id, key.shard_id) + .expect("Chunk producers must be defined") + .into_iter() + .collect(); + + // Exclude chunk producers that track the same shard from the target list, since they track the state that contains the respective code. + let target_chunk_validators = chunk_validators + .iter() + .filter(|validator| !chunk_producers.contains(*validator)) + .cloned() + .collect(); + self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests( + NetworkRequests::ChunkContractAccesses( + target_chunk_validators, + ChunkContractAccesses::new(key, contract_accesses, main_transition, my_signer), + ), + )); + } + + /// Retrieves the code for the given contract hashes and distributes them to validator in parts. + /// + /// This implements the first step of distributing contract code to validators where the contract codes + /// are compressed and encoded into parts using Reed Solomon encoding, and then each part is sent to + /// one of the validators (part-owner). Second step of the distribution, where each validator (part-owner) + /// forwards the part it receives is implemented in `handle_partial_encoded_contract_deploys`. + fn send_chunk_contract_deploys_parts( + &mut self, + key: ChunkProductionKey, + contract_codes: Vec, + ) -> Result<(), Error> { + let contracts = contract_codes.into_iter().map(|contract| contract.into()).collect(); + let compressed_deploys = ChunkContractDeploys::compress_contracts(&contracts)?; + let validator_parts = self.generate_contract_deploys_parts(&key, compressed_deploys)?; + for (part_owner, deploys_part) in validator_parts.into_iter() { + self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests( + NetworkRequests::PartialEncodedContractDeploys(vec![part_owner], deploys_part), + )); + } + Ok(()) + } + + /// Handles contract code requests message from chunk validators. + /// As response to this message, sends the contract code requested to + /// the requesting chunk validator for the given hashes of the contract code. + fn handle_contract_code_request(&mut self, request: ContractCodeRequest) -> Result<(), Error> { + if !validate_contract_code_request( + self.epoch_manager.as_ref(), + &request, + self.runtime.store(), + )? { + return Ok(()); + } + + let key = request.chunk_production_key(); + let processed_requests_key = (key.clone(), request.requester().clone()); + if self.processed_contract_code_requests.contains(&processed_requests_key) { + tracing::warn!( + target: "client", + ?processed_requests_key, + "Contract code request from this account was already processed" + ); + return Ok(()); + } + self.processed_contract_code_requests.push(processed_requests_key, ()); + + let _timer = near_chain::stateless_validation::metrics::PROCESS_CONTRACT_CODE_REQUEST_TIME + .with_label_values(&[&key.shard_id.to_string()]) + .start_timer(); + + let main_transition_key = request.main_transition(); + let Some(transition_data) = + self.runtime.store().get_ser::( + DBCol::StateTransitionData, + &near_primitives::utils::get_block_shard_id( + &main_transition_key.block_hash, + main_transition_key.shard_id, + ), + )? + else { + tracing::warn!( + target: "client", + ?key, + ?main_transition_key, + "Missing state transition data" + ); + return Ok(()); + }; + let valid_accesses: HashSet = + transition_data.contract_accesses().iter().cloned().collect(); + + let storage = TrieDBStorage::new( + TrieStoreAdapter::new(self.runtime.store().clone()), + self.epoch_manager.shard_id_to_uid( + main_transition_key.shard_id, + &self.epoch_manager.get_epoch_id(&main_transition_key.block_hash)?, + )?, + ); + let mut contracts = Vec::new(); + for contract_hash in request.contracts() { + if !valid_accesses.contains(contract_hash) { + tracing::warn!( + target: "client", + ?key, + ?contract_hash, + "Requested contract code was not accessed when applying the chunk" + ); + return Ok(()); + } + match storage.retrieve_raw_bytes(&contract_hash.0) { + Ok(bytes) => contracts.push(CodeBytes(bytes)), + Err(StorageError::MissingTrieValue(_, _)) => { + tracing::warn!( + target: "client", + ?contract_hash, + chunk_production_key = ?key, + "Requested contract hash is not present in the storage" + ); + return Ok(()); + } + Err(err) => return Err(err.into()), + } + } + let response = ContractCodeResponse::encode(key.clone(), &contracts)?; + self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests( + NetworkRequests::ContractCodeResponse(request.requester().clone(), response), + )); + Ok(()) + } + + /// Handles contract code responses message from chunk producer. + fn handle_contract_code_response( + &mut self, + response: ContractCodeResponse, + ) -> Result<(), Error> { + let key = response.chunk_production_key().clone(); + let contracts = response.decompress_contracts()?; + self.partial_witness_tracker.store_accessed_contract_codes(key, contracts) + } + + fn my_validator_signer(&self) -> Result, Error> { + self.my_signer.get().ok_or_else(|| Error::NotAValidator("not a validator".to_owned())) + } + + fn contract_deploys_encoder(&mut self, validators_count: usize) -> Arc { + self.contract_deploys_encoders.entry(validators_count) + } + + fn ordered_contract_deploys_validators( + &mut self, + key: &ChunkProductionKey, + ) -> Result, Error> { + let chunk_producers = HashSet::::from_iter( + self.epoch_manager.get_epoch_chunk_producers_for_shard(&key.epoch_id, key.shard_id)?, + ); + let mut validators = self + .epoch_manager + .get_epoch_all_validators(&key.epoch_id)? + .into_iter() + .filter(|stake| !chunk_producers.contains(stake.account_id())) + .map(|stake| stake.account_id().clone()) + .collect::>(); + validators.sort(); + Ok(validators) + } +} + +fn compress_witness(witness: &ChunkStateWitness) -> Result { + let shard_id_label = witness.chunk_header.shard_id().to_string(); + let encode_timer = near_chain::stateless_validation::metrics::CHUNK_STATE_WITNESS_ENCODE_TIME + .with_label_values(&[shard_id_label.as_str()]) + .start_timer(); + let (witness_bytes, raw_witness_size) = EncodedChunkStateWitness::encode(witness)?; + encode_timer.observe_duration(); + + near_chain::stateless_validation::metrics::record_witness_size_metrics( + raw_witness_size, + witness_bytes.size_bytes(), + witness, + ); + Ok(witness_bytes) +} + +fn contracts_cache_contains_contract( + cache: &dyn ContractRuntimeCache, + contract_hash: &CodeHash, + runtime_config: &RuntimeConfig, +) -> bool { + let cache_key = get_contract_cache_key(contract_hash.0, &runtime_config.wasm_config); + cache.memory_cache().contains(cache_key) || cache.has(&cache_key).is_ok_and(|has| has) +} diff --git a/chain/client/src/test_utils/setup.rs b/chain/client/src/test_utils/setup.rs index 22ca09bde99..37422fd7597 100644 --- a/chain/client/src/test_utils/setup.rs +++ b/chain/client/src/test_utils/setup.rs @@ -164,6 +164,7 @@ pub fn setup( epoch_manager.clone(), runtime.clone(), Arc::new(RayonAsyncComputationSpawner), + Arc::new(RayonAsyncComputationSpawner), )); let partial_witness_adapter = partial_witness_addr.with_auto_span_context(); diff --git a/integration-tests/src/test_loop/builder.rs b/integration-tests/src/test_loop/builder.rs index ae89b2eb31b..b8064024d16 100644 --- a/integration-tests/src/test_loop/builder.rs +++ b/integration-tests/src/test_loop/builder.rs @@ -724,6 +724,7 @@ impl TestLoopBuilder { epoch_manager.clone(), runtime_adapter.clone(), Arc::new(self.test_loop.async_computation_spawner(|_| Duration::milliseconds(80))), + Arc::new(self.test_loop.async_computation_spawner(|_| Duration::milliseconds(80))), ); let gc_actor = GCActor::new( diff --git a/integration-tests/src/tests/network/runner.rs b/integration-tests/src/tests/network/runner.rs index 8eecd5a410a..a6fe267d014 100644 --- a/integration-tests/src/tests/network/runner.rs +++ b/integration-tests/src/tests/network/runner.rs @@ -148,6 +148,7 @@ fn setup_network_node( epoch_manager, runtime, Arc::new(RayonAsyncComputationSpawner), + Arc::new(RayonAsyncComputationSpawner), )); shards_manager_adapter.bind(shards_manager_actor.with_auto_span_context()); let peer_manager = PeerManagerActor::spawn( diff --git a/nearcore/src/lib.rs b/nearcore/src/lib.rs index 0b255586fa1..c7619d6c7b3 100644 --- a/nearcore/src/lib.rs +++ b/nearcore/src/lib.rs @@ -372,6 +372,7 @@ pub fn start_with_config_and_synchronization( epoch_manager.clone(), runtime.clone(), Arc::new(RayonAsyncComputationSpawner), + Arc::new(RayonAsyncComputationSpawner), )); let (_gc_actor, gc_arbiter) = spawn_actix_actor(GCActor::new( From 36136198d86c6857debfbcc6141d7206efe7196b Mon Sep 17 00:00:00 2001 From: Stefan Neamtu Date: Wed, 18 Dec 2024 12:48:00 +0200 Subject: [PATCH 15/17] remove file --- .../partial_witness_actor_v2.rs | 805 ------------------ 1 file changed, 805 deletions(-) delete mode 100644 chain/client/src/stateless_validation/partial_witness/partial_witness_actor_v2.rs diff --git a/chain/client/src/stateless_validation/partial_witness/partial_witness_actor_v2.rs b/chain/client/src/stateless_validation/partial_witness/partial_witness_actor_v2.rs deleted file mode 100644 index 02c50312445..00000000000 --- a/chain/client/src/stateless_validation/partial_witness/partial_witness_actor_v2.rs +++ /dev/null @@ -1,805 +0,0 @@ -use std::collections::HashSet; -use std::num::NonZeroUsize; -use std::sync::Arc; - -use itertools::Itertools; -use lru::LruCache; -use near_async::futures::{AsyncComputationSpawner, AsyncComputationSpawnerExt}; -use near_async::messaging::{Actor, CanSend, Handler, Sender}; -use near_async::time::Clock; -use near_async::{MultiSend, MultiSenderFrom}; -use near_chain::types::RuntimeAdapter; -use near_chain::Error; -use near_chain_configs::MutableValidatorSigner; -use near_epoch_manager::EpochManagerAdapter; -use near_network::state_witness::{ - ChunkContractAccessesMessage, ChunkStateWitnessAckMessage, ContractCodeRequestMessage, - ContractCodeResponseMessage, PartialEncodedContractDeploysMessage, - PartialEncodedStateWitnessForwardMessage, PartialEncodedStateWitnessMessage, -}; -use near_network::types::{NetworkRequests, PeerManagerAdapter, PeerManagerMessageRequest}; -use near_parameters::RuntimeConfig; -use near_performance_metrics_macros::perf; -use near_primitives::reed_solomon::{ReedSolomonEncoder, ReedSolomonEncoderCache}; -use near_primitives::sharding::ShardChunkHeader; -use near_primitives::stateless_validation::contract_distribution::{ - ChunkContractAccesses, ChunkContractDeploys, CodeBytes, CodeHash, ContractCodeRequest, - ContractCodeResponse, ContractUpdates, MainTransitionKey, PartialEncodedContractDeploys, - PartialEncodedContractDeploysPart, -}; -use near_primitives::stateless_validation::partial_witness::PartialEncodedStateWitness; -use near_primitives::stateless_validation::state_witness::{ - ChunkStateWitness, ChunkStateWitnessAck, EncodedChunkStateWitness, -}; -use near_primitives::stateless_validation::stored_chunk_state_transition_data::StoredChunkStateTransitionData; -use near_primitives::stateless_validation::ChunkProductionKey; -use near_primitives::types::{AccountId, EpochId, ShardId}; -use near_primitives::validator_signer::ValidatorSigner; -use near_store::adapter::trie_store::TrieStoreAdapter; -use near_store::{DBCol, StorageError, TrieDBStorage, TrieStorage}; -use near_vm_runner::{get_contract_cache_key, ContractCode, ContractRuntimeCache}; -use rand::Rng; - -use crate::client_actor::ClientSenderForPartialWitness; -use crate::metrics; -use crate::stateless_validation::state_witness_tracker::ChunkStateWitnessTracker; -use crate::stateless_validation::validate::{ - validate_chunk_contract_accesses, validate_contract_code_request, - validate_partial_encoded_contract_deploys, validate_partial_encoded_state_witness, -}; - -use super::encoding::{CONTRACT_DEPLOYS_RATIO_DATA_PARTS, WITNESS_RATIO_DATA_PARTS}; -use super::partial_deploys_tracker::PartialEncodedContractDeploysTracker; -use super::partial_witness_tracker::PartialEncodedStateWitnessTracker; -use near_primitives::utils::compression::CompressedData; - -const PROCESSED_CONTRACT_CODE_REQUESTS_CACHE_SIZE: usize = 30; - -pub struct PartialWitnessActorState { - /// Validator signer to sign the state witness. This field is mutable and optional. Use with caution! - /// Lock the value of mutable validator signer for the duration of a request to ensure consistency. - /// Please note that the locked value should not be stored anywhere or passed through the thread boundary. - my_signer: MutableValidatorSigner, - /// Tracks the parts of the state witness sent from chunk producers to chunk validators. - partial_witness_tracker: PartialEncodedStateWitnessTracker, - partial_deploys_tracker: PartialEncodedContractDeploysTracker, - /// Tracks a collection of state witnesses sent from chunk producers to chunk validators. - state_witness_tracker: ChunkStateWitnessTracker, - /// AccountId in the key corresponds to the requester (chunk validator). - processed_contract_code_requests: LruCache<(ChunkProductionKey, AccountId), ()>, -} - -pub struct PartialWitnessActor { - /// Adapter to send messages to the network. - network_adapter: PeerManagerAdapter, - epoch_manager: Arc, - runtime: Arc, - /// Reed Solomon encoder for encoding state witness parts. - /// We keep one wrapper for each length of chunk_validators to avoid re-creating the encoder. - witness_encoders: ReedSolomonEncoderCache, - /// Same as above for contract deploys. - contract_deploys_encoders: ReedSolomonEncoderCache, - compile_contracts_spawner: Arc, - state: PartialWitnessActorState, -} - -impl Actor for PartialWitnessActor {} - -#[derive(actix::Message, Debug)] -#[rtype(result = "()")] -pub struct DistributeStateWitnessRequest { - pub state_witness: ChunkStateWitness, - pub contract_updates: ContractUpdates, - pub main_transition_shard_id: ShardId, -} - -#[derive(Clone, MultiSend, MultiSenderFrom)] -pub struct PartialWitnessSenderForClient { - pub distribute_chunk_state_witness: Sender, -} - -impl Handler for PartialWitnessActor { - #[perf] - fn handle(&mut self, msg: DistributeStateWitnessRequest) { - if let Err(err) = self.handle_distribute_state_witness_request(msg) { - tracing::error!(target: "client", ?err, "Failed to handle distribute chunk state witness request"); - } - } -} - -impl Handler for PartialWitnessActor { - fn handle(&mut self, msg: ChunkStateWitnessAckMessage) { - self.handle_chunk_state_witness_ack(msg.0); - } -} - -impl Handler for PartialWitnessActor { - fn handle(&mut self, msg: PartialEncodedStateWitnessMessage) { - if let Err(err) = self.handle_partial_encoded_state_witness(msg.0) { - tracing::error!(target: "client", ?err, "Failed to handle PartialEncodedStateWitnessMessage"); - } - } -} - -impl Handler for PartialWitnessActor { - fn handle(&mut self, msg: PartialEncodedStateWitnessForwardMessage) { - if let Err(err) = self.handle_partial_encoded_state_witness_forward(msg.0) { - tracing::error!(target: "client", ?err, "Failed to handle PartialEncodedStateWitnessForwardMessage"); - } - } -} - -impl Handler for PartialWitnessActor { - fn handle(&mut self, msg: ChunkContractAccessesMessage) { - if let Err(err) = self.handle_chunk_contract_accesses(msg.0) { - tracing::error!(target: "client", ?err, "Failed to handle ChunkContractAccessesMessage"); - } - } -} - -impl Handler for PartialWitnessActor { - fn handle(&mut self, msg: PartialEncodedContractDeploysMessage) { - if let Err(err) = self.handle_partial_encoded_contract_deploys(msg.0) { - tracing::error!(target: "client", ?err, "Failed to handle PartialEncodedContractDeploysMessage"); - } - } -} - -impl Handler for PartialWitnessActor { - fn handle(&mut self, msg: ContractCodeRequestMessage) { - if let Err(err) = self.handle_contract_code_request(msg.0) { - tracing::error!(target: "client", ?err, "Failed to handle ContractCodeRequestMessage"); - } - } -} - -impl Handler for PartialWitnessActor { - fn handle(&mut self, msg: ContractCodeResponseMessage) { - if let Err(err) = self.handle_contract_code_response(msg.0) { - tracing::error!(target: "client", ?err, "Failed to handle ContractCodeResponseMessage"); - } - } -} - -impl PartialWitnessActor { - pub fn new( - clock: Clock, - network_adapter: PeerManagerAdapter, - client_sender: ClientSenderForPartialWitness, - my_signer: MutableValidatorSigner, - epoch_manager: Arc, - runtime: Arc, - compile_contracts_spawner: Arc, - ) -> Self { - let partial_witness_tracker = - PartialEncodedStateWitnessTracker::new(client_sender, epoch_manager.clone()); - Self { - network_adapter, - my_signer, - epoch_manager, - partial_witness_tracker, - partial_deploys_tracker: PartialEncodedContractDeploysTracker::new(), - state_witness_tracker: ChunkStateWitnessTracker::new(clock), - runtime, - witness_encoders: ReedSolomonEncoderCache::new(WITNESS_RATIO_DATA_PARTS), - contract_deploys_encoders: ReedSolomonEncoderCache::new( - CONTRACT_DEPLOYS_RATIO_DATA_PARTS, - ), - compile_contracts_spawner, - processed_contract_code_requests: LruCache::new( - NonZeroUsize::new(PROCESSED_CONTRACT_CODE_REQUESTS_CACHE_SIZE).unwrap(), - ), - } - } - - fn handle_distribute_state_witness_request( - &mut self, - msg: DistributeStateWitnessRequest, - ) -> Result<(), Error> { - let DistributeStateWitnessRequest { - state_witness, - contract_updates: ContractUpdates { contract_accesses, contract_deploys }, - main_transition_shard_id, - } = msg; - - tracing::debug!( - target: "client", - chunk_hash=?state_witness.chunk_header.chunk_hash(), - "distribute_chunk_state_witness", - ); - - // We send the state-witness and contract-updates in the following order: - // 1. We send the hashes of the contract code accessed (if contract code is excluded from witness and any contracts are called) - // before the state witness in order to allow validators to check and request missing contract code, while waiting for witness parts. - // 2. We send the state witness parts to witness-part owners. - // 3. We send the contract deploys parts to other validators (that do not validate the witness in this turn). This is lower priority - // since the newly-deployed contracts will be needed by other validators in later turns. - - let signer = self.my_validator_signer()?; - let key = state_witness.chunk_production_key(); - let chunk_validators = self - .epoch_manager - .get_chunk_validator_assignments(&key.epoch_id, key.shard_id, key.height_created) - .expect("Chunk validators must be defined") - .ordered_chunk_validators(); - - if !contract_accesses.is_empty() { - self.send_contract_accesses_to_chunk_validators( - key.clone(), - contract_accesses, - MainTransitionKey { - block_hash: state_witness.main_state_transition.block_hash, - shard_id: main_transition_shard_id, - }, - &chunk_validators, - &signer, - ); - } - - let witness_bytes = compress_witness(&state_witness)?; - self.send_state_witness_parts( - key.epoch_id, - &state_witness.chunk_header, - witness_bytes, - &chunk_validators, - &signer, - )?; - - if !contract_deploys.is_empty() { - self.send_chunk_contract_deploys_parts(key, contract_deploys)?; - } - - Ok(()) - } - - // Function to generate the parts of the state witness and return them as a tuple of chunk_validator and part. - fn generate_state_witness_parts( - &mut self, - epoch_id: EpochId, - chunk_header: &ShardChunkHeader, - witness_bytes: EncodedChunkStateWitness, - chunk_validators: &[AccountId], - signer: &ValidatorSigner, - ) -> Result, Error> { - tracing::debug!( - target: "client", - chunk_hash=?chunk_header.chunk_hash(), - ?chunk_validators, - "generate_state_witness_parts", - ); - - // Break the state witness into parts using Reed Solomon encoding. - let encoder = self.witness_encoders.entry(chunk_validators.len()); - let (parts, encoded_length) = encoder.encode(&witness_bytes); - - Ok(chunk_validators - .iter() - .zip_eq(parts) - .enumerate() - .map(|(part_ord, (chunk_validator, part))| { - // It's fine to unwrap part here as we just constructed the parts above and we expect - // all of them to be present. - let partial_witness = PartialEncodedStateWitness::new( - epoch_id, - chunk_header.clone(), - part_ord, - part.unwrap().to_vec(), - encoded_length, - signer, - ); - (chunk_validator.clone(), partial_witness) - }) - .collect_vec()) - } - - fn generate_contract_deploys_parts( - &mut self, - key: &ChunkProductionKey, - deploys: ChunkContractDeploys, - ) -> Result, Error> { - let validators = self.ordered_contract_deploys_validators(key)?; - // Note that target validators do not include the chunk producers, and thus in some case - // (eg. tests or small networks) there may be no other validators to send the new contracts to. - if validators.is_empty() { - return Ok(vec![]); - } - - let encoder = self.contract_deploys_encoder(validators.len()); - let (parts, encoded_length) = encoder.encode(&deploys); - let signer = self.my_validator_signer()?; - - Ok(validators - .into_iter() - .zip_eq(parts) - .enumerate() - .map(|(part_ord, (validator, part))| { - let partial_deploys = PartialEncodedContractDeploys::new( - key.clone(), - PartialEncodedContractDeploysPart { - part_ord, - data: part.unwrap().to_vec().into_boxed_slice(), - encoded_length, - }, - &signer, - ); - (validator, partial_deploys) - }) - .collect_vec()) - } - - // Break the state witness into parts and send each part to the corresponding chunk validator owner. - // The chunk validator owner will then forward the part to all other chunk validators. - // Each chunk validator would collect the parts and reconstruct the state witness. - fn send_state_witness_parts( - &mut self, - epoch_id: EpochId, - chunk_header: &ShardChunkHeader, - witness_bytes: EncodedChunkStateWitness, - chunk_validators: &[AccountId], - signer: &ValidatorSigner, - ) -> Result<(), Error> { - // Capture these values first, as the sources are consumed before calling record_witness_sent. - let chunk_hash = chunk_header.chunk_hash(); - let witness_size_in_bytes = witness_bytes.size_bytes(); - - // Record time taken to encode the state witness parts. - let shard_id_label = chunk_header.shard_id().to_string(); - let encode_timer = metrics::PARTIAL_WITNESS_ENCODE_TIME - .with_label_values(&[shard_id_label.as_str()]) - .start_timer(); - let validator_witness_tuple = self.generate_state_witness_parts( - epoch_id, - chunk_header, - witness_bytes, - chunk_validators, - signer, - )?; - encode_timer.observe_duration(); - - // Record the witness in order to match the incoming acks for measuring round-trip times. - // See process_chunk_state_witness_ack for the handling of the ack messages. - self.state_witness_tracker.record_witness_sent( - chunk_hash, - witness_size_in_bytes, - validator_witness_tuple.len(), - ); - - // Send the parts to the corresponding chunk validator owners. - self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests( - NetworkRequests::PartialEncodedStateWitness(validator_witness_tuple), - )); - Ok(()) - } - - /// Sends the witness part to the chunk validators, except the chunk producer that generated the witness part. - fn forward_state_witness_part( - &self, - partial_witness: PartialEncodedStateWitness, - ) -> Result<(), Error> { - let ChunkProductionKey { shard_id, epoch_id, height_created } = - partial_witness.chunk_production_key(); - let chunk_producer = self - .epoch_manager - .get_chunk_producer_info(&ChunkProductionKey { epoch_id, height_created, shard_id })? - .take_account_id(); - - // Forward witness part to chunk validators except the validator that produced the chunk and witness. - let target_chunk_validators = self - .epoch_manager - .get_chunk_validator_assignments(&epoch_id, shard_id, height_created)? - .ordered_chunk_validators() - .into_iter() - .filter(|validator| validator != &chunk_producer) - .collect(); - - self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests( - NetworkRequests::PartialEncodedStateWitnessForward( - target_chunk_validators, - partial_witness, - ), - )); - Ok(()) - } - - /// Function to handle receiving partial_encoded_state_witness message from chunk producer. - fn handle_partial_encoded_state_witness( - &mut self, - partial_witness: PartialEncodedStateWitness, - ) -> Result<(), Error> { - tracing::debug!(target: "client", ?partial_witness, "Receive PartialEncodedStateWitnessMessage"); - - let signer = self.my_validator_signer()?; - // Validate the partial encoded state witness and forward the part to all the chunk validators. - if validate_partial_encoded_state_witness( - self.epoch_manager.as_ref(), - &partial_witness, - &signer, - self.runtime.store(), - )? { - self.forward_state_witness_part(partial_witness)?; - } - - Ok(()) - } - - /// Function to handle receiving partial_encoded_state_witness_forward message from chunk producer. - fn handle_partial_encoded_state_witness_forward( - &mut self, - partial_witness: PartialEncodedStateWitness, - ) -> Result<(), Error> { - tracing::debug!(target: "client", ?partial_witness, "Receive PartialEncodedStateWitnessForwardMessage"); - - let signer = self.my_validator_signer()?; - // Validate the partial encoded state witness and store the partial encoded state witness. - if validate_partial_encoded_state_witness( - self.epoch_manager.as_ref(), - &partial_witness, - &signer, - self.runtime.store(), - )? { - self.partial_witness_tracker.store_partial_encoded_state_witness(partial_witness)?; - } - - Ok(()) - } - - /// Handles partial contract deploy message received from a peer. - /// - /// This message may belong to one of two steps of distributing contract code. In the first step the code is compressed - /// and encoded into parts using Reed Solomon encoding and each part is sent to one of the validators (part owner). - /// See `send_chunk_contract_deploys_parts` for the code implementing this. In the second step each validator (part-owner) - /// forwards the part it receives to other validators. - fn handle_partial_encoded_contract_deploys( - &mut self, - partial_deploys: PartialEncodedContractDeploys, - ) -> Result<(), Error> { - tracing::debug!(target: "client", ?partial_deploys, "Receive PartialEncodedContractDeploys"); - if !validate_partial_encoded_contract_deploys( - self.epoch_manager.as_ref(), - &partial_deploys, - self.runtime.store(), - )? { - return Ok(()); - } - if self.partial_deploys_tracker.already_processed(&partial_deploys) { - return Ok(()); - } - let key = partial_deploys.chunk_production_key().clone(); - let validators = self.ordered_contract_deploys_validators(&key)?; - if validators.is_empty() { - // Note that target validators do not include the chunk producers, and thus in some case - // (eg. tests or small networks) there may be no other validators to send the new contracts to. - // In such case, the message we are handling here should not be sent in the first place, - // unless there is a bug or adversarial behavior that sends the message. - debug_assert!(false, "No target validators, we must not receive this message"); - return Ok(()); - } - - // Forward to other validators if the part received is my part - let signer = self.my_validator_signer()?; - let my_account_id = signer.validator_id(); - let Some(my_part_ord) = validators.iter().position(|validator| validator == my_account_id) - else { - tracing::warn!( - target: "client", - ?key, - "Validator is not a part of contract deploys distribution" - ); - return Ok(()); - }; - if partial_deploys.part().part_ord == my_part_ord { - let other_validators = validators - .iter() - .filter(|&validator| validator != my_account_id) - .cloned() - .collect_vec(); - if !other_validators.is_empty() { - self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests( - NetworkRequests::PartialEncodedContractDeploys( - other_validators, - partial_deploys.clone(), - ), - )); - } - } - - // Store part - let encoder = self.contract_deploys_encoder(validators.len()); - if let Some(deploys) = self - .partial_deploys_tracker - .store_partial_encoded_contract_deploys(partial_deploys, encoder)? - { - let contracts = match deploys.decompress_contracts() { - Ok(contracts) => contracts, - Err(err) => { - tracing::warn!( - target: "client", - ?err, - ?key, - "Failed to decompress deployed contracts." - ); - return Ok(()); - } - }; - let contract_codes = contracts.into_iter().map(|contract| contract.into()).collect(); - let runtime = self.runtime.clone(); - self.compile_contracts_spawner.spawn("precompile_deployed_contracts", move || { - if let Err(err) = runtime.precompile_contracts(&key.epoch_id, contract_codes) { - tracing::error!( - target: "client", - ?err, - ?key, - "Failed to precompile deployed contracts." - ); - } - }); - } - - Ok(()) - } - - /// Handles the state witness ack message from the chunk validator. - /// It computes the round-trip time between sending the state witness and receiving - /// the ack message and updates the corresponding metric with it. - /// Currently we do not raise an error for handling of witness-ack messages, - /// as it is used only for tracking some networking metrics. - fn handle_chunk_state_witness_ack(&mut self, witness_ack: ChunkStateWitnessAck) { - self.state_witness_tracker.on_witness_ack_received(witness_ack); - } - - /// Handles contract code accesses message from chunk producer. - /// This is sent in parallel to a chunk state witness and contains the hashes - /// of the contract code accessed when applying the previous chunk of the witness. - fn handle_chunk_contract_accesses( - &mut self, - accesses: ChunkContractAccesses, - ) -> Result<(), Error> { - let signer = self.my_validator_signer()?; - if !validate_chunk_contract_accesses( - self.epoch_manager.as_ref(), - &accesses, - &signer, - self.runtime.store(), - )? { - return Ok(()); - } - let key = accesses.chunk_production_key(); - let contracts_cache = self.runtime.compiled_contract_cache(); - let runtime_config = self - .runtime - .get_runtime_config(self.epoch_manager.get_epoch_protocol_version(&key.epoch_id)?)?; - let missing_contract_hashes = HashSet::from_iter( - accesses - .contracts() - .iter() - .filter(|&hash| { - !contracts_cache_contains_contract(contracts_cache, hash, &runtime_config) - }) - .cloned(), - ); - if missing_contract_hashes.is_empty() { - return Ok(()); - } - self.partial_witness_tracker - .store_accessed_contract_hashes(key.clone(), missing_contract_hashes.clone())?; - let random_chunk_producer = { - let mut chunk_producers = self - .epoch_manager - .get_epoch_chunk_producers_for_shard(&key.epoch_id, key.shard_id)?; - chunk_producers.swap_remove(rand::thread_rng().gen_range(0..chunk_producers.len())) - }; - let request = ContractCodeRequest::new( - key.clone(), - missing_contract_hashes, - accesses.main_transition().clone(), - &signer, - ); - self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests( - NetworkRequests::ContractCodeRequest(random_chunk_producer, request), - )); - Ok(()) - } - - /// Sends the contract accesses to the same chunk validators - /// (except for the chunk producers that track the same shard), - /// which will receive the state witness for the new chunk. - fn send_contract_accesses_to_chunk_validators( - &self, - key: ChunkProductionKey, - contract_accesses: HashSet, - main_transition: MainTransitionKey, - chunk_validators: &[AccountId], - my_signer: &ValidatorSigner, - ) { - let chunk_producers: HashSet = self - .epoch_manager - .get_epoch_chunk_producers_for_shard(&key.epoch_id, key.shard_id) - .expect("Chunk producers must be defined") - .into_iter() - .collect(); - - // Exclude chunk producers that track the same shard from the target list, since they track the state that contains the respective code. - let target_chunk_validators = chunk_validators - .iter() - .filter(|validator| !chunk_producers.contains(*validator)) - .cloned() - .collect(); - self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests( - NetworkRequests::ChunkContractAccesses( - target_chunk_validators, - ChunkContractAccesses::new(key, contract_accesses, main_transition, my_signer), - ), - )); - } - - /// Retrieves the code for the given contract hashes and distributes them to validator in parts. - /// - /// This implements the first step of distributing contract code to validators where the contract codes - /// are compressed and encoded into parts using Reed Solomon encoding, and then each part is sent to - /// one of the validators (part-owner). Second step of the distribution, where each validator (part-owner) - /// forwards the part it receives is implemented in `handle_partial_encoded_contract_deploys`. - fn send_chunk_contract_deploys_parts( - &mut self, - key: ChunkProductionKey, - contract_codes: Vec, - ) -> Result<(), Error> { - let contracts = contract_codes.into_iter().map(|contract| contract.into()).collect(); - let compressed_deploys = ChunkContractDeploys::compress_contracts(&contracts)?; - let validator_parts = self.generate_contract_deploys_parts(&key, compressed_deploys)?; - for (part_owner, deploys_part) in validator_parts.into_iter() { - self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests( - NetworkRequests::PartialEncodedContractDeploys(vec![part_owner], deploys_part), - )); - } - Ok(()) - } - - /// Handles contract code requests message from chunk validators. - /// As response to this message, sends the contract code requested to - /// the requesting chunk validator for the given hashes of the contract code. - fn handle_contract_code_request(&mut self, request: ContractCodeRequest) -> Result<(), Error> { - if !validate_contract_code_request( - self.epoch_manager.as_ref(), - &request, - self.runtime.store(), - )? { - return Ok(()); - } - - let key = request.chunk_production_key(); - let processed_requests_key = (key.clone(), request.requester().clone()); - if self.processed_contract_code_requests.contains(&processed_requests_key) { - tracing::warn!( - target: "client", - ?processed_requests_key, - "Contract code request from this account was already processed" - ); - return Ok(()); - } - self.processed_contract_code_requests.push(processed_requests_key, ()); - - let _timer = near_chain::stateless_validation::metrics::PROCESS_CONTRACT_CODE_REQUEST_TIME - .with_label_values(&[&key.shard_id.to_string()]) - .start_timer(); - - let main_transition_key = request.main_transition(); - let Some(transition_data) = - self.runtime.store().get_ser::( - DBCol::StateTransitionData, - &near_primitives::utils::get_block_shard_id( - &main_transition_key.block_hash, - main_transition_key.shard_id, - ), - )? - else { - tracing::warn!( - target: "client", - ?key, - ?main_transition_key, - "Missing state transition data" - ); - return Ok(()); - }; - let valid_accesses: HashSet = - transition_data.contract_accesses().iter().cloned().collect(); - - let storage = TrieDBStorage::new( - TrieStoreAdapter::new(self.runtime.store().clone()), - self.epoch_manager.shard_id_to_uid( - main_transition_key.shard_id, - &self.epoch_manager.get_epoch_id(&main_transition_key.block_hash)?, - )?, - ); - let mut contracts = Vec::new(); - for contract_hash in request.contracts() { - if !valid_accesses.contains(contract_hash) { - tracing::warn!( - target: "client", - ?key, - ?contract_hash, - "Requested contract code was not accessed when applying the chunk" - ); - return Ok(()); - } - match storage.retrieve_raw_bytes(&contract_hash.0) { - Ok(bytes) => contracts.push(CodeBytes(bytes)), - Err(StorageError::MissingTrieValue(_, _)) => { - tracing::warn!( - target: "client", - ?contract_hash, - chunk_production_key = ?key, - "Requested contract hash is not present in the storage" - ); - return Ok(()); - } - Err(err) => return Err(err.into()), - } - } - let response = ContractCodeResponse::encode(key.clone(), &contracts)?; - self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests( - NetworkRequests::ContractCodeResponse(request.requester().clone(), response), - )); - Ok(()) - } - - /// Handles contract code responses message from chunk producer. - fn handle_contract_code_response( - &mut self, - response: ContractCodeResponse, - ) -> Result<(), Error> { - let key = response.chunk_production_key().clone(); - let contracts = response.decompress_contracts()?; - self.partial_witness_tracker.store_accessed_contract_codes(key, contracts) - } - - fn my_validator_signer(&self) -> Result, Error> { - self.my_signer.get().ok_or_else(|| Error::NotAValidator("not a validator".to_owned())) - } - - fn contract_deploys_encoder(&mut self, validators_count: usize) -> Arc { - self.contract_deploys_encoders.entry(validators_count) - } - - fn ordered_contract_deploys_validators( - &mut self, - key: &ChunkProductionKey, - ) -> Result, Error> { - let chunk_producers = HashSet::::from_iter( - self.epoch_manager.get_epoch_chunk_producers_for_shard(&key.epoch_id, key.shard_id)?, - ); - let mut validators = self - .epoch_manager - .get_epoch_all_validators(&key.epoch_id)? - .into_iter() - .filter(|stake| !chunk_producers.contains(stake.account_id())) - .map(|stake| stake.account_id().clone()) - .collect::>(); - validators.sort(); - Ok(validators) - } -} - -fn compress_witness(witness: &ChunkStateWitness) -> Result { - let shard_id_label = witness.chunk_header.shard_id().to_string(); - let encode_timer = near_chain::stateless_validation::metrics::CHUNK_STATE_WITNESS_ENCODE_TIME - .with_label_values(&[shard_id_label.as_str()]) - .start_timer(); - let (witness_bytes, raw_witness_size) = EncodedChunkStateWitness::encode(witness)?; - encode_timer.observe_duration(); - - near_chain::stateless_validation::metrics::record_witness_size_metrics( - raw_witness_size, - witness_bytes.size_bytes(), - witness, - ); - Ok(witness_bytes) -} - -fn contracts_cache_contains_contract( - cache: &dyn ContractRuntimeCache, - contract_hash: &CodeHash, - runtime_config: &RuntimeConfig, -) -> bool { - let cache_key = get_contract_cache_key(contract_hash.0, &runtime_config.wasm_config); - cache.memory_cache().contains(cache_key) || cache.has(&cache_key).is_ok_and(|has| has) -} From c12d23b158bc8b3f7ccf98fad191f0660bb13176 Mon Sep 17 00:00:00 2001 From: Stefan Neamtu Date: Wed, 18 Dec 2024 12:51:45 +0200 Subject: [PATCH 16/17] clippy --- .../partial_witness/partial_witness_actor.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs b/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs index 0f2762e5391..45f527c1281 100644 --- a/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs +++ b/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs @@ -371,7 +371,7 @@ impl PartialWitnessActor { } /// Sends the witness part to the chunk validators, except the chunk producer that generated the witness part. - fn forward_state_witness_part( + fn _forward_state_witness_part( &self, partial_witness: PartialEncodedStateWitness, ) -> Result<(), Error> { @@ -439,7 +439,6 @@ impl PartialWitnessActor { runtime_adapter.store(), ).unwrap() { forward_state_witness_part_v2(partial_witness, - chunk_producer, target_chunk_validators, network_adapter).unwrap(); } @@ -833,13 +832,9 @@ fn contracts_cache_contains_contract( /// Sends the witness part to the chunk validators, except the chunk producer that generated the witness part. fn forward_state_witness_part_v2( partial_witness: PartialEncodedStateWitness, - chunk_producer: AccountId, target_chunk_validators: Vec, network_adapter: PeerManagerAdapter, ) -> Result<(), Error> { - let ChunkProductionKey { shard_id, epoch_id, height_created } = - partial_witness.chunk_production_key(); - network_adapter.send(PeerManagerMessageRequest::NetworkRequests( NetworkRequests::PartialEncodedStateWitnessForward( target_chunk_validators, From 1de100c9c738f8a36da77cca9267b8e238afccc7 Mon Sep 17 00:00:00 2001 From: Stefan Neamtu Date: Thu, 19 Dec 2024 11:56:07 +0200 Subject: [PATCH 17/17] modify handle_partial_encoded_state_witness_forward --- .../partial_witness/partial_witness_actor.rs | 99 ++++++++++++++----- 1 file changed, 74 insertions(+), 25 deletions(-) diff --git a/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs b/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs index 45f527c1281..f556d64085e 100644 --- a/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs +++ b/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs @@ -1,6 +1,6 @@ use std::collections::HashSet; use std::num::NonZeroUsize; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use itertools::Itertools; use lru::LruCache; @@ -65,7 +65,7 @@ pub struct PartialWitnessActor { epoch_manager: Arc, runtime: Arc, /// Tracks the parts of the state witness sent from chunk producers to chunk validators. - partial_witness_tracker: PartialEncodedStateWitnessTracker, + partial_witness_tracker: Arc>, partial_deploys_tracker: PartialEncodedContractDeploysTracker, /// Tracks a collection of state witnesses sent from chunk producers to chunk validators. state_witness_tracker: ChunkStateWitnessTracker, @@ -169,8 +169,10 @@ impl PartialWitnessActor { compile_contracts_spawner: Arc, partial_witness_spawner: Arc, ) -> Self { - let partial_witness_tracker = - PartialEncodedStateWitnessTracker::new(client_sender, epoch_manager.clone()); + let partial_witness_tracker = Arc::new(Mutex::new(PartialEncodedStateWitnessTracker::new( + client_sender, + epoch_manager.clone(), + ))); Self { network_adapter, my_signer, @@ -405,6 +407,7 @@ impl PartialWitnessActor { &mut self, partial_witness: PartialEncodedStateWitness, ) -> Result<(), Error> { + tracing::debug!(target: "client", ?partial_witness, "Receive PartialEncodedStateWitnessMessage"); let signer = self.my_validator_signer()?; let epoch_manager = self.epoch_manager.clone(); let runtime_adapter = self.runtime.clone(); @@ -429,18 +432,34 @@ impl PartialWitnessActor { let network_adapter = self.network_adapter.clone(); self.partial_witness_spawner.spawn("handle_partial_encoded_state_witness", move || { - tracing::debug!(target: "client", ?partial_witness, "Receive PartialEncodedStateWitnessMessage"); - // Validate the partial encoded state witness and forward the part to all the chunk validators. - if validate_partial_encoded_state_witness( + let validation = validate_partial_encoded_state_witness( epoch_manager.as_ref(), &partial_witness, &signer, runtime_adapter.store(), - ).unwrap() { - forward_state_witness_part_v2(partial_witness, - target_chunk_validators, - network_adapter).unwrap(); + ); + match validation { + Ok(true) => { + forward_state_witness_part_v2( + partial_witness, + target_chunk_validators, + network_adapter, + ); + } + Ok(false) => { + tracing::warn!( + target: "client", + "Received partial encoded state witness that is not valid" + ); + } + Err(err) => { + tracing::warn!( + target: "client", + "Encountered error during validation: {}", + err + ); + } } }); @@ -455,15 +474,42 @@ impl PartialWitnessActor { tracing::debug!(target: "client", ?partial_witness, "Receive PartialEncodedStateWitnessForwardMessage"); let signer = self.my_validator_signer()?; - // Validate the partial encoded state witness and store the partial encoded state witness. - if validate_partial_encoded_state_witness( - self.epoch_manager.as_ref(), - &partial_witness, - &signer, - self.runtime.store(), - )? { - self.partial_witness_tracker.store_partial_encoded_state_witness(partial_witness)?; - } + let partial_witness_tracker = self.partial_witness_tracker.clone(); + let epoch_manager = self.epoch_manager.clone(); + let runtime_adapter = self.runtime.clone(); + self.partial_witness_spawner.spawn( + "handle_partial_encoded_state_witness_forward", + move || { + // Validate the partial encoded state witness and store the partial encoded state witness. + let validation = validate_partial_encoded_state_witness( + epoch_manager.as_ref(), + &partial_witness, + &signer, + runtime_adapter.store(), + ); + match validation { + Ok(true) => { + let mut partial_witness_tracker = partial_witness_tracker.lock().unwrap(); + partial_witness_tracker + .store_partial_encoded_state_witness(partial_witness) + .unwrap(); + } + Ok(false) => { + tracing::warn!( + target: "client", + "Received partial encoded state witness that is not valid" + ); + } + Err(err) => { + tracing::warn!( + target: "client", + "Encountered error during validation: {}", + err + ); + } + } + }, + ); Ok(()) } @@ -605,8 +651,11 @@ impl PartialWitnessActor { if missing_contract_hashes.is_empty() { return Ok(()); } - self.partial_witness_tracker - .store_accessed_contract_hashes(key.clone(), missing_contract_hashes.clone())?; + { + let mut partial_witness_tracker = self.partial_witness_tracker.lock().unwrap(); + partial_witness_tracker + .store_accessed_contract_hashes(key.clone(), missing_contract_hashes.clone())?; + } let random_chunk_producer = { let mut chunk_producers = self .epoch_manager @@ -774,7 +823,8 @@ impl PartialWitnessActor { ) -> Result<(), Error> { let key = response.chunk_production_key().clone(); let contracts = response.decompress_contracts()?; - self.partial_witness_tracker.store_accessed_contract_codes(key, contracts) + let mut partial_witness_tracker = self.partial_witness_tracker.lock().unwrap(); + partial_witness_tracker.store_accessed_contract_codes(key, contracts) } fn my_validator_signer(&self) -> Result, Error> { @@ -834,12 +884,11 @@ fn forward_state_witness_part_v2( partial_witness: PartialEncodedStateWitness, target_chunk_validators: Vec, network_adapter: PeerManagerAdapter, -) -> Result<(), Error> { +) { network_adapter.send(PeerManagerMessageRequest::NetworkRequests( NetworkRequests::PartialEncodedStateWitnessForward( target_chunk_validators, partial_witness, ), )); - Ok(()) }