Skip to content

Commit

Permalink
Batch queries by chain when synchronizing certificates from validator. (
Browse files Browse the repository at this point in the history
linera-io#2666)

* Fetch batch of certificates in synchronize_received_certificates_from_validator

* Process through block batch by looking at local node first.

* Change MultichainTracker to track highest seen per chain.

* @afck review

* Comments
  • Loading branch information
deuszx authored Oct 22, 2024
1 parent 5489d9c commit fb2f26e
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 55 deletions.
213 changes: 158 additions & 55 deletions linera-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ use linera_base::{
};
use linera_chain::{
data_types::{
Block, BlockProposal, Certificate, CertificateValue, ExecutedBlock, HashedCertificateValue,
IncomingBundle, LiteCertificate, LiteVote, MessageAction, PostedMessage,
Block, BlockProposal, Certificate, CertificateValue, ChainAndHeight, ExecutedBlock,
HashedCertificateValue, IncomingBundle, LiteCertificate, LiteVote, MessageAction,
PostedMessage,
},
manager::ChainManagerInfo,
ChainError, ChainExecutionContext, ChainStateView,
Expand Down Expand Up @@ -425,6 +426,54 @@ where
}
}

/// Tracks which `(ChainId, BlockHeight)` pairs have been successfully downloaded.
struct MultichainTracker {
// Starting tracker index.
tracker: u64,
// Tracks the highest block height processed for each chain.
highest_seen: BTreeMap<ChainId, BlockHeight>,
}

impl MultichainTracker {
fn new(init: u64) -> Self {
Self {
tracker: init,
highest_seen: BTreeMap::new(),
}
}

/// Pushes a new `(ChainId, BlockHeight)` pair to the tracker.
/// Replaces previous entry if the new height is higher.
fn push(&mut self, chain_id: ChainId, height: BlockHeight) {
self.highest_seen
.entry(chain_id)
.and_modify(|h| *h = std::cmp::max(*h, height))
.or_insert(height);
}

/// Returns an index of the last processed certificate in the original remote_node_received_log.
///
/// Goes through the original `remote_node_received_log` and checks which of the certificates have been received.
/// Upon finding the first entry that has not been processed, it stops and returns the index of the last processed certificate.
fn finalize<'a>(
&mut self,
remote_node_received_log: impl IntoIterator<Item = &'a ChainAndHeight>,
) -> u64 {
for ChainAndHeight { chain_id, height } in remote_node_received_log {
if let Some(highest_processed) = self.highest_seen.get(chain_id) {
if height <= highest_processed {
self.tracker += 1;
} else {
break;
}
} else {
break;
}
}
self.tracker
}
}

impl<P, S> std::fmt::Debug for ChainClient<P, S>
where
S: linera_storage::Storage,
Expand Down Expand Up @@ -1136,79 +1185,133 @@ where
// Retrieve newly received certificates from this validator.
let query = ChainInfoQuery::new(chain_id).with_received_log_excluding_first_n(tracker);
let info = remote_node.handle_chain_info_query(query).await?;
let mut tracker = MultichainTracker::new(tracker);
let remote_log = info.requested_received_log.clone();
let remote_node_chains_view = info.requested_received_log.into_iter().fold(
BTreeMap::<ChainId, Vec<BlockHeight>>::new(),
|mut chain_to_info, entry| {
chain_to_info
.entry(entry.chain_id)
.or_default()
.push(entry.height);
chain_to_info
},
);

// Collect all certificates that have been successfully processed.
let mut certificates: Vec<Certificate> = Vec::new();
let mut new_tracker = tracker;

