Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

part distribution and block processing #13146

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
6 changes: 6 additions & 0 deletions chain/chain-primitives/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ pub enum Error {
/// Chunks missing with header info.
#[error("Chunks Missing: {0:?}")]
ChunksMissing(Vec<ShardChunkHeader>),
/// Some chunks are missing but block can be partially processed.
/// Contains information about which chunks are missing.
#[error("Some Chunks Missing (partial processing allowed): {0:?}")]
SomeChunksMissing(Vec<ShardChunkHeader>),
/// Block time is before parent block time.
#[error("Invalid Block Time: block time {1} before previous {0}")]
InvalidBlockPastTime(Utc, Utc),
Expand Down Expand Up @@ -279,6 +283,7 @@ impl Error {
| Error::Orphan
| Error::ChunkMissing(_)
| Error::ChunksMissing(_)
| Error::SomeChunksMissing(_)
| Error::InvalidChunkHeight
| Error::IOErr(_)
| Error::Other(_)
Expand Down Expand Up @@ -363,6 +368,7 @@ impl Error {
Error::Orphan => "orphan",
Error::ChunkMissing(_) => "chunk_missing",
Error::ChunksMissing(_) => "chunks_missing",
Error::SomeChunksMissing(_) => "some_chunks_missing",
Error::InvalidChunkHeight => "invalid_chunk_height",
Error::IOErr(_) => "io_err",
Error::Other(_) => "other",
Expand Down
125 changes: 121 additions & 4 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1364,6 +1364,7 @@ impl Chain {
return Ok(());
}
let mut missing = vec![];
let mut critical_missing = vec![];
let block_height = block.header().height();

let epoch_id = block.header().epoch_id();
Expand Down Expand Up @@ -1392,7 +1393,24 @@ impl Chain {
let chunk_hash = chunk_header.chunk_hash();

if let Err(_) = self.chain_store.get_partial_chunk(&chunk_header.chunk_hash()) {
missing.push(chunk_header.clone());
// If this is a shard that this validator is responsible for tracking,
// consider it a critical missing chunk
if self.shard_tracker.cares_about_shard(
me.as_ref(),
&parent_hash,
shard_id,
true,
) || self.shard_tracker.will_care_about_shard(
me.as_ref(),
&parent_hash,
shard_id,
true,
) {
critical_missing.push(chunk_header.clone());
} else {
// For shards we don't track, we can process the block partially
missing.push(chunk_header.clone());
}
} else if self.shard_tracker.cares_about_shard(
me.as_ref(),
&parent_hash,
Expand All @@ -1405,14 +1423,22 @@ impl Chain {
true,
) {
if let Err(_) = self.chain_store.get_chunk(&chunk_hash) {
missing.push(chunk_header.clone());
critical_missing.push(chunk_header.clone());
}
}
}
}

// If there are critical missing chunks, we can't process the block at all
if !critical_missing.is_empty() {
return Err(Error::ChunksMissing(critical_missing));
}

// If there are only non-critical missing chunks, we can process the block partially
if !missing.is_empty() {
return Err(Error::ChunksMissing(missing));
return Err(Error::SomeChunksMissing(missing));
}

Ok(())
}

Expand Down Expand Up @@ -2027,9 +2053,100 @@ impl Chain {
target: "chain",
?block_hash,
chunk_hashes=missing_chunk_hashes.iter().map(|h| format!("{:?}", h)).join(","),
"Process block: missing chunks"
"Process block: critical chunks missing, cannot process"
);
}
Error::SomeChunksMissing(missing_chunks) => {
// This is a case where some chunks are missing but they're not critical
// for this validator, so we can process the block partially
let block_hash = *block.hash();
let missing_chunk_hashes: Vec<_> =
missing_chunks.iter().map(|header| header.chunk_hash()).collect();

// Still track the missing chunks for debugging and metrics
block_processing_artifact.blocks_missing_chunks.push(BlockMissingChunks {
prev_hash: *block.header().prev_hash(),
missing_chunks: missing_chunks.clone(),
});

// Mark that this block has some missing chunks, but we'll process it anyway
self.blocks_delay_tracker.mark_block_has_missing_chunks(block.hash());

info!(
target: "chain",
?block_hash,
chunk_hashes=missing_chunk_hashes.iter().map(|h| format!("{:?}", h)).join(","),
"Process block: non-critical chunks missing, proceeding with partial processing"
);

// Continue with block processing despite missing chunks
// Skip the missing chunks during preprocessing
let state_patch = self.pending_state_patch.take();
let missing_chunk_hashes: std::collections::HashSet<_> =
missing_chunks.iter().map(|header| header.chunk_hash()).collect();

// Use the standard preprocess_block function but filter out missing chunks in apply_chunks_preprocessing
match self.preprocess_block(
me,
&block,
&provenance,
&mut block_processing_artifact.challenges,
&mut block_processing_artifact.invalid_chunks,
block_received_time,
state_patch,
) {
Ok(preprocess_res) => {
let (
apply_chunk_work,
block_preprocess_info,
apply_chunks_still_applying,
) = preprocess_res;

if self
.epoch_manager
.is_next_block_epoch_start(block.header().prev_hash())?
{
// This is the end of the epoch. Next epoch we will generate new state parts. We can drop the old ones.
self.clear_all_downloaded_parts()?;
}

let block_hash = *block.hash();
let block_height = block.header().height();
self.blocks_in_processing.add(block, block_preprocess_info)?;

// Filter out jobs for missing chunks
let filtered_work: Vec<_> = apply_chunk_work
.into_iter()
.filter(|job| {
// Skip chunks that are in the missing_chunks list
!missing_chunk_hashes
.contains(&job.next_chunk_header.chunk_hash())
})
.collect();

// Schedule apply chunks, which will be executed in the rayon thread pool
self.schedule_apply_chunks(
BlockToApply::Normal(block_hash),
block_height,
filtered_work,
apply_chunks_still_applying,
apply_chunks_done_sender,
);

return Ok(());
}
Err(e) => {
// If partial processing fails, we still need to handle the error
warn!(
target: "chain",
?block_hash,
error=?e,
"Failed to partially process block with missing chunks"
);
return Err(e);
}
}
}
Error::EpochOutOfBounds(epoch_id) => {
// Possibly block arrived before we finished processing all of the blocks for epoch before last.
// Or someone is attacking with invalid chain.
Expand Down
28 changes: 27 additions & 1 deletion chain/epoch-manager/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,33 @@ pub trait EpochManagerAdapter: Send + Sync {
fn get_part_owner(&self, epoch_id: &EpochId, part_id: u64) -> Result<AccountId, EpochError> {
    let epoch_info = self.get_epoch_info(epoch_id)?;
    let settlement = epoch_info.block_producers_settlement();

    // Sum up stakes so parts can be assigned proportionally to stake.
    let mut total_stake: u128 = 0;
    let mut validator_stakes = Vec::with_capacity(settlement.len());
    for &validator_id in settlement.iter() {
        let stake = epoch_info.get_validator(validator_id).stake();
        total_stake += stake;
        validator_stakes.push((validator_id, stake));
    }

    // Sort validators by stake in descending order so high-stake validators
    // receive proportionally more parts, preventing targeted attacks.
    // `sort_by` is stable, so validators with equal stake keep their
    // settlement order, which keeps the assignment deterministic.
    validator_stakes.sort_by(|a, b| b.1.cmp(&a.1));

    // Pick the validator whose cumulative-stake bucket contains the target.
    // For any valid part_id < num_total_parts, target_stake < total_stake,
    // so the loop below always returns unless total_stake == 0.
    let mut cumulative_stake: u128 = 0;
    let target_stake = (part_id as u128 * total_stake) / (self.num_total_parts() as u128);

    // Iterate by reference: the vector is needed again for the fallback
    // below. (Iterating by value moves `validator_stakes` into the loop and
    // makes the later `.first()` call a use-after-move, rustc error E0382.)
    for &(validator_id, stake) in &validator_stakes {
        cumulative_stake += stake;
        if cumulative_stake > target_stake {
            return Ok(epoch_info.get_validator(validator_id).account_id().clone());
        }
    }

    // Fallback (only reachable when total_stake == 0): use the highest-stake
    // validator if any, otherwise the first validator in the settlement.
    let validator_id =
        validator_stakes.first().map(|&(id, _)| id).unwrap_or(settlement[0]);
    Ok(epoch_info.get_validator(validator_id).account_id().clone())
}

Expand Down
1 change: 1 addition & 0 deletions test-loop-tests/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@ mod reject_outdated_blocks;
mod resharding_v3;
mod state_sync;
mod syncing;
mod unavailable_chunk_tests;
mod view_requests_to_archival_node;
76 changes: 76 additions & 0 deletions test-loop-tests/src/tests/unavailable_chunk_tests/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#![cfg(feature = "test_features")] // required for adversarial behaviors

use crate::setup::builder::TestLoopBuilder;
use crate::setup::env::TestLoopEnv;
use crate::utils::ONE_NEAR;
use crate::utils::client_queries::ClientQueries;
use near_async::messaging::CanSend as _;
use near_async::time::Duration;
use near_chain_configs::test_genesis::{TestEpochConfigBuilder, ValidatorsSpec};
use near_client::client_actor::{AdvProduceChunksMode, NetworkAdversarialMessage};
use near_primitives::shard_layout::ShardLayout;
use near_primitives::types::AccountId;

/// Test that verifies the chain continues processing when a malicious chunk producer
/// withholds chunk parts strategically to make the chunk unavailable.
///
/// This test simulates a scenario where a malicious chunk producer sends only a subset
/// of chunk parts, making the chunk unavailable for full reconstruction, but the chain
/// should continue processing blocks for shards that are not affected.
#[test]
fn test_producer_withholds_chunk_parts() {
    // Four accounts: account0 is the (soon-to-be-malicious) chunk producer,
    // the remaining three are block/chunk validators.
    let accounts =
        (0..4).map(|i| format!("account{}", i).parse().unwrap()).collect::<Vec<AccountId>>();
    let chunk_producer = accounts[0].as_str();
    let validators: Vec<_> = accounts[1..].iter().map(|a| a.as_str()).collect();
    let validators_spec = ValidatorsSpec::desired_roles(&[chunk_producer], &validators);
    let genesis = TestLoopBuilder::new_genesis_builder()
        .epoch_length(10)
        .shard_layout(ShardLayout::v1_test())
        .validators_spec(validators_spec)
        .add_user_accounts_simple(&accounts, 1_000_000 * ONE_NEAR)
        .genesis_height(10000)
        .build();
    let epoch_config_store = TestEpochConfigBuilder::build_store_from_genesis(&genesis);
    let mut test_loop_env = TestLoopBuilder::new()
        .genesis(genesis)
        .epoch_config_store(epoch_config_store)
        .clients(accounts.clone())
        .build()
        .warmup();
    let TestLoopEnv { test_loop, node_datas, .. } = &mut test_loop_env;

    // Turn node 0 (the chunk producer) malicious: it will withhold chunks.
    // The node data is cloned so the closure can own its handle.
    let data_clone = node_datas.clone();
    test_loop.send_adhoc_event("set malicious chunk production".into(), move |_| {
        data_clone[0].client_sender.send(NetworkAdversarialMessage::AdvProduceChunks(
            AdvProduceChunksMode::WithholdChunks,
        ));
    });

    // Run until every honest validator (nodes 1..) advances past height
    // 10025 (25 blocks past genesis), i.e. the chain keeps making progress
    // despite the withheld chunks, or until the 30s budget runs out.
    test_loop.run_until(
        |test_loop_data| {
            for node in &node_datas[1..] {
                let client = &test_loop_data.get(&node.client_sender.actor_handle()).client;
                if client.chain.head().unwrap().height <= 10025 {
                    return false;
                }
            }
            true
        },
        Duration::seconds(30),
    );

    // Re-assert the final heads so a timeout above produces a clear failure
    // message rather than a silent fall-through.
    for node in &node_datas[1..] {
        let client = &test_loop.data.get(&node.client_sender.actor_handle()).client;
        let head = client.chain.head().unwrap();
        assert!(head.height > 10025, "Chain should make progress despite missing chunks");
    }

    test_loop_env.shutdown_and_drain_remaining_events(Duration::seconds(20));
}
Loading