[stateless_validation] Include chunk_endorsement signatures in block body (#10469)

## Chunk endorsement signatures in block

This PR adds the logic to include chunk endorsement signatures in the block. We tap into
the logic in chunk_inclusion_tracker to filter out chunks that don't have enough
endorsement signatures.

In the case of a missing chunk, just as we borrow the chunk header from the previous
block, we also borrow the chunk endorsement signatures corresponding to that chunk from
the previous block.
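
A minimal sketch of that borrowing step (the `collect_chunks_for_block` helper and its argument layout are hypothetical, not code from this PR; only the per-shard pairing of header and endorsements mirrors the behaviour described above):

```rust
use near_primitives::block_body::ChunkEndorsementSignatures;
use near_primitives::sharding::ShardChunkHeader;

// Hypothetical helper: build the chunk header and endorsement vectors for the new
// block, falling back to the previous block's entries for shards with no new chunk.
fn collect_chunks_for_block(
    new_chunks: Vec<Option<(ShardChunkHeader, ChunkEndorsementSignatures)>>,
    prev_chunk_headers: &[ShardChunkHeader],
    prev_chunk_endorsements: &[ChunkEndorsementSignatures],
) -> (Vec<ShardChunkHeader>, Vec<ChunkEndorsementSignatures>) {
    let mut headers = Vec::new();
    let mut endorsements = Vec::new();
    for (shard_index, maybe_chunk) in new_chunks.into_iter().enumerate() {
        match maybe_chunk {
            // A new chunk is ready for this shard: include it with its own endorsements.
            Some((header, signatures)) => {
                headers.push(header);
                endorsements.push(signatures);
            }
            // Missing chunk: borrow the header *and* the endorsement signatures
            // from the previous block, keeping the two vectors aligned by shard.
            None => {
                headers.push(prev_chunk_headers[shard_index].clone());
                endorsements.push(prev_chunk_endorsements[shard_index].clone());
            }
        }
    }
    (headers, endorsements)
}
```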

## Changes to ChunkInclusionTracker

Updated the ChunkInclusionTracker module to have an LRU cache
`prev_block_to_chunk_hash_ready` (previous block hash → chunk hashes ready for inclusion)
plus a HashMap `chunk_hash_to_chunk_info` from each chunk hash to the info associated
with that chunk (header, received time, producer, endorsement signatures).

This is a two-step indirection, and we clean up `chunk_hash_to_chunk_info` when entries
are evicted from the LRU cache.
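
A stripped-down sketch of that indirection, with placeholder key and value types (`u64`/`String`) in place of `CryptoHash`, `ChunkHash`, and `ChunkInfo`; the real implementation is in the chunk_inclusion_tracker.rs diff below:

```rust
use std::collections::HashMap;

use lru::LruCache;

// Sketch only: the LRU cache owns eviction, and the side map is cleaned up whenever
// `push` reports an evicted entry, so the two structures cannot drift apart.
struct TwoLevelTracker {
    // prev_block_hash -> chunk hashes ready for inclusion (bounded by the LRU cache)
    by_prev_block: LruCache<u64, Vec<u64>>,
    // chunk hash -> chunk info (kept in sync with the LRU cache manually)
    by_chunk_hash: HashMap<u64, String>,
}

impl TwoLevelTracker {
    fn new(cache_size: usize) -> Self {
        // Same constructor style as the diff below (lru version taking a usize capacity).
        Self { by_prev_block: LruCache::new(cache_size), by_chunk_hash: HashMap::new() }
    }

    fn insert(&mut self, prev_block: u64, chunk_hash: u64, info: String) {
        if let Some(entry) = self.by_prev_block.get_mut(&prev_block) {
            entry.push(chunk_hash);
        } else if let Some((_, evicted)) = self.by_prev_block.push(prev_block, vec![chunk_hash]) {
            // The push evicted an older prev_block entry: drop its chunk infos too,
            // so by_chunk_hash never outgrows the LRU cache.
            for evicted_hash in evicted {
                self.by_chunk_hash.remove(&evicted_hash);
            }
        }
        self.by_chunk_hash.insert(chunk_hash, info);
    }
}
```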

Additionally, `get_chunk_headers_ready_for_inclusion` now returns just the chunk hash for
each shard instead of all the details for the chunk; we make subsequent calls to
chunk_inclusion_tracker to get the other chunk-related info such as the header,
signatures, and received time (see the caller sketch below).
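
A hypothetical caller illustrating the new two-call flow (`chunks_for_next_block` is not part of this PR, but the tracker methods it uses are):

```rust
use near_chain_primitives::Error;
use near_primitives::block_body::ChunkEndorsementSignatures;
use near_primitives::hash::CryptoHash;
use near_primitives::sharding::ShardChunkHeader;
use near_primitives::types::{EpochId, ShardId};

use crate::chunk_inclusion_tracker::ChunkInclusionTracker;

// Hypothetical caller: first fetch the chunk hashes that are ready for inclusion,
// then resolve each hash to the header and endorsement signatures needed for the block.
fn chunks_for_next_block(
    tracker: &ChunkInclusionTracker,
    epoch_id: &EpochId,
    prev_block_hash: &CryptoHash,
) -> Result<Vec<(ShardId, ShardChunkHeader, ChunkEndorsementSignatures)>, Error> {
    let mut result = Vec::new();
    for (shard_id, chunk_hash) in
        tracker.get_chunk_headers_ready_for_inclusion(epoch_id, prev_block_hash)
    {
        // Second step of the indirection: chunk hash -> header + endorsement signatures.
        let (header, signatures) = tracker.get_chunk_header_and_endorsements(&chunk_hash)?;
        result.push((shard_id, header, signatures));
    }
    Ok(result)
}
```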

Added a filter
`chunk_validator.get_chunk_endorsement_signatures(&chunk_info.chunk_header)`
to reject chunks that don't have enough chunk endorsements. Note that
this check is behind a feature flag.
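
The resulting inclusion check reduces to something like the sketch below, where the protocol feature gating is collapsed into a boolean parameter for illustration:

```rust
use near_primitives::block_body::ChunkEndorsementSignatures;

// Sketch of the inclusion predicate. In the actual code the gating depends on the
// protocol version; here it is reduced to a boolean parameter.
fn chunk_passes_endorsement_filter(
    signatures: &Option<ChunkEndorsementSignatures>,
    endorsements_required: bool,
) -> bool {
    // When the feature is off, chunks are included regardless of endorsements;
    // otherwise the chunk must carry a signature set from its chunk validators.
    !endorsements_required || signatures.is_some()
}
```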

## Getting chunk endorsement signatures

Added a `get_chunk_endorsement_signatures` function to the chunk validator
module. It uses the chunk endorsements cached by earlier calls to
`process_chunk_endorsement` to assemble the set of signatures. If not enough
stake has endorsed the chunk we return None; otherwise we return the signature
array.
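
A hedged sketch of that flow; the 2/3 stake threshold, both map shapes, and the helper name are assumptions for illustration, not the actual chunk validator code:

```rust
use std::collections::HashMap;

use near_crypto::Signature;
use near_primitives::types::{AccountId, Balance};

// Hypothetical helper mirroring the described flow: sum the stake behind the cached
// endorsements and only emit the signature set once enough stake has endorsed.
fn collect_endorsement_signatures(
    // Endorsements cached by earlier `process_chunk_endorsement` calls.
    endorsements: &HashMap<AccountId, Signature>,
    // Stake of every chunk validator assigned to this chunk.
    validator_stakes: &HashMap<AccountId, Balance>,
) -> Option<Vec<Signature>> {
    let total_stake: Balance = validator_stakes.values().sum();
    let endorsed_stake: Balance = endorsements
        .keys()
        .filter_map(|account_id| validator_stakes.get(account_id))
        .sum();
    // Assumed threshold: at least 2/3 of the assigned stake must have endorsed.
    if endorsed_stake * 3 < total_stake * 2 {
        return None;
    }
    Some(endorsements.values().cloned().collect())
}
```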

TODO: Update/fix tests.
Shreyan Gupta authored Jan 24, 2024
1 parent 1367881 commit 3b3b54b
Showing 9 changed files with 274 additions and 84 deletions.
52 changes: 28 additions & 24 deletions chain/chain/src/chain.rs
@@ -47,6 +47,7 @@ use near_epoch_manager::types::BlockHeaderInfo;
use near_epoch_manager::EpochManagerAdapter;
use near_o11y::log_assert;
use near_primitives::block::{genesis_chunks, Block, BlockValidityError, Tip};
use near_primitives::block_body::ChunkEndorsementSignatures;
use near_primitives::block_header::BlockHeader;
use near_primitives::challenge::{
BlockDoubleSign, Challenge, ChallengeBody, ChallengesResult, ChunkProofs, ChunkState,
@@ -1029,15 +1030,6 @@ impl Chain {
}
}

// Check that block has chunk endorsements.
if checked_feature!("stable", ChunkValidation, epoch_protocol_version) {
// TODO(shreyan): Enable this in next PR once we start adding chunk endorsements.
// if block.chunk_endorsements().is_empty() {
// tracing::warn!("Block has no chunk endorsements: {:?}", block.hash());
// return Ok(VerifyBlockHashAndSignatureResult::Incorrect);
// }
}

// Verify the signature. Since the signature is signed on the hash of block header, this check
// makes sure the block header content is not tampered
if !self.epoch_manager.verify_header_signature(block.header())? {
@@ -1101,8 +1093,10 @@

/// Do basic validation of the information that we can get from the chunk headers in `block`
fn validate_chunk_headers(&self, block: &Block, prev_block: &Block) -> Result<(), Error> {
let prev_chunk_headers =
Chain::get_prev_chunk_headers(self.epoch_manager.as_ref(), prev_block)?;
let (prev_chunk_headers, _) = Chain::get_prev_chunk_headers_and_chunk_endorsements(
self.epoch_manager.as_ref(),
prev_block,
)?;
for (chunk_header, prev_chunk_header) in
block.chunks().iter().zip(prev_chunk_headers.iter())
{
@@ -1887,7 +1881,7 @@ impl Chain {
/// Preprocess a block before applying chunks, verify that we have the necessary information
/// to process the block and that the block is valid.
/// Note that this function does NOT introduce any changes to chain state.
pub(crate) fn preprocess_block(
fn preprocess_block(
&self,
me: &Option<AccountId>,
block: &MaybeValidated<Block>,
@@ -3264,8 +3258,10 @@ impl Chain {
invalid_chunks: &mut Vec<ShardChunkHeader>,
) -> Result<Vec<UpdateShardJob>, Error> {
let _span = tracing::debug_span!(target: "chain", "apply_chunks_preprocessing").entered();
let prev_chunk_headers =
Chain::get_prev_chunk_headers(self.epoch_manager.as_ref(), prev_block)?;
let (prev_chunk_headers, _) = Chain::get_prev_chunk_headers_and_chunk_endorsements(
self.epoch_manager.as_ref(),
prev_block,
)?;

let mut maybe_jobs = vec![];
for (shard_id, (chunk_header, prev_chunk_header)) in
@@ -4140,29 +4136,37 @@
Ok(headers)
}

/// Returns a vector of chunk headers, each of which corresponds to the previous chunk of
/// a chunk in the block after `prev_block`
/// Returns a vector of chunk headers, and vector of chunk endorsement signatures each of which corresponds
/// to the chunk in the `prev_block`
/// This function is important when the block after `prev_block` has different number of chunks
/// from `prev_block`.
/// from `prev_block` in cases of resharding.
/// In block production and processing, often we need to get the previous chunks of chunks
/// in the current block, this function provides a way to do so while handling sharding changes
/// correctly.
/// For example, if `prev_block` has two shards 0, 1 and the block after `prev_block` will have
/// 4 shards 0, 1, 2, 3, 0 and 1 split from shard 0 and 2 and 3 split from shard 1.
/// `get_prev_chunks(runtime_adapter, prev_block)` will return
/// `get_prev_chunk_headers_and_chunk_endorsements(epoch_manager, prev_block)` will return
/// `[prev_block.chunks()[0], prev_block.chunks()[0], prev_block.chunks()[1], prev_block.chunks()[1]]`
pub fn get_prev_chunk_headers(
/// and the corresponding chunk endorsements.
pub fn get_prev_chunk_headers_and_chunk_endorsements(
epoch_manager: &dyn EpochManagerAdapter,
prev_block: &Block,
) -> Result<Vec<ShardChunkHeader>, Error> {
) -> Result<(Vec<ShardChunkHeader>, Vec<ChunkEndorsementSignatures>), Error> {
let epoch_id = epoch_manager.get_epoch_id_from_prev_block(prev_block.hash())?;
let shard_ids = epoch_manager.shard_ids(&epoch_id)?;
let prev_shard_ids = epoch_manager.get_prev_shard_ids(prev_block.hash(), shard_ids)?;
let chunks = prev_block.chunks();
Ok(prev_shard_ids
.into_iter()
.map(|shard_id| chunks.get(shard_id as usize).unwrap().clone())
.collect())
let chunk_endorsements = prev_block.chunk_endorsements();

let chunk_headers = prev_shard_ids
.iter()
.map(|shard_id| chunks.get(*shard_id as usize).cloned().unwrap())
.collect();
let chunk_endorsement = prev_shard_ids
.iter()
.map(|shard_id| chunk_endorsements.get(*shard_id as usize).cloned().unwrap_or_default())
.collect();
Ok((chunk_headers, chunk_endorsement))
}

pub fn get_prev_chunk_header(
179 changes: 140 additions & 39 deletions chain/client/src/chunk_inclusion_tracker.rs
@@ -1,25 +1,40 @@
use chrono::{DateTime, Utc};
use itertools::Itertools;
use lru::LruCache;
use std::collections::HashMap;

use near_chain_primitives::Error;
use near_primitives::block_body::ChunkEndorsementSignatures;
use near_primitives::hash::CryptoHash;
use near_primitives::sharding::ShardChunkHeader;
use near_primitives::sharding::{ChunkHash, ShardChunkHeader};
use near_primitives::types::{AccountId, EpochId, ShardId};

use crate::chunk_validation::ChunkValidator;
use crate::metrics;

const CHUNK_HEADERS_FOR_INCLUSION_CACHE_SIZE: usize = 2048;
const NUM_EPOCH_CHUNK_PRODUCERS_TO_KEEP_IN_BLOCKLIST: usize = 1000;

// chunk_header, received_time and chunk_producer are populated when we call mark_chunk_header_ready_for_inclusion
// signatures are populated later during the call to prepare_chunk_headers_ready_for_inclusion
struct ChunkInfo {
pub chunk_header: ShardChunkHeader,
pub received_time: DateTime<Utc>,
pub chunk_producer: AccountId,
pub signatures: Option<ChunkEndorsementSignatures>,
}

pub struct ChunkInclusionTracker {
// Track chunks that are ready to be included in a block.
// Key is the previous_block_hash as the chunk is created based on this block. It's possible that
// the block included isn't of height previous_block_height + 1 in cases of skipped blocks etc.
// We store the map of chunks from [shard_id] to (chunk_header, chunk received_time, chunk_producer account_id)
prev_block_to_chunk_headers_ready_for_inclusion: LruCache<
CryptoHash,
HashMap<ShardId, (ShardChunkHeader, chrono::DateTime<chrono::Utc>, AccountId)>,
>,
// We store the map of chunks from [shard_id] to chunk_hash
prev_block_to_chunk_hash_ready: LruCache<CryptoHash, HashMap<ShardId, ChunkHash>>,

// Map from chunk_hash to chunk_info.
// ChunkInfo stores the chunk_header, received_time, chunk_producer and chunk endorsements.
// Cleaning up of chunk_hash_to_chunk_info is handled during cache eviction from prev_block_to_chunk_hash_ready.
chunk_hash_to_chunk_info: HashMap<ChunkHash, ChunkInfo>,

// Track banned chunk producers for a given epoch. We filter out chunks produced by them.
banned_chunk_producers: LruCache<(EpochId, AccountId), ()>,
@@ -28,9 +43,8 @@ pub struct ChunkInclusionTracker {
impl ChunkInclusionTracker {
pub fn new() -> Self {
Self {
prev_block_to_chunk_headers_ready_for_inclusion: LruCache::new(
CHUNK_HEADERS_FOR_INCLUSION_CACHE_SIZE,
),
prev_block_to_chunk_hash_ready: LruCache::new(CHUNK_HEADERS_FOR_INCLUSION_CACHE_SIZE),
chunk_hash_to_chunk_info: HashMap::new(),
banned_chunk_producers: LruCache::new(NUM_EPOCH_CHUNK_PRODUCERS_TO_KEEP_IN_BLOCKLIST),
}
}
@@ -42,12 +56,31 @@ impl ChunkInclusionTracker {
chunk_producer: AccountId,
) {
let prev_block_hash = chunk_header.prev_block_hash();
self.prev_block_to_chunk_headers_ready_for_inclusion
.get_or_insert(*prev_block_hash, || HashMap::new());
self.prev_block_to_chunk_headers_ready_for_inclusion
.get_mut(prev_block_hash)
.unwrap()
.insert(chunk_header.shard_id(), (chunk_header, chrono::Utc::now(), chunk_producer));
if let Some(entry) = self.prev_block_to_chunk_hash_ready.get_mut(prev_block_hash) {
// If prev_block_hash entry exists, add the new chunk to the entry.
entry.insert(chunk_header.shard_id(), chunk_header.chunk_hash());
} else {
let new_entry = HashMap::from([(chunk_header.shard_id(), chunk_header.chunk_hash())]);
// Call to prev_block_to_chunk_hash_ready.push might evict an entry from LRU cache.
// In case of an eviction, cleanup entries in chunk_hash_to_chunk_info
let maybe_evicted_entry =
self.prev_block_to_chunk_hash_ready.push(*prev_block_hash, new_entry);
if let Some((_, evicted_entry)) = maybe_evicted_entry {
self.process_evicted_entry(evicted_entry);
}
}
// Insert chunk info in chunk_hash_to_chunk_info. This would be cleaned up later during eviction
let chunk_hash = chunk_header.chunk_hash();
let chunk_info =
ChunkInfo { chunk_header, received_time: Utc::now(), chunk_producer, signatures: None };
self.chunk_hash_to_chunk_info.insert(chunk_hash, chunk_info);
}

// once a set of ChunkHash is evicted from prev_block_to_chunk_hash_ready, cleanup chunk_hash_to_chunk_info
fn process_evicted_entry(&mut self, evicted_entry: HashMap<ShardId, ChunkHash>) {
for (_, chunk_hash) in evicted_entry.into_iter() {
self.chunk_hash_to_chunk_info.remove(&chunk_hash);
}
}

/// Add account_id to the list of banned chunk producers for the given epoch.
@@ -56,35 +89,78 @@ impl ChunkInclusionTracker {
self.banned_chunk_producers.put((epoch_id, account_id), ());
}

/// Update signatures in chunk_info
pub fn prepare_chunk_headers_ready_for_inclusion(
&mut self,
prev_block_hash: &CryptoHash,
chunk_validator: &mut ChunkValidator,
) -> Result<(), Error> {
let Some(entry) = self.prev_block_to_chunk_hash_ready.get(prev_block_hash) else {
return Ok(());
};

for chunk_hash in entry.values() {
let chunk_info = self.chunk_hash_to_chunk_info.get_mut(chunk_hash).unwrap();
chunk_info.signatures =
chunk_validator.get_chunk_endorsement_signatures(&chunk_info.chunk_header)?;
}
Ok(())
}

fn is_banned(&self, epoch_id: &EpochId, chunk_info: &ChunkInfo) -> bool {
let banned = self
.banned_chunk_producers
.contains(&(epoch_id.clone(), chunk_info.chunk_producer.clone()));
if banned {
tracing::warn!(
target: "client",
chunk_hash = ?chunk_info.chunk_header.chunk_hash(),
chunk_producer = ?chunk_info.chunk_producer,
"Not including chunk from a banned validator");
metrics::CHUNK_DROPPED_BECAUSE_OF_BANNED_CHUNK_PRODUCER.inc();
}
banned
}

fn has_chunk_endorsements(&self, chunk_info: &ChunkInfo) -> bool {
let has_chunk_endorsements = chunk_info.signatures.is_some();
if !has_chunk_endorsements {
tracing::warn!(
target: "client",
chunk_hash = ?chunk_info.chunk_header.chunk_hash(),
chunk_producer = ?chunk_info.chunk_producer,
"Not including chunk because of insufficient chunk endorsements");
}
// TODO(shreyan): return has_chunk_endorsements here after fixing testing infra
true
}

/// Function to return the chunks that are ready to be included in a block.
/// We filter out the chunks that are produced by banned chunk producers.
/// Return type contains some extra information needed for debug logs and metrics.
/// HashMap from [shard_id] to (chunk_header, chunk received_time, chunk_producer account_id)
/// We filter out the chunks that are produced by banned chunk producers or have insufficient
/// chunk validator endorsements.
/// Return HashMap from [shard_id] -> chunk_hash
pub fn get_chunk_headers_ready_for_inclusion(
&self,
epoch_id: &EpochId,
prev_block_hash: &CryptoHash,
) -> HashMap<ShardId, (ShardChunkHeader, chrono::DateTime<chrono::Utc>, AccountId)> {
self.prev_block_to_chunk_headers_ready_for_inclusion
.peek(prev_block_hash)
.cloned()
.unwrap_or_default()
.into_iter()
.filter(|(_, (chunk_header, _, chunk_producer))| {
let banned = self
.banned_chunk_producers
.contains(&(epoch_id.clone(), chunk_producer.clone()));
if banned {
tracing::warn!(
target: "client",
chunk_hash = ?chunk_header.chunk_hash(),
?chunk_producer,
"Not including chunk from a banned validator");
metrics::CHUNK_DROPPED_BECAUSE_OF_BANNED_CHUNK_PRODUCER.inc();
}
!banned
})
.collect()
) -> HashMap<ShardId, ChunkHash> {
let Some(entry) = self.prev_block_to_chunk_hash_ready.peek(prev_block_hash) else {
return HashMap::new();
};

let mut chunk_headers_ready_for_inclusion = HashMap::new();
for (shard_id, chunk_hash) in entry {
let chunk_info = self.chunk_hash_to_chunk_info.get(chunk_hash).unwrap();
let banned = self.is_banned(epoch_id, &chunk_info);
let has_chunk_endorsements = self.has_chunk_endorsements(&chunk_info);
if !banned && has_chunk_endorsements {
// only add to chunk_headers_ready_for_inclusion if chunk is not from a banned chunk producer
// and chunk has sufficient chunk endorsements.
// Chunk endorsements are obtained as part of the call to prepare_chunk_headers_ready_for_inclusion
chunk_headers_ready_for_inclusion.insert(*shard_id, chunk_hash.clone());
}
}
chunk_headers_ready_for_inclusion
}

pub fn num_chunk_headers_ready_for_inclusion(
Expand All @@ -102,4 +178,29 @@ impl ChunkInclusionTracker {
}
banned_chunk_producers.into_iter().collect_vec()
}

fn get_chunk_info(&self, chunk_hash: &ChunkHash) -> Result<&ChunkInfo, Error> {
// It should never happen that we are missing the key in chunk_hash_to_chunk_info
self.chunk_hash_to_chunk_info
.get(chunk_hash)
.ok_or(Error::Other(format!("missing key {:?} in ChunkInclusionTracker", chunk_hash)))
}

pub fn get_chunk_header_and_endorsements(
&self,
chunk_hash: &ChunkHash,
) -> Result<(ShardChunkHeader, ChunkEndorsementSignatures), Error> {
let chunk_info = self.get_chunk_info(chunk_hash)?;
let chunk_header = chunk_info.chunk_header.clone();
let signatures = chunk_info.signatures.clone().unwrap_or_default();
Ok((chunk_header, signatures))
}

pub fn get_chunk_producer_and_received_time(
&self,
chunk_hash: &ChunkHash,
) -> Result<(AccountId, DateTime<Utc>), Error> {
let chunk_info = self.get_chunk_info(chunk_hash)?;
Ok((chunk_info.chunk_producer.clone(), chunk_info.received_time))
}
}