for entry in info.requested_received_log {
let query = ChainInfoQuery::new(entry.chain_id)
.with_sent_certificate_hashes_in_range(BlockHeightRange::single(entry.height));
for (chain_id, mut block_batch) in remote_node_chains_view {
block_batch.sort();

let local_response = self
.client
.local_node
.handle_chain_info_query(query.clone())
.await
.map_err(|error| NodeError::LocalNodeQuery {
error: error.to_string(),
})?
.info;
if !local_response.requested_sent_certificate_hashes.is_empty() {
// We've already processed incoming messages for this certificate.
new_tracker += 1;
self.advance_with_local(chain_id, &mut block_batch, &mut tracker)
.await?;

let Some(first_block) = block_batch.first() else {
// `advance_with_local` might have drained the whole `block_batch`.
// In that case, move to the next chain batch.
continue;
}
};
let batch_size = block_batch.last().unwrap().saturating_sub(*first_block).0 + 1; // safe to unwrap because we checked that the vec is not empty.
let block_batch_range = BlockHeightRange::multi(*first_block, batch_size);
let query = ChainInfoQuery::new(chain_id)
.with_sent_certificate_hashes_in_range(block_batch_range.clone());

let remote_response = remote_node.handle_chain_info_query(query).await?;
let certificate_hash =

let certificate_hashes =
match remote_response.requested_sent_certificate_hashes.as_slice() {
&[hash] => hash,
[] => {
warn!("Validator didn't have certificate he claimed to have.");
break;
}
_ => {
error!("Validator sent more than one certificate hash for a single block.");
hashes if hashes.len() as u64 != block_batch_range.limit.unwrap() => {
error!(
block_range = ?block_batch_range,
received_num = hashes.len(),
"Validator sent invalid number of certificate hashes."
);
return Err(NodeError::InvalidChainInfoResponse);
}
hashes => hashes.to_vec(),
};

let certificate = remote_node
let remote_certificates = remote_node
.node
.download_certificate(certificate_hash)
.download_certificates(certificate_hashes)
.await?;

match self
.check_certificate(max_epoch, &committees, &certificate)
.await?
{
HandleCertificateResult::FutureEpoch => {
warn!("Postponing received certificate from {:.8} at height {} from future epoch {}",
entry.chain_id, entry.height, certificate.value().epoch());
// Stop the synchronization here. Do not increment the tracker further so
// that this certificate can still be downloaded later, once our committee
// is updated.
break;
}
HandleCertificateResult::OldEpoch => {
// This epoch is not recognized any more. Let's skip the certificate.
// If a higher block with a recognized epoch comes up later from the
// same chain, the call to `receive_certificate` below will download
// the skipped certificate again.
warn!(
"Skipping received certificate from past epoch {:?}",
certificate.value().epoch()
);
new_tracker += 1;
}
HandleCertificateResult::New => {
certificates.push(certificate);
new_tracker += 1;
for certificate in remote_certificates {
match self
.check_certificate(max_epoch, &committees, &certificate)
.await?
{
HandleCertificateResult::FutureEpoch => {
warn!("Postponing received certificate from {:.8} at height {} from future epoch {}",
chain_id, certificate.value().height(), certificate.value().epoch());
// Stop the synchronization here. Do not increment the tracker further so
// that this certificate can still be downloaded later, once our committee
// is updated.
}
HandleCertificateResult::OldEpoch => {
tracker.push(chain_id, certificate.value().height());
// This epoch is not recognized any more. Let's skip the certificate.
// If a higher block with a recognized epoch comes up later from the
// same chain, the call to `receive_certificate` below will download
// the skipped certificate again.
warn!(
"Skipping received certificate from past epoch {:?}",
certificate.value().epoch()
);
}
HandleCertificateResult::New => {
tracker.push(chain_id, certificate.value().height());
certificates.push(certificate);
}
}
}
}
let new_tracker = tracker.finalize(&remote_log);
Ok((remote_node.name, new_tracker, certificates))
}

/// Uses local information (about already-processed blocks) to advance the `block_batch` to a place where only new blocks are left.
async fn advance_with_local(
&self,
chain_id: ChainId,
block_batch: &mut Vec<BlockHeight>,
tracker: &mut MultichainTracker,
) -> Result<(), NodeError> {
let local_response = self.local_query(ChainInfoQuery::new(chain_id)).await?;
let local_next = local_response.info.next_block_height;

// Record highest block height known locally for this chain.
if let Ok(highest_seen) = local_next.try_sub_one() {
tracker.push(chain_id, highest_seen);
}

// Find the first block in the batch that is higher than the highest block known locally.
match block_batch.iter().position(|b| b >= &local_next) {
None => {
// Our highest, locally-known block is higher than any block height from the current batch.
// Move to the next chain batch.
block_batch.clear();
}
Some(index) => {
// Blocks with height lower than block_batch[index] are already known.
// Skip processing them.
let _ = block_batch.drain(0..index);
}
}
Ok(())
}

async fn local_query(&self, query: ChainInfoQuery) -> Result<ChainInfoResponse, NodeError> {
self.client
.local_node
.handle_chain_info_query(query)
.await
.map_err(|error| NodeError::LocalNodeQuery {
error: error.to_string(),
})
}

async fn check_certificate(
&self,
highest_known_epoch: Epoch,
Expand Down
14 changes: 14 additions & 0 deletions linera-core/src/data_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,20 @@ impl BlockHeightRange {
let limit = Some(1);
BlockHeightRange { start, limit }
}

/// Creates a range starting at the specified block height and containing up to `limit` elements.
pub fn multi(start: BlockHeight, limit: u64) -> BlockHeightRange {
BlockHeightRange {
start,
limit: Some(limit),
}
}

/// Returns the highest block height in the range.
pub fn highest(&self) -> BlockHeight {
self.limit
.map_or(self.start, |limit| BlockHeight(self.start.0 + limit - 1))
}
}

/// Request information about a chain.
Expand Down

0 comments on commit fb2f26e

Please sign in to comment.