Skip to content

Commit

Permalink
Merge branch 'stefan/improved_parallel_forknet' into stefan/improved_…
Browse files Browse the repository at this point in the history
…parallel_forknet_and_peer_manager
  • Loading branch information
stedfn committed Jan 3, 2025
2 parents 39240a0 + a052e36 commit 35247ff
Show file tree
Hide file tree
Showing 22 changed files with 331 additions and 194 deletions.
62 changes: 31 additions & 31 deletions chain/chain/src/stateless_validation/chunk_validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,11 @@ fn get_state_witness_block_range(
last_chunk_shard_id: ShardId,
}

let initial_prev_hash = *state_witness.chunk_header.prev_block_hash();
let initial_prev_hash = *state_witness.inner.chunk_header.prev_block_hash();
let initial_prev_block = store.get_block(&initial_prev_hash)?;
let initial_shard_layout =
epoch_manager.get_shard_layout_from_prev_block(&initial_prev_hash)?;
let initial_shard_id = state_witness.chunk_header.shard_id();
let initial_shard_id = state_witness.inner.chunk_header.shard_id();
// Check that shard id is present in current epoch.
// TODO: consider more proper way to validate this.
let _ = initial_shard_layout.get_shard_index(initial_shard_id)?;
Expand Down Expand Up @@ -333,8 +333,8 @@ pub fn pre_validate_chunk_state_witness(

// Ensure that the chunk header version is supported in this protocol version
let protocol_version =
epoch_manager.get_epoch_info(&state_witness.epoch_id)?.protocol_version();
state_witness.chunk_header.validate_version(protocol_version)?;
epoch_manager.get_epoch_info(&state_witness.inner.epoch_id)?.protocol_version();
state_witness.inner.chunk_header.validate_version(protocol_version)?;

// First, go back through the blockchain history to locate the last new chunk
// and last last new chunk for the shard.
Expand All @@ -348,19 +348,19 @@ pub fn pre_validate_chunk_state_witness(

let receipts_to_apply = validate_source_receipt_proofs(
epoch_manager,
&state_witness.source_receipt_proofs,
&state_witness.inner.source_receipt_proofs,
&blocks_after_last_last_chunk,
last_chunk_shard_layout,
last_chunk_shard_id,
)?;
let applied_receipts_hash = hash(&borsh::to_vec(receipts_to_apply.as_slice()).unwrap());
if applied_receipts_hash != state_witness.applied_receipts_hash {
if applied_receipts_hash != state_witness.inner.applied_receipts_hash {
return Err(Error::InvalidChunkStateWitness(format!(
"Receipts hash {:?} does not match expected receipts hash {:?}",
applied_receipts_hash, state_witness.applied_receipts_hash
applied_receipts_hash, state_witness.inner.applied_receipts_hash
)));
}
let (tx_root_from_state_witness, _) = merklize(&state_witness.transactions);
let (tx_root_from_state_witness, _) = merklize(&state_witness.inner.transactions);
let last_chunk_block = blocks_after_last_last_chunk.first().ok_or_else(|| {
Error::Other("blocks_after_last_last_chunk is empty, this should be impossible!".into())
})?;
Expand All @@ -374,15 +374,15 @@ pub fn pre_validate_chunk_state_witness(
}

let current_protocol_version =
epoch_manager.get_epoch_protocol_version(&state_witness.epoch_id)?;
epoch_manager.get_epoch_protocol_version(&state_witness.inner.epoch_id)?;
if !checked_feature!(
"protocol_feature_relaxed_chunk_validation",
RelaxedChunkValidation,
current_protocol_version
) {
let new_transactions = &state_witness.new_transactions;
let new_transactions = &state_witness.inner.new_transactions;
let (new_tx_root_from_state_witness, _) = merklize(&new_transactions);
let chunk_tx_root = state_witness.chunk_header.tx_root();
let chunk_tx_root = state_witness.inner.chunk_header.tx_root();
if new_tx_root_from_state_witness != chunk_tx_root {
return Err(Error::InvalidChunkStateWitness(format!(
"Witness new transactions root {:?} does not match chunk {:?}",
Expand All @@ -392,21 +392,21 @@ pub fn pre_validate_chunk_state_witness(
// Verify that all proposed transactions are valid.
if !new_transactions.is_empty() {
let transactions_validation_storage_config = RuntimeStorageConfig {
state_root: state_witness.chunk_header.prev_state_root(),
state_root: state_witness.inner.chunk_header.prev_state_root(),
use_flat_storage: true,
source: StorageDataSource::Recorded(PartialStorage {
nodes: state_witness.new_transactions_validation_state.clone(),
nodes: state_witness.inner.new_transactions_validation_state.clone(),
}),
state_patch: Default::default(),
};

match validate_prepared_transactions(
chain,
runtime_adapter,
&state_witness.chunk_header,
&state_witness.inner.chunk_header,
transactions_validation_storage_config,
&new_transactions,
&state_witness.transactions,
&state_witness.inner.transactions,
) {
Ok(result) => {
if result.transactions.len() != new_transactions.len() {
Expand Down Expand Up @@ -450,7 +450,7 @@ pub fn pre_validate_chunk_state_witness(
} else {
MainTransition::NewChunk(NewChunkData {
chunk_header: last_chunk_block.chunks().get(last_chunk_shard_index).unwrap().clone(),
transactions: state_witness.transactions.clone(),
transactions: state_witness.inner.transactions.clone(),
receipts: receipts_to_apply,
block: Chain::get_apply_chunk_block_context(
epoch_manager,
Expand All @@ -461,7 +461,7 @@ pub fn pre_validate_chunk_state_witness(
is_first_block_with_chunk_of_version: false,
storage_context: StorageContext {
storage_data_source: StorageDataSource::Recorded(PartialStorage {
nodes: state_witness.main_state_transition.base_state.clone(),
nodes: state_witness.inner.main_state_transition.base_state.clone(),
}),
state_patch: Default::default(),
},
Expand Down Expand Up @@ -596,13 +596,13 @@ pub fn validate_chunk_state_witness(
main_state_transition_cache: &MainStateTransitionCache,
) -> Result<(), Error> {
let _timer = crate::stateless_validation::metrics::CHUNK_STATE_WITNESS_VALIDATION_TIME
.with_label_values(&[&state_witness.chunk_header.shard_id().to_string()])
.with_label_values(&[&state_witness.inner.chunk_header.shard_id().to_string()])
.start_timer();
let span = tracing::debug_span!(target: "client", "validate_chunk_state_witness").entered();
let witness_shard_layout = epoch_manager.get_shard_layout(&state_witness.epoch_id)?;
let witness_chunk_shard_id = state_witness.chunk_header.shard_id();
let witness_shard_layout = epoch_manager.get_shard_layout(&state_witness.inner.epoch_id)?;
let witness_chunk_shard_id = state_witness.inner.chunk_header.shard_id();
let witness_chunk_shard_uid =
epoch_manager.shard_id_to_uid(witness_chunk_shard_id, &state_witness.epoch_id)?;
epoch_manager.shard_id_to_uid(witness_chunk_shard_id, &state_witness.inner.epoch_id)?;
let block_hash = pre_validation_output.main_transition_params.block_hash();
let epoch_id = epoch_manager.get_epoch_id(&block_hash)?;
let shard_id = pre_validation_output.main_transition_params.shard_id();
Expand Down Expand Up @@ -634,14 +634,14 @@ pub fn validate_chunk_state_witness(
}
(_, Some(result)) => (result.chunk_extra, result.outgoing_receipts),
};
if chunk_extra.state_root() != &state_witness.main_state_transition.post_state_root {
if chunk_extra.state_root() != &state_witness.inner.main_state_transition.post_state_root {
// This is an early check, it's not for correctness, only for better
// error reporting in case of an invalid state witness due to a bug.
// Only the final state root check against the chunk header is required.
return Err(Error::InvalidChunkStateWitness(format!(
"Post state root {:?} for main transition does not match expected post state root {:?}",
chunk_extra.state_root(),
state_witness.main_state_transition.post_state_root,
state_witness.inner.main_state_transition.post_state_root,
)));
}

Expand All @@ -654,7 +654,7 @@ pub fn validate_chunk_state_witness(
&mut outgoing_receipts,
protocol_version,
&witness_shard_layout,
state_witness.chunk_header.shard_id(),
state_witness.inner.chunk_header.shard_id(),
shard_id,
)?;
}
Expand All @@ -676,19 +676,19 @@ pub fn validate_chunk_state_witness(
}

if pre_validation_output.implicit_transition_params.len()
!= state_witness.implicit_transitions.len()
!= state_witness.inner.implicit_transitions.len()
{
return Err(Error::InvalidChunkStateWitness(format!(
"Implicit transitions count mismatch. Expected {}, found {}",
pre_validation_output.implicit_transition_params.len(),
state_witness.implicit_transitions.len(),
state_witness.inner.implicit_transitions.len(),
)));
}

for (implicit_transition_params, transition) in pre_validation_output
.implicit_transition_params
.into_iter()
.zip(state_witness.implicit_transitions.into_iter())
.zip(state_witness.inner.implicit_transitions.into_iter())
{
let (shard_uid, new_state_root) = match implicit_transition_params {
ImplicitTransitionParams::ApplyOldChunk(block, shard_uid) => {
Expand Down Expand Up @@ -744,7 +744,7 @@ pub fn validate_chunk_state_witness(
let (outgoing_receipts_root, _) = merklize(&outgoing_receipts_hashes);
validate_chunk_with_chunk_extra_and_receipts_root(
&chunk_extra,
&state_witness.chunk_header,
&state_witness.inner.chunk_header,
&outgoing_receipts_root,
)?;

Expand Down Expand Up @@ -778,9 +778,9 @@ impl Chain {
runtime_adapter: &dyn RuntimeAdapter,
processing_done_tracker: Option<ProcessingDoneTracker>,
) -> Result<(), Error> {
let shard_id = witness.chunk_header.shard_id();
let height_created = witness.chunk_header.height_created();
let chunk_hash = witness.chunk_header.chunk_hash();
let shard_id = witness.inner.chunk_header.shard_id();
let height_created = witness.inner.chunk_header.height_created();
let chunk_hash = witness.inner.chunk_header.chunk_hash();
let parent_span = tracing::debug_span!(
target: "chain", "shadow_validate", ?shard_id, height_created);
let (encoded_witness, raw_witness_size) = {
Expand Down
10 changes: 5 additions & 5 deletions chain/chain/src/stateless_validation/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ fn record_witness_size_metrics_fallible(
encoded_size: usize,
witness: &ChunkStateWitness,
) -> Result<(), std::io::Error> {
let shard_id = witness.chunk_header.shard_id().to_string();
let shard_id = witness.inner.chunk_header.shard_id().to_string();
CHUNK_STATE_WITNESS_RAW_SIZE
.with_label_values(&[shard_id.as_str()])
.observe(decoded_size as f64);
Expand All @@ -188,16 +188,16 @@ fn record_witness_size_metrics_fallible(
.observe(encoded_size as f64);
CHUNK_STATE_WITNESS_MAIN_STATE_TRANSISTION_SIZE
.with_label_values(&[shard_id.as_str()])
.observe(borsh::to_vec(&witness.main_state_transition)?.len() as f64);
.observe(borsh::to_vec(&witness.inner.main_state_transition)?.len() as f64);
CHUNK_STATE_WITNESS_NEW_TRANSACTIONS_SIZE
.with_label_values(&[&shard_id.as_str()])
.observe(borsh::to_vec(&witness.new_transactions)?.len() as f64);
.observe(borsh::to_vec(&witness.inner.new_transactions)?.len() as f64);
CHUNK_STATE_WITNESS_NEW_TRANSACTIONS_STATE_SIZE
.with_label_values(&[&shard_id.as_str()])
.observe(borsh::to_vec(&witness.new_transactions_validation_state)?.len() as f64);
.observe(borsh::to_vec(&witness.inner.new_transactions_validation_state)?.len() as f64);
CHUNK_STATE_WITNESS_SOURCE_RECEIPT_PROOFS_SIZE
.with_label_values(&[&shard_id.as_str()])
.observe(borsh::to_vec(&witness.source_receipt_proofs)?.len() as f64);
.observe(borsh::to_vec(&witness.inner.source_receipt_proofs)?.len() as f64);
Ok(())
}

Expand Down
12 changes: 6 additions & 6 deletions chain/chain/src/store/latest_witnesses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ impl ChainStore {
let _span = tracing::info_span!(
target: "client",
"save_latest_chunk_state_witness",
witness_height = witness.chunk_header.height_created(),
witness_shard = ?witness.chunk_header.shard_id(),
witness_height = witness.inner.chunk_header.height_created(),
witness_shard = ?witness.inner.chunk_header.shard_id(),
)
.entered();

Expand Down Expand Up @@ -172,9 +172,9 @@ impl ChainStore {
let mut random_uuid = [0u8; 16];
OsRng.fill_bytes(&mut random_uuid);
let key = LatestWitnessesKey {
height: witness.chunk_header.height_created(),
shard_id: witness.chunk_header.shard_id().into(),
epoch_id: witness.epoch_id,
height: witness.inner.chunk_header.height_created(),
shard_id: witness.inner.chunk_header.shard_id().into(),
epoch_id: witness.inner.epoch_id,
witness_size: serialized_witness_size,
random_uuid,
};
Expand All @@ -195,7 +195,7 @@ impl ChainStore {

let store_commit_time = start_time.elapsed().saturating_sub(store_update_time);

let shard_id_str = witness.chunk_header.shard_id().to_string();
let shard_id_str = witness.inner.chunk_header.shard_id().to_string();
stateless_validation::metrics::SAVE_LATEST_WITNESS_GENERATE_UPDATE_TIME
.with_label_values(&[shard_id_str.as_str()])
.observe(store_update_time.as_secs_f64());
Expand Down
10 changes: 10 additions & 0 deletions chain/client/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,16 @@ pub(crate) static PARTIAL_WITNESS_ENCODE_TIME: LazyLock<HistogramVec> = LazyLock
.unwrap()
});

pub(crate) static PARTIAL_WITNESS_DECODE_TIME: LazyLock<HistogramVec> = LazyLock::new(|| {
try_create_histogram_vec(
"near_partial_witness_decode_time",
"State witness decoding time from the partial state witness parts in seconds",
&["shard_id"],
Some(linear_buckets(0.0, 0.005, 20).unwrap()),
)
.unwrap()
});

pub(crate) static PARTIAL_WITNESS_TIME_TO_LAST_PART: LazyLock<HistogramVec> = LazyLock::new(|| {
try_create_histogram_vec(
"near_partial_witness_time_to_last_part",
Expand Down
24 changes: 12 additions & 12 deletions chain/client/src/stateless_validation/chunk_validator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,13 @@ impl ChunkValidator {
processing_done_tracker: Option<ProcessingDoneTracker>,
signer: &Arc<ValidatorSigner>,
) -> Result<(), Error> {
let prev_block_hash = state_witness.chunk_header.prev_block_hash();
let shard_id = state_witness.chunk_header.shard_id();
let prev_block_hash = state_witness.inner.chunk_header.prev_block_hash();
let shard_id = state_witness.inner.chunk_header.shard_id();
let epoch_id = self.epoch_manager.get_epoch_id_from_prev_block(prev_block_hash)?;
if epoch_id != state_witness.epoch_id {
if epoch_id != state_witness.inner.epoch_id {
return Err(Error::InvalidChunkStateWitness(format!(
"Invalid EpochId {:?} for previous block {}, expected {:?}",
state_witness.epoch_id, prev_block_hash, epoch_id
state_witness.inner.epoch_id, prev_block_hash, epoch_id
)));
}

Expand All @@ -92,7 +92,7 @@ impl ChunkValidator {
self.runtime_adapter.as_ref(),
)?;

let chunk_header = state_witness.chunk_header.clone();
let chunk_header = state_witness.inner.chunk_header.clone();
let network_sender = self.network_sender.clone();
let epoch_manager = self.epoch_manager.clone();

Expand Down Expand Up @@ -230,8 +230,8 @@ impl Client {
) -> Result<(), Error> {
tracing::debug!(
target: "client",
chunk_hash=?witness.chunk_header.chunk_hash(),
shard_id=?witness.chunk_header.shard_id(),
chunk_hash=?witness.inner.chunk_header.chunk_hash(),
shard_id=?witness.inner.chunk_header.shard_id(),
"process_chunk_state_witness",
);

Expand All @@ -252,7 +252,7 @@ impl Client {
self.chain.chain_store.save_latest_chunk_state_witness(&witness)?;
}

match self.chain.get_block(witness.chunk_header.prev_block_hash()) {
match self.chain.get_block(witness.inner.chunk_header.prev_block_hash()) {
Ok(block) => self.process_chunk_state_witness_with_prev_block(
witness,
&block,
Expand All @@ -273,7 +273,7 @@ impl Client {
// produced the witness. However some tests bypass PartialWitnessActor, thus when a chunk producer
// receives its own state witness, we log a warning instead of panicking.
// TODO: Make sure all tests run with "test_features" and panic for non-test builds.
if signer.validator_id() == &witness.chunk_producer {
if signer.validator_id() == &witness.inner.chunk_producer {
tracing::warn!(
"Validator {:?} received state witness from itself. Witness={:?}",
signer.validator_id(),
Expand All @@ -283,7 +283,7 @@ impl Client {
}
self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::ChunkStateWitnessAck(
witness.chunk_producer.clone(),
witness.inner.chunk_producer.clone(),
ChunkStateWitnessAck::new(witness),
),
));
Expand All @@ -296,10 +296,10 @@ impl Client {
processing_done_tracker: Option<ProcessingDoneTracker>,
signer: &Arc<ValidatorSigner>,
) -> Result<(), Error> {
if witness.chunk_header.prev_block_hash() != prev_block.hash() {
if witness.inner.chunk_header.prev_block_hash() != prev_block.hash() {
return Err(Error::Other(format!(
"process_chunk_state_witness_with_prev_block - prev_block doesn't match ({} != {})",
witness.chunk_header.prev_block_hash(),
witness.inner.chunk_header.prev_block_hash(),
prev_block.hash()
)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl Client {
witness: ChunkStateWitness,
witness_size: usize,
) -> Result<HandleOrphanWitnessOutcome, Error> {
let chunk_header = &witness.chunk_header;
let chunk_header = &witness.inner.chunk_header;
let witness_height = chunk_header.height_created();
let witness_shard = chunk_header.shard_id();

Expand Down Expand Up @@ -83,7 +83,7 @@ impl Client {
.orphan_witness_pool
.take_state_witnesses_waiting_for_block(new_block.hash());
for witness in ready_witnesses {
let header = &witness.chunk_header;
let header = &witness.inner.chunk_header;
tracing::debug!(
target: "client",
witness_height = header.height_created(),
Expand Down
Loading

0 comments on commit 35247ff

Please sign in to comment.