diff --git a/Cargo.lock b/Cargo.lock index 8053aa1090..80d7406924 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1146,6 +1146,7 @@ dependencies = [ "cfg-if", "commonware-broadcast", "commonware-codec", + "commonware-coding", "commonware-cryptography", "commonware-macros", "commonware-p2p", @@ -1157,7 +1158,9 @@ dependencies = [ "governor", "prometheus-client", "rand 0.8.5", + "rand_core 0.6.4", "rand_distr", + "rayon", "thiserror 2.0.12", "tracing", "tracing-subscriber", diff --git a/coding/src/lib.rs b/coding/src/lib.rs index 817a2b09ba..01b2197185 100644 --- a/coding/src/lib.rs +++ b/coding/src/lib.rs @@ -22,7 +22,7 @@ mod no_coding; pub use no_coding::{NoCoding, NoCodingError}; /// Configuration common to all encoding schemes. -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct Config { /// The minimum number of shards needed to encode the data. pub minimum_shards: u16, @@ -116,13 +116,13 @@ pub trait Scheme: Debug + Clone + Send + Sync + 'static { /// the data. type ReShard: Clone + Eq + Codec + Send + Sync + 'static; /// Data which can assist in checking shards. - type CheckingData: Clone + Send; + type CheckingData: Clone + Send + Sync; /// A shard that has been checked for inclusion in the commitment. /// /// This allows excluding [Scheme::ReShard]s which are invalid, and shouldn't /// be considered as progress towards meeting the minimum number of shards. - type CheckedShard; - type Error: std::fmt::Debug; + type CheckedShard: Clone + Send; + type Error: std::fmt::Debug + Send; /// Encode a piece of data, returning a commitment, along with shards, and proofs. /// diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index b0496c82d0..d05c2b32d8 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -12,6 +12,7 @@ documentation = "https://docs.rs/commonware-consensus" [dependencies] commonware-codec = { workspace = true } +commonware-coding = { workspace = true } commonware-broadcast = { workspace = true } commonware-cryptography = { workspace = true } commonware-resolver = { workspace = true } @@ -20,6 +21,7 @@ bytes = { workspace = true } cfg-if = { workspace = true } thiserror = { workspace = true } futures = { workspace = true } +rand_core = { workspace = true } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] commonware-macros = { workspace = true } @@ -28,6 +30,7 @@ commonware-runtime = { workspace = true } commonware-storage = { workspace = true, features = ["std"] } prometheus-client = { workspace = true } governor = { workspace = true } +rayon = { workspace = true } rand = { workspace = true } rand_distr = { workspace = true } tracing = { workspace = true } diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index ee58c1f9f8..b3db23f395 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -56,6 +56,8 @@ cfg_if::cfg_if! { use commonware_cryptography::{Digest, PublicKey}; use futures::channel::{oneshot, mpsc}; use std::future::Future; + use rand::Rng; + use commonware_runtime::{Clock, Metrics, Spawner}; pub mod marshal; mod reporter; @@ -102,6 +104,38 @@ cfg_if::cfg_if! { ) -> impl Future> + Send; } + /// Application is the interface responsible for building new blocks on top of consensus-provided parent + /// commitments as well as receiving finalized blocks from marshal. + pub trait Application: Clone + Send + 'static + where + E: Rng + Spawner + Metrics + Clock + { + /// Context is metadata provided by the consensus engine associated with a given payload. + /// + /// This often includes things like the proposer, view number, the height, or the epoch. + type Context: Epochable; + + /// The block type produced by the application's builder. + type Block: Block; + + /// Payload used to initialize the consensus engine. + fn genesis( + &mut self, + epoch: ::Epoch + ) -> impl Future + Send; + + /// Build a new block on top of the provided parent commitment / block. + fn build( + &mut self, + context: E, + parent_commitment: ::Commitment, + parent_block: Self::Block, + ) -> impl Future + Send; + + /// Receive a finalized block from marshal. + fn finalize(&mut self, block: Self::Block) -> impl Future + Send; + } + /// Relay is the interface responsible for broadcasting payloads to the network. /// /// The consensus engine is only aware of a payload's digest, not its contents. It is up diff --git a/consensus/src/marshal/actor.rs b/consensus/src/marshal/actor.rs index 15c03e2af8..c6c76a4400 100644 --- a/consensus/src/marshal/actor.rs +++ b/consensus/src/marshal/actor.rs @@ -9,16 +9,18 @@ use super::{ }, }; use crate::{ - marshal::ingress::mailbox::Identifier as BlockID, + marshal::{ + coding::{self, CodedBlock}, + ingress::mailbox::Identifier as BlockID, + }, threshold_simplex::types::{Finalization, Notarization}, - types::Round, + types::{CodingCommitment, Round}, Block, Reporter, }; -use commonware_broadcast::{buffered, Broadcaster}; use commonware_codec::{Decode, Encode}; -use commonware_cryptography::{bls12381::primitives::variant::Variant, PublicKey}; +use commonware_coding::Scheme; +use commonware_cryptography::{bls12381::primitives::variant::Variant, Committable, PublicKey}; use commonware_macros::select; -use commonware_p2p::Recipients; use commonware_resolver::Resolver; use commonware_runtime::{spawn_cell, Clock, ContextCell, Handle, Metrics, Spawner, Storage}; use commonware_storage::archive::{immutable, Archive as _, Identifier as ArchiveID}; @@ -57,7 +59,13 @@ struct BlockSubscription { /// finalization for a block that is ahead of its current view, it will request the missing blocks /// from its peers. This ensures that the actor can catch up to the rest of the network if it falls /// behind. -pub struct Actor { +pub struct Actor +where + B: Block, + E: Rng + Spawner + Metrics + Clock + GClock + Storage, + V: Variant, + S: Scheme, +{ // ---------- Context ---------- context: ContextCell, @@ -89,11 +97,11 @@ pub struct Actor, + cache: cache::Manager, V>, // Finalizations stored by height finalizations_by_height: immutable::Archive>, // Finalized blocks stored by height - finalized_blocks: immutable::Archive, + finalized_blocks: immutable::Archive>, // ---------- Metrics ---------- // Latest height metric @@ -102,7 +110,13 @@ pub struct Actor Actor { +impl Actor +where + B: Block, + E: Rng + Spawner + Metrics + Clock + GClock + Storage, + V: Variant, + S: Scheme, +{ /// Create a new application actor. pub async fn init(context: E, config: Config) -> (Self, Mailbox) { // Initialize cache @@ -234,11 +248,11 @@ impl( mut self, application: impl Reporter, - buffer: buffered::Mailbox, - resolver: (mpsc::Receiver>, R), + buffer: coding::Mailbox, + resolver: (mpsc::Receiver>>, R), ) -> Handle<()> where - R: Resolver>, + R: Resolver>>, P: PublicKey, { spawn_cell!(self.context, self.run(application, buffer, resolver).await) @@ -248,10 +262,10 @@ impl( mut self, application: impl Reporter, - mut buffer: buffered::Mailbox, - (mut resolver_rx, mut resolver): (mpsc::Receiver>, R), + mut shards: coding::Mailbox, + (mut resolver_rx, mut resolver): (mpsc::Receiver>>, R), ) where - R: Resolver>, + R: Resolver>>, P: PublicKey, { // Process all finalized blocks in order (fetching any that are missing) @@ -269,7 +283,7 @@ impl::default(); + let mut block_waiters = AbortablePool::<(B::Commitment, B)>::default(); // Handle messages loop { @@ -282,11 +296,11 @@ impl { + result = block_waiters.next_completed() => { let Ok((commitment, block)) = result else { continue; // Aborted future }; - self.notify_subscribers(commitment, &block).await; + self.notify_block_subscribers(commitment, &block).await; }, // Handle consensus before finalizer or backfiller mailbox_message = self.mailbox.next() => { @@ -318,69 +332,29 @@ impl { - let _peers = buffer.broadcast(Recipients::All, block).await; - } - Message::Verified { round, block } => { - self.cache_verified(round, block.commitment(), block).await; - } - Message::Notarization { notarization } => { - let round = notarization.round(); - let commitment = notarization.proposal.payload; - - // Store notarization by view - self.cache.put_notarization(round, commitment, notarization.clone()).await; - - // Search for block locally, otherwise fetch it remotely - if let Some(block) = self.find_block(&mut buffer, commitment).await { - // If found, persist the block - self.cache_block(round, commitment, block).await; - } else { - debug!(?round, "notarized block missing"); - resolver.fetch(Request::::Notarized { round }).await; - } - } - Message::Finalization { finalization } => { - // Cache finalization by round - let round = finalization.round(); - let commitment = finalization.proposal.payload; - self.cache.put_finalization(round, commitment, finalization.clone()).await; - - // Search for block locally, otherwise fetch it remotely - if let Some(block) = self.find_block(&mut buffer, commitment).await { - // If found, persist the block - let height = block.height(); - self.finalize(height, commitment, block, Some(finalization), &mut notifier_tx).await; - debug!(?round, height, "finalized block stored"); - } else { - // Otherwise, fetch the block from the network. - debug!(?round, ?commitment, "finalized block missing"); - resolver.fetch(Request::::Block(commitment)).await; - } - } Message::GetBlock { identifier, response } => { match identifier { BlockID::Commitment(commitment) => { - let result = self.find_block(&mut buffer, commitment).await; - let _ = response.send(result); + let result = self.find_block(&mut shards, commitment).await; + let _ = response.send(result.map(CodedBlock::into_inner)); } BlockID::Height(height) => { let result = self.get_finalized_block(height).await; - let _ = response.send(result); + let _ = response.send(result.map(CodedBlock::into_inner)); } BlockID::Latest => { let block = match self.get_latest().await { - Some((_, commitment)) => self.find_block(&mut buffer, commitment).await, + Some((_, commitment)) => self.find_block(&mut shards, commitment).await, None => None, }; - let _ = response.send(block); + let _ = response.send(block.map(CodedBlock::into_inner)); } } } Message::Subscribe { round, commitment, response } => { // Check for block locally - if let Some(block) = self.find_block(&mut buffer, commitment).await { - let _ = response.send(block); + if let Some(block) = self.find_block(&mut shards, commitment).await { + let _ = response.send(block.into_inner()); continue; } @@ -401,7 +375,7 @@ impl::Notarized { round }).await; + resolver.fetch(Request::>::Notarized { round }).await; } // Register subscriber @@ -411,10 +385,9 @@ impl { - let (tx, rx) = oneshot::channel(); - buffer.subscribe_prepared(None, commitment, None, tx).await; - let aborter = waiters.push(async move { - (commitment, rx.await.expect("buffer subscriber closed")) + let rx = shards.subscribe_block(commitment).await; + let aborter = block_waiters.push(async move { + (commitment, rx.await.expect("buffer subscriber closed").into_inner()) }); entry.insert(BlockSubscription { subscribers: vec![response], @@ -423,6 +396,41 @@ impl { + tracing::warn!(?notarization, "Received notarization"); + let round = notarization.round(); + let commitment = notarization.proposal.payload; + + // Store notarization by round + self.cache.put_notarization(round, commitment, notarization.clone()).await; + + // Search for block locally, otherwise fetch it remotely + if let Some(block) = self.find_block(&mut shards, commitment).await { + // If found, persist the block + self.cache_block(round, commitment, block).await; + } else { + debug!(?round, "notarized block missing"); + resolver.fetch(Request::>::Notarized { round }).await; + } + } + Message::Finalization { finalization } => { + // Cache finalization by round + let round = finalization.round(); + let commitment = finalization.proposal.payload; + self.cache.put_finalization(round, commitment, finalization.clone()).await; + + // Search for block locally, otherwise fetch it remotely + if let Some(block) = self.find_block(&mut shards, commitment).await { + // If found, persist the block + let height = block.height(); + self.finalize(height, commitment, block, Some(finalization), &mut notifier_tx).await; + debug!(?round, height, "finalized block stored"); + } else { + // Otherwise, fetch the block from the network. + debug!(?round, ?commitment, "finalized block missing"); + resolver.fetch(Request::>::Block(commitment)).await; + } + } } }, // Handle finalizer messages next @@ -437,13 +445,13 @@ impl { + Orchestration::Processed { height, commitment } => { // Update metrics self.processed_height.set(height as i64); - // Cancel any outstanding requests (by height and by digest) - resolver.cancel(Request::::Block(digest)).await; - resolver.retain(Request::::Finalized { height }.predicate()).await; + // Cancel any outstanding requests (by height and by commitment) + resolver.cancel(Request::>::Block(commitment)).await; + resolver.retain(Request::>::Finalized { height }.predicate()).await; // If finalization exists, prune the archives if let Some(finalization) = self.get_finalization_by_height(height).await { @@ -453,13 +461,14 @@ impl::Notarized { round }.predicate()).await; + resolver.retain(Request::>::Notarized { round }.predicate()).await; } } Orchestration::Repair { height } => { @@ -479,14 +488,14 @@ impl height { let commitment = cursor.parent(); - if let Some(block) = self.find_block(&mut buffer, commitment).await { + if let Some(block) = self.find_block(&mut shards, commitment).await { let finalization = self.cache.get_finalization_for(commitment).await; self.finalize(block.height(), commitment, block.clone(), finalization, &mut notifier_tx).await; debug!(height = block.height(), "repaired block"); cursor = block; } else { // Request the next missing block digest - resolver.fetch(Request::::Block(commitment)).await; + resolver.fetch(Request::>::Block(commitment)).await; break; } } @@ -499,7 +508,7 @@ impl::Finalized { height }).await; + resolver.fetch(Request::>::Finalized { height }).await; } } } @@ -515,7 +524,7 @@ impl { // Check for block locally - let Some(block) = self.find_block(&mut buffer, commitment).await else { + let Some(block) = self.find_block(&mut shards, commitment).await else { debug!(?commitment, "block missing on request"); continue; }; @@ -546,8 +555,8 @@ impl { // Parse block - let Ok(block) = B::decode_cfg(value.as_ref(), &self.codec_config) else { + let Ok(block) = CodedBlock::::decode_cfg(value.as_ref(), &self.codec_config) else { let _ = response.send(false); continue; }; @@ -578,7 +587,7 @@ impl { // Parse finalization - let Ok((finalization, block)) = <(Finalization, B)>::decode_cfg(value, &((), self.codec_config.clone())) else { + let Ok((finalization, block)) = <(Finalization, CodedBlock)>::decode_cfg(value, &((), self.codec_config.clone())) else { let _ = response.send(false); continue; }; @@ -599,7 +608,7 @@ impl { // Parse notarization - let Ok((notarization, block)) = <(Notarization, B)>::decode_cfg(value, &((), self.codec_config.clone())) else { + let Ok((notarization, block)) = <(Notarization, CodedBlock)>::decode_cfg(value, &((), self.codec_config.clone())) else { let _ = response.send(false); continue; }; @@ -644,7 +653,7 @@ impl, + ) { + self.notify_block_subscribers(commitment, block.inner()) + .await; self.cache.put_block(round, commitment, block).await; } // -------------------- Immutable Storage -------------------- /// Get a finalized block from the immutable archive. - async fn get_finalized_block(&self, height: u64) -> Option { + async fn get_finalized_block(&self, height: u64) -> Option> { match self.finalized_blocks.get(ArchiveID::Index(height)).await { Ok(block) => block, Err(e) => panic!("failed to get block: {e}"), @@ -699,11 +708,12 @@ impl, finalization: Option>, notifier: &mut mpsc::Sender<()>, ) { - self.notify_subscribers(commitment, &block).await; + self.notify_block_subscribers(commitment, block.inner()) + .await; // In parallel, update the finalized blocks and finalizations archives if let Err(e) = try_join!( @@ -757,14 +767,21 @@ impl( &mut self, - buffer: &mut buffered::Mailbox, + buffer: &mut coding::Mailbox, commitment: B::Commitment, - ) -> Option { - // Check buffer. - if let Some(block) = buffer.get(None, commitment, None).await.into_iter().next() { + ) -> Option> { + // Check shard mailbox. + if let Some(block) = buffer + .try_reconstruct(commitment) + .await + .await + .expect("mailbox closed") + .expect("reconstruction error not yet handled") + { return Some(block); } - // Check verified / notarized blocks via cache manager. + + // Check notarized blocks via cache manager. if let Some(block) = self.cache.find_block(commitment).await { return Some(block); } diff --git a/consensus/src/marshal/cache.rs b/consensus/src/marshal/cache.rs index ea80d3934f..8a42b6c6bd 100644 --- a/consensus/src/marshal/cache.rs +++ b/consensus/src/marshal/cache.rs @@ -36,8 +36,6 @@ pub(crate) struct Config { /// Prunable archives for a single epoch. struct Cache { - /// Verified blocks stored by view - verified_blocks: prunable::Archive, /// Notarized blocks stored by view notarized_blocks: prunable::Archive, /// Notarizations stored by view @@ -50,7 +48,6 @@ impl Option { // Check in reverse order for cache in self.caches.values().rev() { - // Check verified blocks - if let Some(block) = cache - .verified_blocks - .get(Identifier::Key(&commitment)) - .await - .expect("failed to get verified block") - { - return Some(block); - } - // Check notarized blocks if let Some(block) = cache .notarized_blocks @@ -343,12 +314,10 @@ impl { - verified_blocks: vb, notarized_blocks: nb, notarizations: nv, finalizations: fv, } = self.caches.remove(epoch).unwrap(); - vb.destroy().await.expect("failed to destroy vb"); nb.destroy().await.expect("failed to destroy nb"); nv.destroy().await.expect("failed to destroy nv"); fv.destroy().await.expect("failed to destroy fv"); diff --git a/consensus/src/marshal/coding/actor.rs b/consensus/src/marshal/coding/actor.rs new file mode 100644 index 0000000000..3c496c59eb --- /dev/null +++ b/consensus/src/marshal/coding/actor.rs @@ -0,0 +1,769 @@ +//! Erasure coding wrapper for [buffered::Mailbox] + +use crate::{ + marshal::coding::{ + mailbox::{Mailbox, Message}, + types::{CodedBlock, DistributionShard, Shard}, + }, + types::CodingCommitment, + Block, +}; +use commonware_broadcast::{buffered, Broadcaster}; +use commonware_codec::Error as CodecError; +use commonware_coding::Scheme; +use commonware_cryptography::{ + bls12381::primitives::variant::Variant, Committable, Hasher, PublicKey, +}; +use commonware_macros::select; +use commonware_p2p::Recipients; +use commonware_runtime::{ContextCell, spawn_cell, Clock, Handle, Metrics, Spawner}; +use commonware_utils::futures::{AbortablePool, Aborter}; +use futures::{ + channel::{mpsc, oneshot}, + StreamExt, +}; +use governor::clock::Clock as GClock; +use prometheus_client::metrics::gauge::Gauge; +use rand::Rng; +use rayon::iter::{IntoParallelIterator, ParallelIterator}; +use std::{ + collections::{btree_map::Entry, BTreeMap}, + ops::Deref, + time::Instant, +}; +use thiserror::Error; +use tracing::{debug, info}; + +/// An error that can occur during reconstruction of a [CodedBlock] from [Shard]s +#[derive(Debug, Error)] +pub enum ReconstructionError { + /// An error occurred while recovering the encoded blob from the [Shard]s + #[error(transparent)] + CodingRecovery(S::Error), + + /// An error occurred while decoding the reconstructed blob into a [CodedBlock] + #[error(transparent)] + Codec(#[from] CodecError), +} + +/// A subscription for a [Block] by its commitment +struct BlockSubscription { + /// A list of subscribers waiting for the block to be reconstructed + subscribers: Vec>, +} + +/// A subscription for a [Shard]'s validity, relative to a [CodingCommitment]. +struct ShardSubscription { + /// A list of subscribers waiting for the [Shard]'s validity to be checked. + subscribers: Vec>, + /// Aborter that aborts the waiter future when dropped. + _aborter: Aborter, +} + +/// A wrapper around a [buffered::Mailbox] for broadcasting and receiving erasure-coded +/// [Block]s as [Shard]s. +/// +/// When enough [Shard]s are present in the mailbox, the [Actor] may facilitate +/// reconstruction of the original [Block] and notify any subscribers waiting for it. +pub struct Actor +where + E: Rng + Spawner + Metrics + Clock + GClock, + V: Variant, + S: Scheme, + H: Hasher, + B: Block, + P: PublicKey, +{ + /// Context held by the actor. + context: ContextCell, + + /// Receiver for incoming messages to the actor. + mailbox: mpsc::Receiver>, + + /// Buffered mailbox for broadcasting and receiving [Shard]s to/from peers + buffer: buffered::Mailbox>, + + /// [commonware_codec::Read] configuration for decoding blocks + block_codec_cfg: B::Cfg, + + /// Open subscriptions for [CodedBlock]s by commitment + block_subscriptions: BTreeMap>>, + + /// Open subscriptions for [Shard]s checks by commitment and index + shard_subscriptions: BTreeMap<(CodingCommitment, usize), ShardSubscription>, + + /// A temporary in-memory cache of reconstructed blocks by commitment. + /// + /// These blocks are evicted by marshal after they are delivered to the application. + reconstructed_blocks: BTreeMap>, + + erasure_decode_duration: Gauge, +} + +impl Actor +where + E: Rng + Spawner + Metrics + Clock + GClock, + V: Variant, + S: Scheme, + H: Hasher, + B: Block, + P: PublicKey, +{ + /// Create a new [Actor]. + pub fn new( + context: E, + buffer: buffered::Mailbox>, + block_codec_cfg: B::Cfg, + ) -> (Self, Mailbox) { + let erasure_decode_duration = Gauge::default(); + context.register( + "erasure_decode_duration", + "Duration of erasure decoding in milliseconds", + erasure_decode_duration.clone(), + ); + + // TODO: Configure mailbox size. + let (sender, mailbox) = mpsc::channel(1000); + ( + Self { + context: ContextCell::new(context), + mailbox, + buffer, + block_codec_cfg, + block_subscriptions: BTreeMap::new(), + shard_subscriptions: BTreeMap::new(), + reconstructed_blocks: BTreeMap::new(), + erasure_decode_duration, + }, + Mailbox::new(sender), + ) + } + + /// Start the actor. + pub fn start(mut self) -> Handle<()> { + spawn_cell!(self.context, self.run().await) + } + + /// Run the shard actor. + async fn run(mut self) { + let mut shard_validity_waiters = + AbortablePool::<((CodingCommitment, usize), Shard)>::default(); + + loop { + // Prune any dropped subscribers. + self.shard_subscriptions.retain(|_, sub| { + sub.subscribers.retain(|tx| !tx.is_canceled()); + !sub.subscribers.is_empty() + }); + + select! { + // Always serve any outstanding subscriptions first to unblock the hotpath of proposals / notarizations. + result = shard_validity_waiters.next_completed() => { + let Ok(((commitment, index), shard)) = result else { + // Aborted future + continue; + }; + + let valid = shard.verify(); + + // Notify all subscribers + if let Some(mut sub) = self.shard_subscriptions.remove(&(commitment, index)) { + for responder in sub.subscribers.drain(..) { + let _ = responder.send(valid); + } + } + + if valid { + // If the shard is valid, broadcast it to all peers. + if let Some(shard) = self.get_shard(commitment, index).await { + self.broadcast_shard(shard).await; + } + } + }, + message = self.mailbox.next() => { + let Some(message) = message else { + info!("Shard mailbox closed, shutting down"); + return; + }; + match message { + Message::Broadcast { block, peers } => { + self.broadcast_shards(block, peers).await; + } + Message::VerifyShard { + commitment, + index, + response, + } => { + self.subscribe_shard_validity( + commitment, + index, + response, + &mut shard_validity_waiters + ).await; + } + Message::TryReconstruct { + commitment, + response, + } => { + let result = self.try_reconstruct(commitment).await; + + // Send the response; if the receiver has been dropped, we don't care. + let _ = response.send(result); + } + Message::SubscribeBlock { + commitment, + response, + } => { + self.subscribe_block(commitment, response).await; + } + Message::Finalized { commitment } => { + // Evict any finalized blocks from the cache to free up memory; They're + // now persisted on disk durably by marshal. + self.reconstructed_blocks.remove(&commitment); + } + Message::Notarize { notarization } => { + let _ = self.try_reconstruct(notarization.proposal.payload).await; + } + } + } + } + } + } + + /// Broadcasts [Shard]s of a [Block] to a pre-determined set of peers + /// + /// ## Panics + /// + /// Panics if the number of `participants` is not equal to the number of [Shard]s in the `block` + async fn broadcast_shards(&mut self, mut block: CodedBlock, participants: Vec

) { + assert_eq!( + participants.len(), + block.shards().len(), + "number of participants must equal number of shards" + ); + + for (index, peer) in participants.into_iter().enumerate() { + let message = block + .shard(index) + .expect("peer index impossibly out of bounds"); + let _peers = self.buffer.broadcast(Recipients::One(peer), message).await; + } + } + + /// Broadcasts a local [Shard] of a block to all peers. + async fn broadcast_shard(&mut self, shard: Shard) { + let commitment = shard.commitment(); + let index = shard.index(); + + let DistributionShard::Strong(shard) = shard.into_inner() else { + // If the shard is already weak, it's been broadcasted to us already; + // no need to re-broadcast. + return; + }; + + let Ok((_, _, reshard)) = S::reshard( + &commitment.config(), + &commitment.inner(), + index as u16, + shard, + ) else { + // If the shard can't be verified locally, don't broadcast anything. + return; + }; + + // Broadcast the weak shard to all peers for reconstruction. + let reshard = Shard::new(commitment, index, DistributionShard::Weak(reshard)); + let _peers = self.buffer.broadcast(Recipients::All, reshard).await; + + debug!(%commitment, index, "broadcasted local shard to all peers"); + } + + /// Attempts to reconstruct a [CodedBlock] from [Shard]s present in the mailbox + /// + /// If not enough [Shard]s are present, returns [None]. If enough [Shard]s are present and + /// reconstruction fails, returns a [ReconstructionError] + async fn try_reconstruct( + &mut self, + commitment: CodingCommitment, + ) -> Result>, ReconstructionError> { + if let Some(block) = self.reconstructed_blocks.get(&commitment) { + // Notify any subscribers that have been waiting for this block to be reconstructed + if let Some(mut sub) = self.block_subscriptions.remove(&commitment) { + for sub in sub.subscribers.drain(..) { + let _ = sub.send(block.clone()); + } + } + return Ok(Some(block.clone())); + } + + let shards = self.buffer.get(None, commitment, None).await; + let config = commitment.config(); + + // Search for a strong shard to form the checking data. We must have at least one strong shard + // sent to us by the proposer. In the case of the proposer, all shards in the mailbox will be strong, + // but any can be used for forming the checking data. + // + // NOTE: Byzantine peers may send us strong shards as well, but we don't care about those; + // `Scheme::reshard` verifies the shard against the commitment, and if it doesn't check out, + // it will be ignored. + let Some(checking_data) = shards.iter().find_map(|s| { + if let DistributionShard::Strong(shard) = s.deref() { + S::reshard( + &config, + &commitment.inner(), + s.index() as u16, + shard.clone(), + ) + .map(|(checking_data, _, _)| checking_data) + .ok() + } else { + None + } + }) else { + debug!(%commitment, "No strong shards present to form checking data"); + return Ok(None); + }; + + let checked_shards = shards + .into_par_iter() + .filter_map(|s| { + let index = s.index() as u16; + match s.into_inner() { + DistributionShard::Strong(shard) => { + // Any strong shards, at this point, were sent from the proposer. + // We use the reshard interface to produce our checked shard rather + // than taking two hops. + S::reshard(&config, &commitment.inner(), index, shard) + .map(|(_, checked, _)| checked) + .ok() + } + DistributionShard::Weak(re_shard) => S::check( + &config, + &commitment.inner(), + &checking_data, + index, + re_shard, + ) + .ok(), + } + }) + .collect::>(); + + if checked_shards.len() < config.minimum_shards as usize { + debug!(%commitment, "Not enough checked shards to reconstruct block"); + return Ok(None); + } + + // Attempt to reconstruct the encoded blob + let start = Instant::now(); + let decoded = S::decode( + &config, + &commitment.inner(), + checking_data.clone(), + checked_shards.as_slice(), + ) + .map_err(ReconstructionError::CodingRecovery)?; + self.erasure_decode_duration + .set(start.elapsed().as_millis() as i64); + + // Attempt to decode the block from the encoded blob + let inner = B::read_cfg(&mut decoded.as_slice(), &self.block_codec_cfg)?; + + // Construct a coding block with a _trusted_ commitment. `S::decode` verified the blob's + // integrity, so shards can be lazily re-constructed if need be. + let block = CodedBlock::new_trusted(inner, commitment); + + debug!( + %commitment, + parent = %block.parent(), + height = block.height(), + "Successfully reconstructed block from shards" + ); + + self.reconstructed_blocks.insert(commitment, block.clone()); + + // Notify any subscribers that have been waiting for this block to be reconstructed + if let Some(mut sub) = self.block_subscriptions.remove(&commitment) { + for sub in sub.subscribers.drain(..) { + let _ = sub.send(block.clone()); + } + } + + Ok(Some(block)) + } + + /// Performs a best-effort retrieval of a [Shard] by commitment and index + /// + /// If the mailbox does not have the shard cached, [None] is returned + pub async fn get_shard( + &mut self, + commitment: CodingCommitment, + index: usize, + ) -> Option> { + let index_hash = Shard::::uuid(commitment, index); + self.buffer + .get(None, commitment, Some(index_hash)) + .await + .first() + .cloned() + } + + /// Subscribes to a [Shard] by commitment and index with an externally prepared responder + /// + /// The responder will be sent the shard when it is available; either instantly (if cached) + /// or when it is received from the network. The request can be canceled by dropping the + /// responder + #[allow(clippy::type_complexity)] + pub async fn subscribe_shard_validity( + &mut self, + commitment: CodingCommitment, + index: usize, + responder: oneshot::Sender, + pool: &mut AbortablePool<((CodingCommitment, usize), Shard)>, + ) { + // If we already have the shard cached, send it immediately. + if let Some(shard) = self.get_shard(commitment, index).await { + let valid = shard.verify(); + let _ = responder.send(valid); + + // Broadcast the shard to all peers if it's valid. + if valid { + self.broadcast_shard(shard).await; + } + return; + } + + match self.shard_subscriptions.entry((commitment, index)) { + Entry::Vacant(entry) => { + let (tx, rx) = oneshot::channel(); + let index_hash = Shard::::uuid(commitment, index); + self.buffer + .subscribe_prepared(None, commitment, Some(index_hash), tx) + .await; + let aborter = pool.push(async move { + let shard = rx.await.expect("shard subscription aborted"); + ((commitment, index), shard) + }); + entry.insert(ShardSubscription { + subscribers: vec![responder], + _aborter: aborter, + }); + } + Entry::Occupied(mut entry) => { + entry.get_mut().subscribers.push(responder); + } + } + } + + /// Subscribes to a [CodedBlock] by commitment with an externally prepared responder + /// + /// The responder will be sent the block when it is available; either instantly (if cached) + /// or when it is received from the network. The request can be canceled by dropping the + /// responder + pub async fn subscribe_block( + &mut self, + commitment: CodingCommitment, + responder: oneshot::Sender>, + ) { + if let Some(block) = self.reconstructed_blocks.get(&commitment) { + // If we already have the block reconstructed, send it immediately. + let _ = responder.send(block.clone()); + return; + } + + match self.block_subscriptions.entry(commitment) { + Entry::Vacant(entry) => { + entry.insert(BlockSubscription { + subscribers: vec![responder], + }); + } + Entry::Occupied(mut entry) => { + entry.get_mut().subscribers.push(responder); + } + } + + // Attempt reconstruction in case it hasn't been attempted yet. + let _ = self.try_reconstruct(commitment).await; + } +} + +#[cfg(test)] +mod test { + // use super::*; + // use crate::marshal::mocks::block::Block as MockBlock; + // use commonware_coding::ReedSolomon; + // use commonware_cryptography::{ + // ed25519::{PrivateKey, PublicKey}, + // sha256::Digest as Sha256Digest, + // Committable, PrivateKeyExt, Sha256, Signer, + // }; + // use commonware_macros::test_traced; + // use commonware_p2p::simulated::{Link, Oracle, Receiver, Sender}; + // use commonware_runtime::{deterministic, Clock, Metrics, Runner}; + // use std::time::Duration; + // + // // Number of messages to cache per sender + // const CACHE_SIZE: usize = 10; + // + // // Network speed for the simulated network + // const NETWORK_SPEED: Duration = Duration::from_millis(100); + // + // // The max size of a shard sent over the wire + // const MAX_SHARD_SIZE: usize = 1024 * 1024; // 1 MiB + // + // // The number of peers in the simulated network + // const NUM_PEERS: u32 = 8; + // + // type Registrations = BTreeMap, Receiver)>; + // type B = MockBlock; + // type SMailbox = ShardMailbox, Sha256, B, PublicKey>; + // + // async fn initialize_simulation( + // context: deterministic::Context, + // num_peers: u32, + // success_rate: f64, + // ) -> (Vec, Registrations, Oracle) { + // let (network, mut oracle) = + // commonware_p2p::simulated::Network::::new( + // context.with_label("network"), + // commonware_p2p::simulated::Config { + // max_size: 1024 * 1024, + // disconnect_on_block: true, + // }, + // ); + // network.start(); + // + // let mut schemes = (0..num_peers) + // .map(|i| PrivateKey::from_seed(i as u64)) + // .collect::>(); + // schemes.sort_by_key(|s| s.public_key()); + // let peers: Vec = schemes.iter().map(|c| c.public_key()).collect(); + // + // let mut registrations: Registrations = BTreeMap::new(); + // for peer in peers.iter() { + // let (sender, receiver) = oracle.register(peer.clone(), 0).await.unwrap(); + // registrations.insert(peer.clone(), (sender, receiver)); + // } + // + // // Add links between all peers + // let link = Link { + // latency: NETWORK_SPEED, + // jitter: Duration::ZERO, + // success_rate, + // }; + // for p1 in peers.iter() { + // for p2 in peers.iter() { + // if p2 == p1 { + // continue; + // } + // oracle + // .add_link(p1.clone(), p2.clone(), link.clone()) + // .await + // .unwrap(); + // } + // } + // + // (peers, registrations, oracle) + // } + // + // fn spawn_peer_engines( + // context: deterministic::Context, + // registrations: &mut Registrations, + // ) -> BTreeMap { + // let mut mailboxes = BTreeMap::new(); + // while let Some((peer, network)) = registrations.pop_first() { + // let context = context.with_label(&peer.to_string()); + // let config = buffered::Config { + // public_key: peer.clone(), + // mailbox_size: 1024, + // deque_size: CACHE_SIZE, + // priority: false, + // codec_config: (MAX_SHARD_SIZE, MAX_SHARD_SIZE), + // }; + // let (engine, engine_mailbox) = buffered::Engine::< + // _, + // PublicKey, + // Shard, Sha256>, + // >::new(context.clone(), config); + // let shard_mailbox = + // SMailbox::new(context.with_label("shard_mailbox"), engine_mailbox, ()); + // mailboxes.insert(peer.clone(), shard_mailbox); + // + // engine.start(network); + // } + // mailboxes + // } + // + // #[test] + // #[should_panic] + // fn test_broadcast_mismatched_peers_panics() { + // let executor = deterministic::Runner::default(); + // executor.start(|ctx| async move { + // let (peers, mut registrations, _network) = + // initialize_simulation(ctx.with_label("network"), NUM_PEERS, 1.0).await; + // let mut mailboxes = spawn_peer_engines(ctx.with_label("mailboxes"), &mut registrations); + // + // let coding_config = commonware_coding::Config { + // minimum_shards: (NUM_PEERS / 2) as u16, + // extra_shards: (NUM_PEERS / 2) as u16, + // }; + // + // let inner = B::new::(Default::default(), 1, 2); + // let coded_block = CodedBlock::>::new(inner, coding_config); + // + // // Broadcast all shards out (proposer) with too few peers - should panic. + // let first_mailbox = mailboxes.get_mut(peers.first().unwrap()).unwrap(); + // first_mailbox + // .broadcast_shards( + // coded_block.clone(), + // peers + // .clone() + // .into_iter() + // .take(NUM_PEERS as usize - 1) + // .collect(), + // ) + // .await; + // }); + // } + // + // #[test_traced("DEBUG")] + // fn test_basic_delivery_and_retrieval() { + // let executor = deterministic::Runner::default(); + // executor.start(|ctx| async move { + // let (peers, mut registrations, _network) = + // initialize_simulation(ctx.with_label("network"), NUM_PEERS, 1.0).await; + // let mut mailboxes = spawn_peer_engines(ctx.with_label("mailboxes"), &mut registrations); + // + // let coding_config = commonware_coding::Config { + // minimum_shards: (NUM_PEERS / 2) as u16, + // extra_shards: (NUM_PEERS / 2) as u16, + // }; + // + // let inner = B::new::(Default::default(), 1, 2); + // let coded_block = CodedBlock::>::new(inner, coding_config); + // + // // Broadcast all shards out (proposer) + // let first_mailbox = mailboxes.get_mut(peers.first().unwrap()).unwrap(); + // first_mailbox + // .broadcast_shards(coded_block.clone(), peers.clone()) + // .await; + // ctx.sleep(Duration::from_millis(200)).await; + // + // // Broadcast individual shards (post-notarization votes). + // for (i, peer) in peers.iter().enumerate() { + // let mailbox = mailboxes.get_mut(peer).unwrap(); + // mailbox + // .try_broadcast_shard(coded_block.commitment(), i) + // .await; + // } + // ctx.sleep(Duration::from_millis(200)).await; + // + // // Ensure all peers get the block. + // for peer in peers.iter() { + // let first_mailbox = mailboxes.get_mut(peer).unwrap(); + // let block = first_mailbox + // .try_reconstruct(coded_block.commitment()) + // .await + // .unwrap() + // .unwrap(); + // assert_eq!(block.commitment(), coded_block.commitment()); + // assert_eq!(block.height(), coded_block.height()); + // } + // }); + // } + // + // #[test_traced("DEBUG")] + // fn test_subscribe_to_block() { + // let executor = deterministic::Runner::default(); + // executor.start(|ctx| async move { + // let (peers, mut registrations, _network) = + // initialize_simulation(ctx.with_label("network"), NUM_PEERS, 1.0).await; + // let mut mailboxes = spawn_peer_engines(ctx.with_label("mailboxes"), &mut registrations); + // + // let coding_config = commonware_coding::Config { + // minimum_shards: (NUM_PEERS / 2) as u16, + // extra_shards: (NUM_PEERS / 2) as u16, + // }; + // + // let inner = B::new::(Default::default(), 1, 2); + // let coded_block = CodedBlock::>::new(inner, coding_config); + // + // // Broadcast all shards out (proposer) + // let first_mailbox = mailboxes.get_mut(peers.first().unwrap()).unwrap(); + // first_mailbox + // .broadcast_shards(coded_block.clone(), peers.clone()) + // .await; + // ctx.sleep(Duration::from_millis(200)).await; + // + // // Broadcast individual shards (post-notarization votes). + // for (i, peer) in peers.iter().enumerate() { + // let mailbox = mailboxes.get_mut(peer).unwrap(); + // mailbox + // .try_broadcast_shard(coded_block.commitment(), i) + // .await; + // } + // ctx.sleep(Duration::from_millis(200)).await; + // + // // Ensure all peers get the block. + // for peer in peers.iter() { + // let first_mailbox = mailboxes.get_mut(peer).unwrap(); + // let (tx, rx) = oneshot::channel(); + // first_mailbox + // .subscribe_block(coded_block.commitment(), tx) + // .await + // .unwrap(); + // let block = rx.await.unwrap(); + // + // assert_eq!(block.commitment(), coded_block.commitment()); + // assert_eq!(block.height(), coded_block.height()); + // } + // }); + // } + // + // #[test_traced("DEBUG")] + // fn test_subscribe_to_shard() { + // let executor = deterministic::Runner::default(); + // executor.start(|ctx| async move { + // let (peers, mut registrations, _network) = + // initialize_simulation(ctx.with_label("network"), NUM_PEERS, 1.0).await; + // let mut mailboxes = spawn_peer_engines(ctx.with_label("mailboxes"), &mut registrations); + // + // let coding_config = commonware_coding::Config { + // minimum_shards: (NUM_PEERS / 2) as u16, + // extra_shards: (NUM_PEERS / 2) as u16, + // }; + // + // let inner = B::new::(Default::default(), 1, 2); + // let coded_block = CodedBlock::>::new(inner, coding_config); + // + // // Broadcast all shards out (proposer) + // let first_mailbox = mailboxes.get_mut(peers.first().unwrap()).unwrap(); + // first_mailbox + // .broadcast_shards(coded_block.clone(), peers.clone()) + // .await; + // ctx.sleep(Duration::from_millis(200)).await; + // + // // Broadcast individual shards (post-notarization votes). + // for (i, peer) in peers.iter().enumerate() { + // let mailbox = mailboxes.get_mut(peer).unwrap(); + // mailbox + // .try_broadcast_shard(coded_block.commitment(), i) + // .await; + // } + // ctx.sleep(Duration::from_millis(200)).await; + // + // // Ensure all peers get their shards. + // for (i, peer) in peers.iter().enumerate() { + // let first_mailbox = mailboxes.get_mut(peer).unwrap(); + // let (tx, rx) = oneshot::channel(); + // first_mailbox + // .subscribe_shard(coded_block.commitment(), i, tx) + // .await; + // let shard = rx.await.unwrap(); + // + // assert_eq!(shard.commitment(), coded_block.commitment()); + // assert_eq!(shard.index(), i); + // } + // }); + // } +} diff --git a/consensus/src/marshal/coding/application.rs b/consensus/src/marshal/coding/application.rs new file mode 100644 index 0000000000..ee26add89d --- /dev/null +++ b/consensus/src/marshal/coding/application.rs @@ -0,0 +1,293 @@ +//! A wrapper around an [Application] that intercepts messages from consensus and marshal, +//! hiding details of erasure coded broadcast and shard verification. + +use crate::{ + marshal::coding::{self, types::CodedBlock}, + threshold_simplex::types::Context, + types::{CodingCommitment, View}, + Application, Automaton, Block, Epochable, Relay, Reporter, Supervisor, Viewable, +}; +use commonware_coding::{Config as CodingConfig, Scheme}; +use commonware_cryptography::{bls12381::primitives::variant::Variant, Committable, PublicKey}; +use commonware_runtime::{Clock, Metrics, Spawner}; +use futures::{channel::oneshot, lock::Mutex}; +use prometheus_client::metrics::gauge::Gauge; +use rand::Rng; +use std::{sync::Arc, time::Instant}; +use tracing::{debug, info, warn}; + +/// An [Application] adapter that handles erasure coding and shard verification for consensus. +#[derive(Clone)] +#[allow(clippy::type_complexity)] +pub struct CodingAdapter +where + E: Rng + Spawner + Metrics + Clock, + A: Application, + V: Variant, + B: Block, + S: Scheme, + P: PublicKey, + Z: Supervisor, +{ + context: E, + application: A, + coding: coding::mailbox::Mailbox, + identity: P, + supervisor: Z, + last_built: Arc)>>>, + + parent_fetch_duration: Gauge, + build_duration: Gauge, + erasure_code_duration: Gauge, +} + +impl CodingAdapter +where + E: Rng + Spawner + Metrics + Clock, + A: Application>, + V: Variant, + B: Block, + S: Scheme, + P: PublicKey, + Z: Supervisor, +{ + pub fn new( + context: E, + application: A, + coding: coding::mailbox::Mailbox, + identity: P, + supervisor: Z, + ) -> Self { + let parent_fetch_duration = Gauge::default(); + context.register( + "parent_fetch_duration", + "Time taken to fetch a parent block from marshal to build on top of, in milliseconds", + parent_fetch_duration.clone(), + ); + + let build_duration = Gauge::default(); + context.register( + "build_duration", + "Time taken for the application to build a new block, in milliseconds", + build_duration.clone(), + ); + + let erasure_code_duration = Gauge::default(); + context.register( + "erasure_code_duration", + "Time taken to erasure code a built block, in milliseconds", + erasure_code_duration.clone(), + ); + + Self { + context, + application, + coding, + identity, + supervisor, + last_built: Arc::new(Mutex::new(None)), + + parent_fetch_duration, + build_duration, + erasure_code_duration, + } + } +} + +impl Automaton for CodingAdapter +where + E: Rng + Spawner + Metrics + Clock, + A: Application>, + V: Variant, + B: Block, + S: Scheme, + P: PublicKey, + Z: Supervisor, +{ + type Digest = B::Commitment; + type Context = A::Context; + + async fn genesis(&mut self, epoch: ::Epoch) -> Self::Digest { + self.application.genesis(epoch).await.commitment() + } + + async fn propose(&mut self, context: Context) -> oneshot::Receiver { + let (_, parent_commitment) = context.parent; + let genesis = self.application.genesis(context.epoch()).await; + let mut coding = self.coding.clone(); + let mut application = self.application.clone(); + let last_built = self.last_built.clone(); + + let participants = self + .supervisor + .participants(context.view()) + .expect("failed to get participants for round"); + + // Compute the coding configuration from the number of participants. + // + // Currently, `CodingAdapter` mandates the use of `threshold_simplex`, + // which requires at least `3f + 1` participants to tolerate `f` faults. + let n_participants = participants.len() as u16; + let coding_config = coding_config_for_participants(n_participants); + + // Metrics + let parent_fetch_duration = self.parent_fetch_duration.clone(); + let build_duration = self.build_duration.clone(); + let erasure_code_duration = self.erasure_code_duration.clone(); + + let (tx, rx) = oneshot::channel(); + self.context + .with_label("propose") + .spawn(move |r_ctx| async move { + let start = Instant::now(); + let parent_block = if parent_commitment == genesis.commitment() { + genesis + } else { + let block_request = coding.subscribe_block(parent_commitment).await.await; + + if let Ok(block) = block_request { + block.into_inner() + } else { + warn!("propose job aborted"); + return; + } + }; + parent_fetch_duration.set(start.elapsed().as_millis() as i64); + + let start = Instant::now(); + let built_block = application + .build(r_ctx.with_label("build"), parent_commitment, parent_block) + .await; + build_duration.set(start.elapsed().as_millis() as i64); + + let start = Instant::now(); + let coded_block = CodedBlock::new(built_block, coding_config); + erasure_code_duration.set(start.elapsed().as_millis() as i64); + + let commitment = coded_block.commitment(); + + // Update the latest built block. + { + let mut lock = last_built.lock().await; + *lock = Some((context.view(), coded_block)); + } + + let result = tx.send(commitment); + info!( + round = %context.round, + ?commitment, + success = result.is_ok(), + "proposed new block" + ); + }); + rx + } + + async fn verify( + &mut self, + context: Context, + payload: Self::Digest, + ) -> oneshot::Receiver { + let participants = self + .supervisor + .participants(context.view()) + .expect("failed to get participants for round"); + + let coding_config = coding_config_for_participants(participants.len() as u16); + let config_matches = payload.config() == coding_config; + if !config_matches { + warn!( + round = %context.round, + got = ?payload.config(), + expected = ?coding_config, + "rejected proposal with unexpected coding configuration" + ); + + let (tx, rx) = oneshot::channel(); + tx.send(false).expect("failed to send verify result"); + return rx; + } + + let mut coding = self.coding.clone(); + let self_index = self + .supervisor + .is_participant(context.view(), &self.identity) + .expect("failed to get self index among participants"); + + #[allow(clippy::async_yields_async)] + self.context + .with_label("verify") + .spawn(move |_| async move { coding.verify_shard(payload, self_index as usize).await }) + .await + .expect("failed to spawn verify task") + } +} + +impl Relay for CodingAdapter +where + E: Rng + Spawner + Metrics + Clock, + A: Application>, + V: Variant, + B: Block, + S: Scheme, + P: PublicKey, + Z: Supervisor, +{ + type Digest = B::Commitment; + + async fn broadcast(&mut self, _commitment: Self::Digest) { + let Some((round, block)) = self.last_built.lock().await.clone() else { + warn!("missing block to broadcast"); + return; + }; + + let participants = self + .supervisor + .participants(round) + .expect("failed to get participants for round") + .to_vec(); + + debug!( + round = %round, + commitment = %block.commitment(), + height = block.height(), + "requested broadcast of built block" + ); + self.coding.broadcast(block, participants).await; + } +} + +impl Reporter for CodingAdapter +where + E: Rng + Spawner + Metrics + Clock, + A: Application>, + V: Variant, + B: Block, + S: Scheme, + P: PublicKey, + Z: Supervisor, +{ + type Activity = B; + + async fn report(&mut self, block: Self::Activity) { + self.application.finalize(block).await + } +} + +/// Compute the [CodingConfig] for a given number of participants. +/// +/// Currently, [CodingAdapter] mandates the use of `threshold_simplex`, +/// which requires at least `3f + 1` participants to tolerate `f` faults. +/// +/// The generated coding configuration facilitates any `f + 1` parts to reconstruct the data. +fn coding_config_for_participants(n_participants: u16) -> CodingConfig { + assert!( + n_participants >= 4, + "Need at least 4 participants to maintain fault tolerance with threshold_simplex" + ); + let max_faults = (n_participants - 1) / 3; + CodingConfig { + minimum_shards: max_faults + 1, + extra_shards: n_participants - (max_faults + 1), + } +} diff --git a/consensus/src/marshal/coding/mailbox.rs b/consensus/src/marshal/coding/mailbox.rs new file mode 100644 index 0000000000..c60b872610 --- /dev/null +++ b/consensus/src/marshal/coding/mailbox.rs @@ -0,0 +1,179 @@ +//! Mailbox for the shard layer actor + +use super::types::CodedBlock; +use crate::{ + marshal::coding::actor::ReconstructionError, + threshold_simplex::types::{Activity, Notarize}, + types::CodingCommitment, + Block, Reporter, +}; +use commonware_coding::Scheme; +use commonware_cryptography::{bls12381::primitives::variant::Variant, PublicKey}; +use futures::{ + channel::{mpsc, oneshot}, + SinkExt, +}; +use tracing::error; + +/// A message that can be sent to the [Actor]. +/// +/// [Actor]: super::Actor +pub enum Message +where + V: Variant, + B: Block, + S: Scheme, + P: PublicKey, +{ + /// Broadcast an erasure coded block's shards to a set of peers. + Broadcast { + /// The erasure coded block. + block: CodedBlock, + /// The peers to broadcast the shards to. + peers: Vec

, + }, + /// Verifies a shard at a given commitment and index. If the shard is valid, it will be + /// broadcasted to all peers. + VerifyShard { + /// The [CodingCommitment] for the block the shard belongs to. + commitment: CodingCommitment, + /// The index of the shard in the erasure coded block. + index: usize, + /// A response channel to send the result to. + response: oneshot::Sender, + }, + /// Attempt to reconstruct a block from received shards. + TryReconstruct { + /// The [CodingCommitment] for the block to reconstruct. + commitment: CodingCommitment, + /// A response channel to send the reconstructed block to. + #[allow(clippy::type_complexity)] + response: oneshot::Sender>, ReconstructionError>>, + }, + /// Subscribe to notifications for when a block is fully reconstructed. + SubscribeBlock { + /// The [CodingCommitment] for the block to subscribe to. + commitment: CodingCommitment, + /// A response channel to send the reconstructed block to. + response: oneshot::Sender>, + }, + /// A notarization vote to be processed. + Notarize { + /// The notarization vote. + notarization: Notarize, + }, + /// A notice that a block has been finalized. + Finalized { + /// The [CodingCommitment] for the finalized block. + commitment: CodingCommitment, + }, +} + +/// A mailbox for sending messages to the [Actor]. +/// +/// [Actor]: super::Actor +#[derive(Clone)] +pub struct Mailbox +where + V: Variant, + B: Block, + S: Scheme, + P: PublicKey, +{ + sender: mpsc::Sender>, +} + +impl Mailbox +where + V: Variant, + B: Block, + S: Scheme, + P: PublicKey, +{ + /// Create a new [Mailbox] with the given sender. + pub fn new(sender: mpsc::Sender>) -> Self { + Self { sender } + } + + /// Broadcast an erasure coded block's shards to a set of peers. + pub async fn broadcast(&mut self, block: CodedBlock, peers: Vec

) { + let msg = Message::Broadcast { block, peers }; + self.sender.send(msg).await.expect("mailbox closed"); + } + + /// Broadcast an individual shard to all peers. + pub async fn verify_shard( + &mut self, + commitment: CodingCommitment, + index: usize, + ) -> oneshot::Receiver { + let (tx, rx) = oneshot::channel(); + let msg = Message::VerifyShard { + commitment, + index, + response: tx, + }; + self.sender.send(msg).await.expect("mailbox closed"); + + rx + } + + /// Attempt to reconstruct a block from received shards. + pub async fn try_reconstruct( + &mut self, + commitment: CodingCommitment, + ) -> oneshot::Receiver>, ReconstructionError>> { + let (tx, rx) = oneshot::channel(); + let msg = Message::TryReconstruct { + commitment, + response: tx, + }; + self.sender.send(msg).await.expect("mailbox closed"); + + rx + } + + /// Subscribe to notifications for when a block is fully reconstructed. + pub async fn subscribe_block( + &mut self, + commitment: CodingCommitment, + ) -> oneshot::Receiver> { + let (tx, rx) = oneshot::channel(); + let msg = Message::SubscribeBlock { + commitment, + response: tx, + }; + self.sender.send(msg).await.expect("mailbox closed"); + + rx + } + + /// A notice that a block has been finalized. + pub async fn finalized(&mut self, commitment: CodingCommitment) { + let msg = Message::Finalized { commitment }; + self.sender.send(msg).await.expect("mailbox closed"); + } +} + +impl Reporter for Mailbox +where + V: Variant, + B: Block, + S: Scheme, + P: PublicKey, +{ + type Activity = Activity; + + async fn report(&mut self, activity: Self::Activity) { + let message = match activity { + Activity::Notarize(notarization) => Message::Notarize { notarization }, + _ => { + // Ignore other activity types + return; + } + }; + if self.sender.send(message).await.is_err() { + error!("failed to report activity to actor: receiver dropped"); + } + } +} diff --git a/consensus/src/marshal/coding/mod.rs b/consensus/src/marshal/coding/mod.rs new file mode 100644 index 0000000000..a30750d7a5 --- /dev/null +++ b/consensus/src/marshal/coding/mod.rs @@ -0,0 +1,11 @@ +mod application; +pub use application::CodingAdapter; + +mod actor; +pub use actor::{Actor, ReconstructionError}; + +mod mailbox; +pub use mailbox::Mailbox; + +mod types; +pub use types::{CodedBlock, DistributionShard, Shard}; diff --git a/consensus/src/marshal/coding/types.rs b/consensus/src/marshal/coding/types.rs new file mode 100644 index 0000000000..1a2890b2be --- /dev/null +++ b/consensus/src/marshal/coding/types.rs @@ -0,0 +1,482 @@ +//! Types for erasure coded [Block] broadcast and reconstruction. + +use crate::{types::CodingCommitment, Block}; +use commonware_codec::{EncodeSize, FixedSize, RangeCfg, Read, ReadExt, Write}; +use commonware_coding::{Config as CodingConfig, Scheme}; +use commonware_cryptography::{Committable, Digestible, Hasher}; +use std::{fmt::Debug, ops::Deref}; + +const STRONG_SHARD_TAG: u8 = 0; +const WEAK_SHARD_TAG: u8 = 1; + +/// A shard of erasure coded data, either a strong shard (from the proposer) or a weak shard +/// (from a non-proposer). +/// +/// A weak shard cannot be checked for validity on its own. +#[derive(Clone)] +pub enum DistributionShard { + /// A shard that is broadcasted by the proposer, containing extra information for generating + /// checking data. + Strong(S::Shard), + /// A shard that is broadcasted by a non-proposer, containing only the shard data. + Weak(S::ReShard), +} + +impl Write for DistributionShard { + fn write(&self, buf: &mut impl bytes::BufMut) { + match self { + DistributionShard::Strong(shard) => { + buf.put_u8(STRONG_SHARD_TAG); + shard.write(buf); + } + DistributionShard::Weak(reshard) => { + buf.put_u8(WEAK_SHARD_TAG); + reshard.write(buf); + } + } + } +} + +impl EncodeSize for DistributionShard { + fn encode_size(&self) -> usize { + 1 + match self { + DistributionShard::Strong(shard) => shard.encode_size(), + DistributionShard::Weak(reshard) => reshard.encode_size(), + } + } +} + +impl Read for DistributionShard { + type Cfg = (::Cfg, ::Cfg); + + fn read_cfg( + buf: &mut impl bytes::Buf, + (shard_cfg, reshard_cfg): &Self::Cfg, + ) -> Result { + match buf.get_u8() { + STRONG_SHARD_TAG => { + let shard = S::Shard::read_cfg(buf, shard_cfg)?; + Ok(DistributionShard::Strong(shard)) + } + WEAK_SHARD_TAG => { + let reshard = S::ReShard::read_cfg(buf, reshard_cfg)?; + Ok(DistributionShard::Weak(reshard)) + } + _ => Err(commonware_codec::Error::Invalid( + "DistributionShard", + "invalid tag", + )), + } + } +} + +impl PartialEq for DistributionShard { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (DistributionShard::Strong(a), DistributionShard::Strong(b)) => a == b, + (DistributionShard::Weak(a), DistributionShard::Weak(b)) => a == b, + _ => false, + } + } +} + +impl Eq for DistributionShard {} + +/// A broadcastable shard of erasure coded data, including the coding commitment and +/// the configuration used to code the data. +pub struct Shard { + /// The coding commitment + commitment: CodingCommitment, + /// The index of this shard within the commitment. + index: usize, + /// An individual shard within the commitment. + inner: DistributionShard, + /// Phantom data for the hasher. + _hasher: std::marker::PhantomData, +} + +impl Shard { + pub fn new(commitment: CodingCommitment, index: usize, inner: DistributionShard) -> Self { + Self { + commitment, + index, + inner, + _hasher: std::marker::PhantomData, + } + } + + /// Returns the index of this shard within the commitment. + pub fn index(&self) -> usize { + self.index + } + + /// Takes the inner [Shard]. + pub fn into_inner(self) -> DistributionShard { + self.inner + } + + /// Verifies that this shard is valid for the given commitment and index. + /// + /// NOTE: If the inner shard is a weak shard, this will always return false, as weak shards + /// cannot be verified in isolation. + pub fn verify(&self) -> bool { + match &self.inner { + DistributionShard::Strong(shard) => S::reshard( + &self.commitment.config(), + &self.commitment.inner(), + self.index as u16, + shard.clone(), + ) + .is_ok(), + DistributionShard::Weak(_) => false, + } + } + + /// Returns the UUID of a shard with the given commitment and index. + pub fn uuid(commitment: CodingCommitment, index: usize) -> H::Digest { + let mut buf = vec![0u8; CodingCommitment::SIZE + u32::SIZE]; + buf[..commitment.encode_size()].copy_from_slice(&commitment); + buf[commitment.encode_size()..].copy_from_slice((index as u32).to_le_bytes().as_ref()); + H::hash(&buf) + } +} + +impl Clone for Shard { + fn clone(&self) -> Self { + Self { + commitment: self.commitment, + index: self.index, + inner: self.inner.clone(), + _hasher: std::marker::PhantomData, + } + } +} + +impl Deref for Shard { + type Target = DistributionShard; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl Committable for Shard { + type Commitment = CodingCommitment; + + fn commitment(&self) -> Self::Commitment { + self.commitment + } +} + +impl Digestible for Shard { + type Digest = H::Digest; + + fn digest(&self) -> Self::Digest { + Self::uuid(self.commitment, self.index) + } +} + +impl Write for Shard { + fn write(&self, buf: &mut impl bytes::BufMut) { + self.commitment.write(buf); + self.index.write(buf); + self.inner.write(buf); + } +} + +impl EncodeSize for Shard { + fn encode_size(&self) -> usize { + self.commitment.encode_size() + self.index.encode_size() + self.inner.encode_size() + } +} + +impl Read for Shard { + type Cfg = (::Cfg, ::Cfg); + + fn read_cfg( + buf: &mut impl bytes::Buf, + cfg: &Self::Cfg, + ) -> Result { + let commitment = CodingCommitment::read(buf)?; + let index = usize::read_cfg(buf, &RangeCfg::from(0..=usize::MAX))?; + let inner = DistributionShard::read_cfg(buf, cfg)?; + + Ok(Self { + commitment, + index, + inner, + _hasher: std::marker::PhantomData, + }) + } +} + +impl PartialEq for Shard { + fn eq(&self, other: &Self) -> bool { + self.commitment == other.commitment + && self.index == other.index + && self.inner == other.inner + } +} + +impl Eq for Shard {} + +/// An envelope type for an erasure coded [Block]. +#[derive(Debug)] +pub struct CodedBlock { + /// The inner block type. + inner: B, + /// The erasure coding configuration. + config: CodingConfig, + /// The erasure coding commitment. + commitment: S::Commitment, + /// The coded shards. + /// + /// These shards are optional to enable lazy construction. + shards: Option>, +} + +impl, S: Scheme> CodedBlock { + /// Erasure codes the block. + fn encode(inner: &B, config: CodingConfig) -> (S::Commitment, Vec) { + let mut buf = Vec::with_capacity(config.encode_size() + inner.encode_size()); + inner.write(&mut buf); + config.write(&mut buf); + + S::encode(&config, buf.as_slice()).expect("encoding a block should not fail") + } + + /// Create a new [CodedBlock] from a [Block] and a configuration. + pub fn new(inner: B, config: CodingConfig) -> Self { + let (commitment, shards) = Self::encode(&inner, config); + Self { + inner, + config, + commitment, + shards: Some(shards), + } + } + + /// Create a new [CodedBlock] from a [Block] and trusted [CodingCommitment]. + pub fn new_trusted(inner: B, commitment: CodingCommitment) -> Self { + Self { + inner, + config: commitment.config(), + commitment: commitment.inner(), + shards: None, + } + } + + /// Returns the coding configuration for the data committed. + pub fn config(&self) -> CodingConfig { + self.config + } + + /// Returns a refernce to the shards in this coded block. + pub fn shards(&mut self) -> &[S::Shard] { + match self.shards { + Some(ref shards) => shards, + None => { + let (commitment, shards) = Self::encode(&self.inner, self.config); + + assert_eq!(commitment, self.commitment); + + self.shards = Some(shards); + self.shards.as_ref().unwrap() + } + } + } + + /// Returns a [Shard] at the given index, if the index is valid. + pub fn shard(&self, index: usize) -> Option> { + Some(Shard::new( + self.commitment(), + index, + DistributionShard::Strong(self.shards.as_ref()?.get(index)?.clone()), + )) + } + + /// Returns a reference to the inner [Block]. + pub fn inner(&self) -> &B { + &self.inner + } + + /// Takes the inner [Block]. + pub fn into_inner(self) -> B { + self.inner + } +} + +impl Clone for CodedBlock { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + config: self.config, + commitment: self.commitment, + shards: self.shards.clone(), + } + } +} + +impl, S: Scheme> Committable for CodedBlock { + type Commitment = CodingCommitment; + + fn commitment(&self) -> Self::Commitment { + CodingCommitment::from((self.commitment, self.config)) + } +} + +impl Digestible for CodedBlock { + type Digest = B::Digest; + + fn digest(&self) -> Self::Digest { + self.inner.digest() + } +} + +impl Write for CodedBlock { + fn write(&self, buf: &mut impl bytes::BufMut) { + self.inner.write(buf); + self.config.write(buf); + } +} + +impl EncodeSize for CodedBlock { + fn encode_size(&self) -> usize { + self.inner.encode_size() + self.config.encode_size() + } +} + +impl Read for CodedBlock { + type Cfg = ::Cfg; + + fn read_cfg( + buf: &mut impl bytes::Buf, + cfg: &Self::Cfg, + ) -> Result { + let inner = B::read_cfg(buf, cfg)?; + let config = CodingConfig::read(buf)?; + + let mut buf = Vec::with_capacity(config.encode_size() + inner.encode_size()); + inner.write(&mut buf); + config.write(&mut buf); + let (commitment, shards) = S::encode(&config, buf.as_slice()).map_err(|_| { + commonware_codec::Error::Invalid("CodedBlock", "Failed to re-commit to block") + })?; + + Ok(Self { + inner, + config, + commitment, + shards: Some(shards), + }) + } +} + +impl, S: Scheme> Block for CodedBlock { + fn height(&self) -> u64 { + self.inner.height() + } + + fn parent(&self) -> Self::Commitment { + self.inner.parent() + } +} + +impl PartialEq for CodedBlock { + fn eq(&self, other: &Self) -> bool { + self.inner == other.inner + && self.config == other.config + && self.commitment == other.commitment + && self.shards == other.shards + } +} + +impl Eq for CodedBlock {} + +#[cfg(test)] +mod test { + use super::*; + use crate::marshal::mocks::block::Block as MockBlock; + use commonware_codec::{Decode, Encode}; + use commonware_coding::ReedSolomon; + use commonware_cryptography::Sha256; + + const MAX_SHARD_SIZE: usize = 1024 * 1024; // 1 MiB + + type H = Sha256; + type RS = ReedSolomon; + type RShard = Shard; + type Block = MockBlock<::Digest>; + + #[test] + fn test_distribution_shard_codec_roundtrip() { + const MOCK_BLOCK_DATA: &[u8] = b"commonware shape rotator club"; + const CONFIG: CodingConfig = CodingConfig { + minimum_shards: 1, + extra_shards: 2, + }; + + let (_, shards) = RS::encode(&CONFIG, MOCK_BLOCK_DATA).unwrap(); + let raw_shard = shards.first().cloned().unwrap(); + + let strong_shard = DistributionShard::::Strong(raw_shard.clone()); + let encoded = strong_shard.encode(); + let decoded = DistributionShard::::decode_cfg( + &mut encoded.as_ref(), + &(MAX_SHARD_SIZE, MAX_SHARD_SIZE), + ) + .unwrap(); + assert!(strong_shard == decoded); + + let weak_shard = DistributionShard::::Weak(raw_shard.clone()); + let encoded = weak_shard.encode(); + let decoded = DistributionShard::::decode_cfg( + &mut encoded.as_ref(), + &(MAX_SHARD_SIZE, MAX_SHARD_SIZE), + ) + .unwrap(); + assert!(weak_shard == decoded); + } + + #[test] + fn test_shard_codec_roundtrip() { + const MOCK_BLOCK_DATA: &[u8] = b"commonware supremacy"; + const CONFIG: CodingConfig = CodingConfig { + minimum_shards: 1, + extra_shards: 2, + }; + + let (commitment, shards) = RS::encode(&CONFIG, MOCK_BLOCK_DATA).unwrap(); + let raw_shard = shards.first().cloned().unwrap(); + + let commitment = CodingCommitment::from((commitment, CONFIG)); + let shard = RShard::new(commitment, 0, DistributionShard::Strong(raw_shard.clone())); + let encoded = shard.encode(); + let decoded = + RShard::decode_cfg(&mut encoded.as_ref(), &(MAX_SHARD_SIZE, MAX_SHARD_SIZE)).unwrap(); + assert!(shard == decoded); + + let shard = RShard::new(commitment, 0, DistributionShard::Weak(raw_shard)); + let encoded = shard.encode(); + let decoded = + RShard::decode_cfg(&mut encoded.as_ref(), &(MAX_SHARD_SIZE, MAX_SHARD_SIZE)).unwrap(); + assert!(shard == decoded); + } + + #[test] + fn test_coded_block_codec_roundtrip() { + const CONFIG: CodingConfig = CodingConfig { + minimum_shards: 1, + extra_shards: 2, + }; + + let parent = CodingCommitment::from((Sha256::hash(b"parent"), CONFIG)); + let block = Block::new::(parent, 42, 1_234_567); + let coded_block = CodedBlock::::new(block, CONFIG); + + let encoded = coded_block.encode(); + let decoded = CodedBlock::::decode_cfg(encoded.freeze(), &()).unwrap(); + + assert!(coded_block == decoded); + } +} diff --git a/consensus/src/marshal/finalizer.rs b/consensus/src/marshal/finalizer.rs index 08802ddc05..0a99efd568 100644 --- a/consensus/src/marshal/finalizer.rs +++ b/consensus/src/marshal/finalizer.rs @@ -1,4 +1,8 @@ -use crate::{marshal::ingress::orchestrator::Orchestrator, Block, Reporter}; +use crate::{ + marshal::ingress::orchestrator::Orchestrator, types::CodingCommitment, Block, Reporter, +}; +use commonware_coding::Scheme; +use commonware_cryptography::Committable; use commonware_runtime::{spawn_cell, Clock, ContextCell, Handle, Metrics, Spawner, Storage}; use commonware_storage::metadata::{self, Metadata}; use commonware_utils::sequence::FixedBytes; @@ -13,14 +17,20 @@ const LATEST_KEY: FixedBytes<1> = FixedBytes::new([0u8]); /// /// Stores the highest height for which the application has processed. This allows resuming /// processing from the last processed height after a restart. -pub struct Finalizer> { +pub struct Finalizer +where + B: Block, + R: Spawner + Clock + Metrics + Storage, + S: Scheme, + Z: Reporter, +{ context: ContextCell, // Application that processes the finalized blocks. application: Z, // Orchestrator that stores the finalized blocks. - orchestrator: Orchestrator, + orchestrator: Orchestrator, // Notifier to indicate that the finalized blocks have been updated and should be re-queried. notifier_rx: mpsc::Receiver<()>, @@ -29,15 +39,19 @@ pub struct Finalizer, u64>, } -impl> - Finalizer +impl Finalizer +where + B: Block, + R: Spawner + Clock + Metrics + Storage, + S: Scheme, + Z: Reporter, { /// Initialize the finalizer. pub async fn new( context: R, partition_prefix: String, application: Z, - orchestrator: Orchestrator, + orchestrator: Orchestrator, notifier_rx: mpsc::Receiver<()>, ) -> Self { // Initialize metadata @@ -89,7 +103,7 @@ impl // height is processed by the application), it is possible that the application may // be asked to process a block it has already seen (which it can simply ignore). let commitment = block.commitment(); - self.application.report(block).await; + self.application.report(block.into_inner()).await; // Record that we have processed up through this height. latest = height; diff --git a/consensus/src/marshal/ingress/handler.rs b/consensus/src/marshal/ingress/handler.rs index 9daf03726f..3e89883ad5 100644 --- a/consensus/src/marshal/ingress/handler.rs +++ b/consensus/src/marshal/ingress/handler.rs @@ -240,7 +240,7 @@ impl Debug for Request { #[cfg(test)] mod tests { use super::*; - use crate::marshal::mocks::block::Block as TestBlock; + use crate::{marshal::mocks::block::Block as TestBlock, types::CodingCommitment}; use commonware_codec::{Encode, ReadExt}; use commonware_cryptography::{ sha256::{Digest as Sha256Digest, Sha256}, @@ -252,12 +252,12 @@ mod tests { #[test] fn test_subject_block_encoding() { - let commitment = Sha256::hash(b"test"); + let commitment = CodingCommitment::from((Sha256::hash(b"test"), Default::default())); let request = Request::::Block(commitment); // Test encoding let encoded = request.encode(); - assert_eq!(encoded.len(), 33); // 1 byte for enum variant + 32 bytes for commitment + assert_eq!(encoded.len(), 37); // 1 byte for enum variant + 32 bytes for commitment + 4 bytes for coding config assert_eq!(encoded[0], 0); // Block variant // Test decoding @@ -331,7 +331,7 @@ mod tests { #[test] fn test_encode_size() { - let commitment = Sha256::hash(&[0u8; 32]); + let commitment = CodingCommitment::default(); let r1 = Request::::Block(commitment); let r2 = Request::::Finalized { height: u64::MAX }; let r3 = Request::::Notarized { @@ -347,8 +347,8 @@ mod tests { #[test] fn test_request_ord_same_variant() { // Test ordering within the same variant - let commitment1 = Sha256::hash(b"test1"); - let commitment2 = Sha256::hash(b"test2"); + let commitment1 = CodingCommitment::from((Sha256::hash(b"test1"), Default::default())); + let commitment2 = CodingCommitment::from((Sha256::hash(b"test2"), Default::default())); let block1 = Request::::Block(commitment1); let block2 = Request::::Block(commitment2); @@ -388,7 +388,7 @@ mod tests { #[test] fn test_request_ord_cross_variant() { - let commitment = Sha256::hash(b"test"); + let commitment = CodingCommitment::from((Sha256::hash(b"test"), Default::default())); let block = Request::::Block(commitment); let finalized = Request::::Finalized { height: 100 }; let notarized = Request::::Notarized { @@ -415,8 +415,8 @@ mod tests { #[test] fn test_request_partial_ord() { - let commitment1 = Sha256::hash(b"test1"); - let commitment2 = Sha256::hash(b"test2"); + let commitment1 = CodingCommitment::from((Sha256::hash(b"test1"), Default::default())); + let commitment2 = CodingCommitment::from((Sha256::hash(b"test2"), Default::default())); let block1 = Request::::Block(commitment1); let block2 = Request::::Block(commitment2); let finalized = Request::::Finalized { height: 100 }; @@ -446,9 +446,9 @@ mod tests { #[test] fn test_request_ord_sorting() { - let commitment1 = Sha256::hash(b"a"); - let commitment2 = Sha256::hash(b"b"); - let commitment3 = Sha256::hash(b"c"); + let commitment1 = CodingCommitment::from((Sha256::hash(b"a"), Default::default())); + let commitment2 = CodingCommitment::from((Sha256::hash(b"b"), Default::default())); + let commitment3 = CodingCommitment::from((Sha256::hash(b"c"), Default::default())); let requests = vec![ Request::::Notarized { @@ -515,7 +515,7 @@ mod tests { assert!(max_finalized < min_notarized); // Test self-comparison - let commitment = Sha256::hash(b"self"); + let commitment = CodingCommitment::from((Sha256::hash(b"self"), Default::default())); let block = Request::::Block(commitment); assert_eq!(block.cmp(&block), std::cmp::Ordering::Equal); assert_eq!(min_finalized.cmp(&min_finalized), std::cmp::Ordering::Equal); diff --git a/consensus/src/marshal/ingress/mailbox.rs b/consensus/src/marshal/ingress/mailbox.rs index 347043a545..21b5d715fa 100644 --- a/consensus/src/marshal/ingress/mailbox.rs +++ b/consensus/src/marshal/ingress/mailbox.rs @@ -80,18 +80,6 @@ pub(crate) enum Message { /// A channel to send the retrieved block. response: oneshot::Sender, }, - /// A request to broadcast a block to all peers. - Broadcast { - /// The block to broadcast. - block: B, - }, - /// A notification that a block has been verified by the application. - Verified { - /// The round in which the block was verified. - round: Round, - /// The verified block. - block: B, - }, // -------------------- Consensus Engine Messages -------------------- /// A notarization from the consensus engine. @@ -171,7 +159,7 @@ impl Mailbox { } } - /// A request to retrieve a block by its commitment. + /// Subscribe is a request to retrieve a block by its commitment. /// /// If the block is found available locally, the block will be returned immediately. /// @@ -200,30 +188,6 @@ impl Mailbox { } rx } - - /// Broadcast indicates that a block should be sent to all peers. - pub async fn broadcast(&mut self, block: B) { - if self - .sender - .send(Message::Broadcast { block }) - .await - .is_err() - { - error!("failed to send broadcast message to actor: receiver dropped"); - } - } - - /// Notifies the actor that a block has been verified. - pub async fn verified(&mut self, round: Round, block: B) { - if self - .sender - .send(Message::Verified { round, block }) - .await - .is_err() - { - error!("failed to send verified message to actor: receiver dropped"); - } - } } impl Reporter for Mailbox { diff --git a/consensus/src/marshal/ingress/orchestrator.rs b/consensus/src/marshal/ingress/orchestrator.rs index efaa826668..e7a2b0ef57 100644 --- a/consensus/src/marshal/ingress/orchestrator.rs +++ b/consensus/src/marshal/ingress/orchestrator.rs @@ -1,4 +1,5 @@ -use crate::Block; +use crate::{marshal::coding::CodedBlock, Block}; +use commonware_coding::Scheme; use futures::{ channel::{mpsc, oneshot}, SinkExt, @@ -9,20 +10,20 @@ use tracing::error; /// /// We break this into a separate enum to establish a separate priority for /// finalizer messages over consensus messages. -pub enum Orchestration { +pub enum Orchestration { /// A request to get the next finalized block. Get { /// The height of the block to get. height: u64, /// A channel to send the block, if found. - result: oneshot::Sender>, + result: oneshot::Sender>>, }, /// A notification that a block has been processed by the application. Processed { /// The height of the processed block. height: u64, - /// The digest of the processed block. - digest: B::Commitment, + /// The commitment of the processed block. + commitment: B::Commitment, }, /// A request to repair a gap in the finalized block sequence. Repair { @@ -33,18 +34,18 @@ pub enum Orchestration { /// A handle for the finalizer to communicate with the main actor loop. #[derive(Clone)] -pub struct Orchestrator { - sender: mpsc::Sender>, +pub struct Orchestrator { + sender: mpsc::Sender>, } -impl Orchestrator { +impl Orchestrator { /// Creates a new orchestrator. - pub fn new(sender: mpsc::Sender>) -> Self { + pub fn new(sender: mpsc::Sender>) -> Self { Self { sender } } /// Gets the finalized block at the given height. - pub async fn get(&mut self, height: u64) -> Option { + pub async fn get(&mut self, height: u64) -> Option> { let (response, receiver) = oneshot::channel(); if self .sender @@ -62,10 +63,10 @@ impl Orchestrator { } /// Notifies the actor that a block has been processed. - pub async fn processed(&mut self, height: u64, digest: B::Commitment) { + pub async fn processed(&mut self, height: u64, commitment: B::Commitment) { if self .sender - .send(Orchestration::Processed { height, digest }) + .send(Orchestration::Processed { height, commitment }) .await .is_err() { diff --git a/consensus/src/marshal/mocks/block.rs b/consensus/src/marshal/mocks/block.rs index 7c43d8cd6e..8034a4fecd 100644 --- a/consensus/src/marshal/mocks/block.rs +++ b/consensus/src/marshal/mocks/block.rs @@ -1,3 +1,4 @@ +use crate::types::CodingCommitment; use bytes::{Buf, BufMut}; use commonware_codec::{varint::UInt, EncodeSize, Error, Read, ReadExt, Write}; use commonware_cryptography::{Committable, Digest, Digestible, Hasher}; @@ -5,7 +6,7 @@ use commonware_cryptography::{Committable, Digest, Digestible, Hasher}; #[derive(Clone, Debug, PartialEq, Eq)] pub struct Block { /// The parent block's digest. - pub parent: D, + pub parent: CodingCommitment, /// The height of the block in the blockchain. pub height: u64, @@ -18,7 +19,11 @@ pub struct Block { } impl Block { - fn compute_digest>(parent: &D, height: u64, timestamp: u64) -> D { + fn compute_digest>( + parent: &CodingCommitment, + height: u64, + timestamp: u64, + ) -> D { let mut hasher = H::new(); hasher.update(parent); hasher.update(&height.to_be_bytes()); @@ -26,7 +31,11 @@ impl Block { hasher.finalize() } - pub fn new>(parent: D, height: u64, timestamp: u64) -> Self { + pub fn new>( + parent: CodingCommitment, + height: u64, + timestamp: u64, + ) -> Self { let digest = Self::compute_digest::(&parent, height, timestamp); Self { parent, @@ -50,7 +59,7 @@ impl Read for Block { type Cfg = (); fn read_cfg(reader: &mut impl Buf, _: &Self::Cfg) -> Result { - let parent = D::read(reader)?; + let parent = CodingCommitment::read(reader)?; let height = UInt::read(reader)?.into(); let timestamp = UInt::read(reader)?.into(); let digest = D::read(reader)?; @@ -77,16 +86,16 @@ impl EncodeSize for Block { impl Digestible for Block { type Digest = D; - fn digest(&self) -> D { + fn digest(&self) -> Self::Digest { self.digest } } impl Committable for Block { - type Commitment = D; + type Commitment = CodingCommitment; - fn commitment(&self) -> D { - self.digest + fn commitment(&self) -> Self::Commitment { + self.parent } } diff --git a/consensus/src/marshal/mod.rs b/consensus/src/marshal/mod.rs index 850d6db891..bf24114d96 100644 --- a/consensus/src/marshal/mod.rs +++ b/consensus/src/marshal/mod.rs @@ -12,7 +12,6 @@ //! The actor interacts with four main components: //! - [crate::Reporter]: Receives ordered, finalized blocks at-least-once //! - [crate::threshold_simplex]: Provides consensus messages -//! - Application: Provides verified blocks //! - [commonware_broadcast::buffered]: Provides uncertified blocks received from the network //! - [commonware_resolver::Resolver]: Provides a backfill mechanism for missing blocks //! @@ -62,6 +61,7 @@ pub mod finalizer; pub use finalizer::Finalizer; pub mod ingress; pub use ingress::mailbox::Mailbox; +pub mod coding; pub mod resolver; #[cfg(test)] @@ -69,890 +69,946 @@ pub mod mocks; #[cfg(test)] mod tests { - use super::{ - actor, - config::Config, - mocks::{application::Application, block::Block}, - resolver::p2p as resolver, - }; - use crate::{ - marshal::ingress::mailbox::Identifier, - threshold_simplex::types::{ - finalize_namespace, notarize_namespace, seed_namespace, Activity, Finalization, - Notarization, Proposal, - }, - types::Round, - Block as _, Reporter, - }; - use commonware_broadcast::buffered; - use commonware_codec::Encode; - use commonware_cryptography::{ - bls12381::{ - dkg::ops::generate_shares, - primitives::{ - group::Share, - ops::{partial_sign_message, threshold_signature_recover}, - poly, - variant::{MinPk, Variant}, - }, - }, - ed25519::{PrivateKey, PublicKey}, - sha256::{Digest as Sha256Digest, Sha256}, - Digestible, Hasher as _, PrivateKeyExt as _, Signer as _, - }; - use commonware_macros::test_traced; - use commonware_p2p::{ - simulated::{self, Link, Network, Oracle}, - utils::requester, - }; - use commonware_resolver::p2p; - use commonware_runtime::{buffer::PoolRef, deterministic, Clock, Metrics, Runner}; - use commonware_utils::{NZUsize, NZU64}; - use governor::Quota; - use rand::{seq::SliceRandom, Rng}; - use std::{ - collections::BTreeMap, - num::{NonZeroU32, NonZeroUsize}, - time::Duration, - }; - - type D = Sha256Digest; - type B = Block; - type P = PublicKey; - type V = MinPk; - type Sh = Share; - type E = PrivateKey; - - const PAGE_SIZE: NonZeroUsize = NZUsize!(1024); - const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10); - const NAMESPACE: &[u8] = b"test"; - const NUM_VALIDATORS: u32 = 4; - const QUORUM: u32 = 3; - const NUM_BLOCKS: u64 = 160; - const BLOCKS_PER_EPOCH: u64 = 20; - const LINK: Link = Link { - latency: Duration::from_millis(10), - jitter: Duration::from_millis(1), - success_rate: 1.0, - }; - const UNRELIABLE_LINK: Link = Link { - latency: Duration::from_millis(200), - jitter: Duration::from_millis(50), - success_rate: 0.7, - }; - - async fn setup_validator( - context: deterministic::Context, - oracle: &mut Oracle

, - coordinator: p2p::mocks::Coordinator

, - secret: E, - identity: ::Public, - ) -> ( - Application, - crate::marshal::ingress::mailbox::Mailbox, - ) { - let config = Config { - identity, - mailbox_size: 100, - namespace: NAMESPACE.to_vec(), - view_retention_timeout: 10, - max_repair: 10, - codec_config: (), - partition_prefix: format!("validator-{}", secret.public_key()), - prunable_items_per_section: NZU64!(10), - replay_buffer: NZUsize!(1024), - write_buffer: NZUsize!(1024), - freezer_table_initial_size: 64, - freezer_table_resize_frequency: 10, - freezer_table_resize_chunk_size: 10, - freezer_journal_target_size: 1024, - freezer_journal_compression: None, - freezer_journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE), - immutable_items_per_section: NZU64!(10), - }; - - // Create the resolver - let backfill = oracle.register(secret.public_key(), 1).await.unwrap(); - let resolver_cfg = resolver::Config { - public_key: secret.public_key(), - coordinator, - mailbox_size: config.mailbox_size, - requester_config: requester::Config { - public_key: secret.public_key(), - rate_limit: Quota::per_second(NonZeroU32::new(5).unwrap()), - initial: Duration::from_secs(1), - timeout: Duration::from_secs(2), - }, - fetch_retry_timeout: Duration::from_millis(100), - priority_requests: false, - priority_responses: false, - }; - let resolver = resolver::init(&context, resolver_cfg, backfill); - - // Create a buffered broadcast engine and get its mailbox - let broadcast_config = buffered::Config { - public_key: secret.public_key(), - mailbox_size: config.mailbox_size, - deque_size: 10, - priority: false, - codec_config: (), - }; - let (broadcast_engine, buffer) = buffered::Engine::new(context.clone(), broadcast_config); - let network = oracle.register(secret.public_key(), 2).await.unwrap(); - broadcast_engine.start(network); - - let (actor, mailbox) = actor::Actor::init(context.clone(), config).await; - let application = Application::::default(); - - // Start the application - actor.start(application.clone(), buffer, resolver); - - (application, mailbox) - } - - fn make_finalization(proposal: Proposal, shares: &[Sh], quorum: u32) -> Finalization { - let proposal_msg = proposal.encode(); - - // Generate proposal signature - let proposal_partials: Vec<_> = shares - .iter() - .take(quorum as usize) - .map(|s| { - partial_sign_message::(s, Some(&finalize_namespace(NAMESPACE)), &proposal_msg) - }) - .collect(); - let proposal_signature = - threshold_signature_recover::(quorum, &proposal_partials).unwrap(); - - // Generate seed signature (for the view number) - let seed_msg = proposal.round.encode(); - let seed_partials: Vec<_> = shares - .iter() - .take(quorum as usize) - .map(|s| partial_sign_message::(s, Some(&seed_namespace(NAMESPACE)), &seed_msg)) - .collect(); - let seed_signature = threshold_signature_recover::(quorum, &seed_partials).unwrap(); - - Finalization { - proposal, - proposal_signature, - seed_signature, - } - } - - fn make_notarization(proposal: Proposal, shares: &[Sh], quorum: u32) -> Notarization { - let proposal_msg = proposal.encode(); - - // Generate proposal signature - let proposal_partials: Vec<_> = shares - .iter() - .take(quorum as usize) - .map(|s| { - partial_sign_message::(s, Some(¬arize_namespace(NAMESPACE)), &proposal_msg) - }) - .collect(); - let proposal_signature = - threshold_signature_recover::(quorum, &proposal_partials).unwrap(); - - // Generate seed signature (for the view number) - let seed_msg = proposal.round.encode(); - let seed_partials: Vec<_> = shares - .iter() - .take(quorum as usize) - .map(|s| partial_sign_message::(s, Some(&seed_namespace(NAMESPACE)), &seed_msg)) - .collect(); - let seed_signature = threshold_signature_recover::(quorum, &seed_partials).unwrap(); - - Notarization { - proposal, - proposal_signature, - seed_signature, - } - } - - fn setup_network(context: deterministic::Context) -> Oracle

{ - let (network, oracle) = Network::new( - context.with_label("network"), - simulated::Config { - max_size: 1024 * 1024, - disconnect_on_block: true, - }, - ); - network.start(); - oracle - } - - fn setup_validators_and_shares( - context: &mut deterministic::Context, - ) -> (Vec, Vec

, ::Public, Vec) { - let mut schemes = (0..NUM_VALIDATORS) - .map(|i| PrivateKey::from_seed(i as u64)) - .collect::>(); - schemes.sort_by_key(|s| s.public_key()); - let peers: Vec = schemes.iter().map(|s| s.public_key()).collect(); - - let (identity, shares) = generate_shares::<_, V>(context, None, NUM_VALIDATORS, QUORUM); - let identity = *poly::public::(&identity); - - (schemes, peers, identity, shares) - } - - async fn setup_network_links(oracle: &mut Oracle

, peers: &[P], link: Link) { - for p1 in peers.iter() { - for p2 in peers.iter() { - if p2 == p1 { - continue; - } - oracle - .add_link(p1.clone(), p2.clone(), link.clone()) - .await - .unwrap(); - } - } - } - - #[test_traced("WARN")] - fn test_finalize_good_links() { - for seed in 0..5 { - let result1 = finalize(seed, LINK); - let result2 = finalize(seed, LINK); - - // Ensure determinism - assert_eq!(result1, result2); - } - } - - #[test_traced("WARN")] - fn test_finalize_bad_links() { - for seed in 0..5 { - let result1 = finalize(seed, UNRELIABLE_LINK); - let result2 = finalize(seed, UNRELIABLE_LINK); - - // Ensure determinism - assert_eq!(result1, result2); - } - } - - fn finalize(seed: u64, link: Link) -> String { - let runner = deterministic::Runner::new( - deterministic::Config::new() - .with_seed(seed) - .with_timeout(Some(Duration::from_secs(300))), - ); - runner.start(|mut context| async move { - let mut oracle = setup_network(context.clone()); - let (schemes, peers, identity, shares) = setup_validators_and_shares(&mut context); - - // Initialize applications and actors - let mut applications = BTreeMap::new(); - let mut actors = Vec::new(); - - for (i, secret) in schemes.iter().enumerate() { - let (application, actor) = setup_validator( - context.with_label(&format!("validator-{i}")), - &mut oracle, - p2p::mocks::Coordinator::new(peers.clone()), - secret.clone(), - identity, - ) - .await; - applications.insert(peers[i].clone(), application); - actors.push(actor); - } - - // Add links between all peers - setup_network_links(&mut oracle, &peers, link.clone()).await; - - // Generate blocks, skipping the genesis block. - let mut blocks = Vec::::new(); - let mut parent = Sha256::hash(b""); - for i in 1..=NUM_BLOCKS { - let block = B::new::(parent, i, i); - parent = block.digest(); - blocks.push(block); - } - - // Broadcast and finalize blocks in random order - blocks.shuffle(&mut context); - for block in blocks.iter() { - // Skip genesis block - let height = block.height(); - assert!(height > 0, "genesis block should not have been generated"); - - // Calculate the epoch and round for the block - let epoch = height / BLOCKS_PER_EPOCH; - let round = Round::new(epoch, height); - - // Broadcast block by one validator - let actor_index: usize = (height % (NUM_VALIDATORS as u64)) as usize; - let mut actor = actors[actor_index].clone(); - actor.broadcast(block.clone()).await; - actor.verified(round, block.clone()).await; - - // Wait for the block to be broadcast, but due to jitter, we may or may not receive - // the block before continuing. - context.sleep(link.latency).await; - - // Notarize block by the validator that broadcasted it - let proposal = Proposal { - round, - parent: height.checked_sub(1).unwrap(), - payload: block.digest(), - }; - let notarization = make_notarization(proposal.clone(), &shares, QUORUM); - actor - .report(Activity::Notarization(notarization.clone())) - .await; - - // Finalize block by all validators - let fin = make_finalization(proposal, &shares, QUORUM); - for actor in actors.iter_mut() { - // Always finalize 1) the last block in each epoch 2) the last block in the chain. - // Otherwise, finalize randomly. - if height == NUM_BLOCKS - || height % BLOCKS_PER_EPOCH == 0 - || context.gen_bool(0.2) - // 20% chance to finalize randomly - { - actor.report(Activity::Finalization(fin.clone())).await; - } - } - } - - // Check that all applications received all blocks. - let mut finished = false; - while !finished { - // Avoid a busy loop - context.sleep(Duration::from_secs(1)).await; - - // If not all validators have finished, try again - if applications.len() != NUM_VALIDATORS as usize { - continue; - } - finished = true; - for app in applications.values() { - if app.blocks().len() != NUM_BLOCKS as usize { - finished = false; - break; - } - } - } - - // Return state - context.auditor().state() - }) - } - - #[test_traced("WARN")] - fn test_subscribe_basic_block_delivery() { - let runner = deterministic::Runner::timed(Duration::from_secs(60)); - runner.start(|mut context| async move { - let mut oracle = setup_network(context.clone()); - let (schemes, peers, identity, shares) = setup_validators_and_shares(&mut context); - - let mut actors = Vec::new(); - for (i, secret) in schemes.iter().enumerate() { - let (_application, actor) = setup_validator( - context.with_label(&format!("validator-{i}")), - &mut oracle, - p2p::mocks::Coordinator::new(vec![]), - secret.clone(), - identity, - ) - .await; - actors.push(actor); - } - let mut actor = actors[0].clone(); - - setup_network_links(&mut oracle, &peers, LINK).await; - - let parent = Sha256::hash(b""); - let block = B::new::(parent, 1, 1); - let commitment = block.digest(); - - let subscription_rx = actor.subscribe(Some(Round::from((0, 1))), commitment).await; - - actor.verified(Round::from((0, 1)), block.clone()).await; - - let proposal = Proposal { - round: Round::new(0, 1), - parent: 0, - payload: commitment, - }; - let notarization = make_notarization(proposal.clone(), &shares, QUORUM); - actor.report(Activity::Notarization(notarization)).await; - - let finalization = make_finalization(proposal, &shares, QUORUM); - actor.report(Activity::Finalization(finalization)).await; - - let received_block = subscription_rx.await.unwrap(); - assert_eq!(received_block.digest(), block.digest()); - assert_eq!(received_block.height(), 1); - }) - } - - #[test_traced("WARN")] - fn test_subscribe_multiple_subscriptions() { - let runner = deterministic::Runner::timed(Duration::from_secs(60)); - runner.start(|mut context| async move { - let mut oracle = setup_network(context.clone()); - let (schemes, peers, identity, shares) = setup_validators_and_shares(&mut context); - - let mut actors = Vec::new(); - for (i, secret) in schemes.iter().enumerate() { - let (_application, actor) = setup_validator( - context.with_label(&format!("validator-{i}")), - &mut oracle, - p2p::mocks::Coordinator::new(peers.clone()), - secret.clone(), - identity, - ) - .await; - actors.push(actor); - } - let mut actor = actors[0].clone(); - - setup_network_links(&mut oracle, &peers, LINK).await; - - let parent = Sha256::hash(b""); - let block1 = B::new::(parent, 1, 1); - let block2 = B::new::(block1.digest(), 2, 2); - let commitment1 = block1.digest(); - let commitment2 = block2.digest(); - - let sub1_rx = actor - .subscribe(Some(Round::from((0, 1))), commitment1) - .await; - let sub2_rx = actor - .subscribe(Some(Round::from((0, 2))), commitment2) - .await; - let sub3_rx = actor - .subscribe(Some(Round::from((0, 1))), commitment1) - .await; - - actor.verified(Round::from((0, 1)), block1.clone()).await; - actor.verified(Round::from((0, 2)), block2.clone()).await; - - for (view, block) in [(1, block1.clone()), (2, block2.clone())] { - let proposal = Proposal { - round: Round::new(0, view), - parent: view.checked_sub(1).unwrap(), - payload: block.digest(), - }; - let notarization = make_notarization(proposal.clone(), &shares, QUORUM); - actor.report(Activity::Notarization(notarization)).await; - - let finalization = make_finalization(proposal, &shares, QUORUM); - actor.report(Activity::Finalization(finalization)).await; - } - - let received1_sub1 = sub1_rx.await.unwrap(); - let received2 = sub2_rx.await.unwrap(); - let received1_sub3 = sub3_rx.await.unwrap(); - - assert_eq!(received1_sub1.digest(), block1.digest()); - assert_eq!(received2.digest(), block2.digest()); - assert_eq!(received1_sub3.digest(), block1.digest()); - assert_eq!(received1_sub1.height(), 1); - assert_eq!(received2.height(), 2); - assert_eq!(received1_sub3.height(), 1); - }) - } - - #[test_traced("WARN")] - fn test_subscribe_canceled_subscriptions() { - let runner = deterministic::Runner::timed(Duration::from_secs(60)); - runner.start(|mut context| async move { - let mut oracle = setup_network(context.clone()); - let (schemes, peers, identity, shares) = setup_validators_and_shares(&mut context); - - let mut actors = Vec::new(); - for (i, secret) in schemes.iter().enumerate() { - let (_application, actor) = setup_validator( - context.with_label(&format!("validator-{i}")), - &mut oracle, - p2p::mocks::Coordinator::new(peers.clone()), - secret.clone(), - identity, - ) - .await; - actors.push(actor); - } - let mut actor = actors[0].clone(); - - setup_network_links(&mut oracle, &peers, LINK).await; - - let parent = Sha256::hash(b""); - let block1 = B::new::(parent, 1, 1); - let block2 = B::new::(block1.digest(), 2, 2); - let commitment1 = block1.digest(); - let commitment2 = block2.digest(); - - let sub1_rx = actor - .subscribe(Some(Round::from((0, 1))), commitment1) - .await; - let sub2_rx = actor - .subscribe(Some(Round::from((0, 2))), commitment2) - .await; - - drop(sub1_rx); - - actor.verified(Round::from((0, 1)), block1.clone()).await; - actor.verified(Round::from((0, 2)), block2.clone()).await; - - for (view, block) in [(1, block1.clone()), (2, block2.clone())] { - let proposal = Proposal { - round: Round::new(0, view), - parent: view.checked_sub(1).unwrap(), - payload: block.digest(), - }; - let notarization = make_notarization(proposal.clone(), &shares, QUORUM); - actor.report(Activity::Notarization(notarization)).await; - - let finalization = make_finalization(proposal, &shares, QUORUM); - actor.report(Activity::Finalization(finalization)).await; - } - - let received2 = sub2_rx.await.unwrap(); - assert_eq!(received2.digest(), block2.digest()); - assert_eq!(received2.height(), 2); - }) - } - - #[test_traced("WARN")] - fn test_subscribe_blocks_from_different_sources() { - let runner = deterministic::Runner::timed(Duration::from_secs(60)); - runner.start(|mut context| async move { - let mut oracle = setup_network(context.clone()); - let (schemes, peers, identity, shares) = setup_validators_and_shares(&mut context); - - let mut actors = Vec::new(); - for (i, secret) in schemes.iter().enumerate() { - let (_application, actor) = setup_validator( - context.with_label(&format!("validator-{i}")), - &mut oracle, - p2p::mocks::Coordinator::new(peers.clone()), - secret.clone(), - identity, - ) - .await; - actors.push(actor); - } - let mut actor = actors[0].clone(); - - setup_network_links(&mut oracle, &peers, LINK).await; - - let parent = Sha256::hash(b""); - let block1 = B::new::(parent, 1, 1); - let block2 = B::new::(block1.digest(), 2, 2); - let block3 = B::new::(block2.digest(), 3, 3); - let block4 = B::new::(block3.digest(), 4, 4); - let block5 = B::new::(block4.digest(), 5, 5); - - let sub1_rx = actor.subscribe(None, block1.digest()).await; - let sub2_rx = actor.subscribe(None, block2.digest()).await; - let sub3_rx = actor.subscribe(None, block3.digest()).await; - let sub4_rx = actor.subscribe(None, block4.digest()).await; - let sub5_rx = actor.subscribe(None, block5.digest()).await; - - // Block1: Broadcasted by the actor - actor.broadcast(block1.clone()).await; - context.sleep(Duration::from_millis(20)).await; - - // Block1: delivered - let received1 = sub1_rx.await.unwrap(); - assert_eq!(received1.digest(), block1.digest()); - assert_eq!(received1.height(), 1); - - // Block2: Verified by the actor - actor.verified(Round::from((0, 2)), block2.clone()).await; - - // Block2: delivered - let received2 = sub2_rx.await.unwrap(); - assert_eq!(received2.digest(), block2.digest()); - assert_eq!(received2.height(), 2); - - // Block3: Notarized by the actor - let proposal3 = Proposal { - round: Round::new(0, 3), - parent: 2, - payload: block3.digest(), - }; - let notarization3 = make_notarization(proposal3.clone(), &shares, QUORUM); - actor.report(Activity::Notarization(notarization3)).await; - actor.verified(Round::from((0, 3)), block3.clone()).await; - - // Block3: delivered - let received3 = sub3_rx.await.unwrap(); - assert_eq!(received3.digest(), block3.digest()); - assert_eq!(received3.height(), 3); - - // Block4: Finalized by the actor - let finalization4 = make_finalization( - Proposal { - round: Round::new(0, 4), - parent: 3, - payload: block4.digest(), - }, - &shares, - QUORUM, - ); - actor.report(Activity::Finalization(finalization4)).await; - actor.verified(Round::from((0, 4)), block4.clone()).await; - - // Block4: delivered - let received4 = sub4_rx.await.unwrap(); - assert_eq!(received4.digest(), block4.digest()); - assert_eq!(received4.height(), 4); - - // Block5: Broadcasted by a remote node (different actor) - let remote_actor = &mut actors[1].clone(); - remote_actor.broadcast(block5.clone()).await; - context.sleep(Duration::from_millis(20)).await; - - // Block5: delivered - let received5 = sub5_rx.await.unwrap(); - assert_eq!(received5.digest(), block5.digest()); - assert_eq!(received5.height(), 5); - }) - } - - #[test_traced("WARN")] - fn test_get_info_basic_queries_present_and_missing() { - let runner = deterministic::Runner::timed(Duration::from_secs(60)); - runner.start(|mut context| async move { - let mut oracle = setup_network(context.clone()); - let (schemes, _peers, identity, shares) = setup_validators_and_shares(&mut context); - - // Single validator actor - let secret = schemes[0].clone(); - let (_application, mut actor) = setup_validator( - context.with_label("validator-0"), - &mut oracle, - p2p::mocks::Coordinator::new(vec![]), - secret, - identity, - ) - .await; - - // Initially, no latest - assert!(actor.get_info(Identifier::Latest).await.is_none()); - - // Before finalization, specific height returns None - assert!(actor.get_info(1).await.is_none()); - - // Create and verify a block, then finalize it - let parent = Sha256::hash(b""); - let block = B::new::(parent, 1, 1); - let digest = block.digest(); - let round = Round::new(0, 1); - actor.verified(round, block.clone()).await; - - let proposal = Proposal { - round, - parent: 0, - payload: digest, - }; - let finalization = make_finalization(proposal, &shares, QUORUM); - actor.report(Activity::Finalization(finalization)).await; - - // Latest should now be the finalized block - assert_eq!(actor.get_info(Identifier::Latest).await, Some((1, digest))); - - // Height 1 now present - assert_eq!(actor.get_info(1).await, Some((1, digest))); - - // Commitment should map to its height - assert_eq!(actor.get_info(&digest).await, Some((1, digest))); - - // Missing height - assert!(actor.get_info(2).await.is_none()); - - // Missing commitment - let missing = Sha256::hash(b"missing"); - assert!(actor.get_info(&missing).await.is_none()); - }) - } - - #[test_traced("WARN")] - fn test_get_info_latest_progression_multiple_finalizations() { - let runner = deterministic::Runner::timed(Duration::from_secs(60)); - runner.start(|mut context| async move { - let mut oracle = setup_network(context.clone()); - let (schemes, _peers, identity, shares) = setup_validators_and_shares(&mut context); - - // Single validator actor - let secret = schemes[0].clone(); - let (_application, mut actor) = setup_validator( - context.with_label("validator-0"), - &mut oracle, - p2p::mocks::Coordinator::new(vec![]), - secret, - identity, - ) - .await; - - // Initially none - assert!(actor.get_info(Identifier::Latest).await.is_none()); - - // Build and finalize heights 1..=3 - let parent0 = Sha256::hash(b""); - let block1 = B::new::(parent0, 1, 1); - let d1 = block1.digest(); - actor.verified(Round::new(0, 1), block1.clone()).await; - let f1 = make_finalization( - Proposal { - round: Round::new(0, 1), - parent: 0, - payload: d1, - }, - &shares, - QUORUM, - ); - actor.report(Activity::Finalization(f1)).await; - let latest = actor.get_info(Identifier::Latest).await; - assert_eq!(latest, Some((1, d1))); - - let block2 = B::new::(d1, 2, 2); - let d2 = block2.digest(); - actor.verified(Round::new(0, 2), block2.clone()).await; - let f2 = make_finalization( - Proposal { - round: Round::new(0, 2), - parent: 1, - payload: d2, - }, - &shares, - QUORUM, - ); - actor.report(Activity::Finalization(f2)).await; - let latest = actor.get_info(Identifier::Latest).await; - assert_eq!(latest, Some((2, d2))); - - let block3 = B::new::(d2, 3, 3); - let d3 = block3.digest(); - actor.verified(Round::new(0, 3), block3.clone()).await; - let f3 = make_finalization( - Proposal { - round: Round::new(0, 3), - parent: 2, - payload: d3, - }, - &shares, - QUORUM, - ); - actor.report(Activity::Finalization(f3)).await; - let latest = actor.get_info(Identifier::Latest).await; - assert_eq!(latest, Some((3, d3))); - }) - } - - #[test_traced("WARN")] - fn test_get_block_by_height_and_latest() { - let runner = deterministic::Runner::timed(Duration::from_secs(60)); - runner.start(|mut context| async move { - let mut oracle = setup_network(context.clone()); - let (schemes, _peers, identity, shares) = setup_validators_and_shares(&mut context); - - let secret = schemes[0].clone(); - let (_application, mut actor) = setup_validator( - context.with_label("validator-0"), - &mut oracle, - p2p::mocks::Coordinator::new(vec![]), - secret, - identity, - ) - .await; - - // Before any finalization, GetBlock::Latest should be None - let latest_block = actor.get_block(Identifier::Latest).await; - assert!(latest_block.is_none()); - - // Finalize a block at height 1 - let parent = Sha256::hash(b""); - let block = B::new::(parent, 1, 1); - let commitment = block.digest(); - let round = Round::new(0, 1); - actor.verified(round, block.clone()).await; - let proposal = Proposal { - round, - parent: 0, - payload: commitment, - }; - let finalization = make_finalization(proposal, &shares, QUORUM); - actor.report(Activity::Finalization(finalization)).await; - - // Get by height - let by_height = actor.get_block(1).await.expect("missing block by height"); - assert_eq!(by_height.height(), 1); - assert_eq!(by_height.digest(), commitment); - - // Get by latest - let by_latest = actor - .get_block(Identifier::Latest) - .await - .expect("missing block by latest"); - assert_eq!(by_latest.height(), 1); - assert_eq!(by_latest.digest(), commitment); - - // Missing height - let by_height = actor.get_block(2).await; - assert!(by_height.is_none()); - }) - } - - #[test_traced("WARN")] - fn test_get_block_by_commitment_from_sources_and_missing() { - let runner = deterministic::Runner::timed(Duration::from_secs(60)); - runner.start(|mut context| async move { - let mut oracle = setup_network(context.clone()); - let (schemes, peers, identity, shares) = setup_validators_and_shares(&mut context); - - let secret = schemes[0].clone(); - let (_application, mut actor) = setup_validator( - context.with_label("validator-0"), - &mut oracle, - p2p::mocks::Coordinator::new(peers), - secret, - identity, - ) - .await; - - // 1) From cache via verified - let parent = Sha256::hash(b""); - let ver_block = B::new::(parent, 1, 1); - let ver_commitment = ver_block.digest(); - let round1 = Round::new(0, 1); - actor.verified(round1, ver_block.clone()).await; - let got = actor - .get_block(&ver_commitment) - .await - .expect("missing block from cache"); - assert_eq!(got.digest(), ver_commitment); - - // 2) From finalized archive - let fin_block = B::new::(ver_commitment, 2, 2); - let fin_commitment = fin_block.digest(); - let round2 = Round::new(0, 2); - actor.verified(round2, fin_block.clone()).await; - let proposal = Proposal { - round: round2, - parent: 1, - payload: fin_commitment, - }; - let finalization = make_finalization(proposal, &shares, QUORUM); - actor.report(Activity::Finalization(finalization)).await; - let got = actor - .get_block(&fin_commitment) - .await - .expect("missing block from finalized archive"); - assert_eq!(got.digest(), fin_commitment); - assert_eq!(got.height(), 2); - - // 3) Missing commitment - let missing = Sha256::hash(b"definitely-missing"); - let missing_block = actor.get_block(&missing).await; - assert!(missing_block.is_none()); - }) - } + // use super::{ + // actor, + // config::Config, + // mocks::{application::Application, block::Block}, + // resolver::p2p as resolver, + // }; + // use crate::{ + // marshal::ingress::{ + // coding::{mailbox::ShardMailbox, types::CodedBlock}, + // mailbox::Identifier, + // }, + // threshold_simplex::types::{ + // finalize_namespace, notarize_namespace, seed_namespace, Activity, Finalization, + // Notarization, Proposal, + // }, + // types::{CodingCommitment, Round}, + // Block as _, Reporter, + // }; + // use commonware_broadcast::buffered; + // use commonware_codec::Encode; + // use commonware_coding::ReedSolomon; + // use commonware_cryptography::{ + // bls12381::{ + // dkg::ops::generate_shares, + // primitives::{ + // group::Share, + // ops::{partial_sign_message, threshold_signature_recover}, + // poly, + // variant::{MinPk, Variant}, + // }, + // }, + // ed25519::{PrivateKey, PublicKey}, + // sha256::Sha256, + // Committable, Digestible, Hasher, PrivateKeyExt as _, Signer as _, + // }; + // use commonware_macros::test_traced; + // use commonware_p2p::{ + // simulated::{self, Link, Network, Oracle}, + // utils::requester, + // }; + // use commonware_resolver::p2p; + // use commonware_runtime::{buffer::PoolRef, deterministic, Clock, Metrics, Runner}; + // use commonware_utils::{NZUsize, NZU64}; + // use governor::Quota; + // use rand::{seq::SliceRandom, Rng}; + // use std::{ + // collections::BTreeMap, + // num::{NonZeroU32, NonZeroUsize}, + // time::Duration, + // }; + // + // type H = Sha256; + // type D = CodingCommitment; + // type S = ReedSolomon; + // type B = Block<::Digest>; + // type P = PublicKey; + // type V = MinPk; + // type Sh = Share; + // type E = PrivateKey; + // + // const PAGE_SIZE: NonZeroUsize = NZUsize!(1024); + // const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10); + // const NAMESPACE: &[u8] = b"test"; + // const NUM_VALIDATORS: u32 = 4; + // const QUORUM: u32 = 3; + // const NUM_BLOCKS: u64 = 160; + // const BLOCKS_PER_EPOCH: u64 = 20; + // const LINK: Link = Link { + // latency: Duration::from_millis(10), + // jitter: Duration::from_millis(1), + // success_rate: 1.0, + // }; + // const UNRELIABLE_LINK: Link = Link { + // latency: Duration::from_millis(200), + // jitter: Duration::from_millis(50), + // success_rate: 0.7, + // }; + // + // async fn setup_validator( + // context: deterministic::Context, + // oracle: &mut Oracle

, + // coordinator: p2p::mocks::Coordinator

, + // secret: E, + // identity: ::Public, + // ) -> ( + // Application, + // crate::marshal::ingress::mailbox::Mailbox, + // ) { + // let config = Config { + // identity, + // mailbox_size: 100, + // namespace: NAMESPACE.to_vec(), + // view_retention_timeout: 10, + // max_repair: 10, + // codec_config: (), + // partition_prefix: format!("validator-{}", secret.public_key()), + // prunable_items_per_section: NZU64!(10), + // replay_buffer: NZUsize!(1024), + // write_buffer: NZUsize!(1024), + // freezer_table_initial_size: 64, + // freezer_table_resize_frequency: 10, + // freezer_table_resize_chunk_size: 10, + // freezer_journal_target_size: 1024, + // freezer_journal_compression: None, + // freezer_journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE), + // immutable_items_per_section: NZU64!(10), + // }; + // + // // Create the resolver + // let backfill = oracle.register(secret.public_key(), 1).await.unwrap(); + // let resolver_cfg = resolver::Config { + // public_key: secret.public_key(), + // coordinator, + // mailbox_size: config.mailbox_size, + // requester_config: requester::Config { + // public_key: secret.public_key(), + // rate_limit: Quota::per_second(NonZeroU32::new(5).unwrap()), + // initial: Duration::from_secs(1), + // timeout: Duration::from_secs(2), + // }, + // fetch_retry_timeout: Duration::from_millis(100), + // priority_requests: false, + // priority_responses: false, + // }; + // let resolver = resolver::init(&context, resolver_cfg, backfill); + // + // // Create a buffered broadcast engine and get its mailbox + // let broadcast_config = buffered::Config { + // public_key: secret.public_key(), + // mailbox_size: config.mailbox_size, + // deque_size: 10, + // priority: false, + // codec_config: (usize::MAX, usize::MAX), + // }; + // let (broadcast_engine, buffer) = buffered::Engine::new(context.clone(), broadcast_config); + // let network = oracle.register(secret.public_key(), 2).await.unwrap(); + // broadcast_engine.start(network); + // + // let shard_mailbox = + // ShardMailbox::<_, H, _, _>::new(context.with_label("shard_mailbox"), buffer, ()); + // + // let (actor, mailbox) = actor::Actor::init(context.clone(), config).await; + // let application = Application::::default(); + // + // // Start the application + // actor.start(application.clone(), shard_mailbox, resolver); + // + // (application, mailbox) + // } + // + // fn make_finalization(proposal: Proposal, shares: &[Sh], quorum: u32) -> Finalization { + // let proposal_msg = proposal.encode(); + // + // // Generate proposal signature + // let proposal_partials: Vec<_> = shares + // .iter() + // .take(quorum as usize) + // .map(|s| { + // partial_sign_message::(s, Some(&finalize_namespace(NAMESPACE)), &proposal_msg) + // }) + // .collect(); + // let proposal_signature = + // threshold_signature_recover::(quorum, &proposal_partials).unwrap(); + // + // // Generate seed signature (for the view number) + // let seed_msg = proposal.round.encode(); + // let seed_partials: Vec<_> = shares + // .iter() + // .take(quorum as usize) + // .map(|s| partial_sign_message::(s, Some(&seed_namespace(NAMESPACE)), &seed_msg)) + // .collect(); + // let seed_signature = threshold_signature_recover::(quorum, &seed_partials).unwrap(); + // + // Finalization { + // proposal, + // proposal_signature, + // seed_signature, + // } + // } + // + // fn make_notarization(proposal: Proposal, shares: &[Sh], quorum: u32) -> Notarization { + // let proposal_msg = proposal.encode(); + // + // // Generate proposal signature + // let proposal_partials: Vec<_> = shares + // .iter() + // .take(quorum as usize) + // .map(|s| { + // partial_sign_message::(s, Some(¬arize_namespace(NAMESPACE)), &proposal_msg) + // }) + // .collect(); + // let proposal_signature = + // threshold_signature_recover::(quorum, &proposal_partials).unwrap(); + // + // // Generate seed signature (for the view number) + // let seed_msg = proposal.round.encode(); + // let seed_partials: Vec<_> = shares + // .iter() + // .take(quorum as usize) + // .map(|s| partial_sign_message::(s, Some(&seed_namespace(NAMESPACE)), &seed_msg)) + // .collect(); + // let seed_signature = threshold_signature_recover::(quorum, &seed_partials).unwrap(); + // + // Notarization { + // proposal, + // proposal_signature, + // seed_signature, + // } + // } + // + // fn setup_network(context: deterministic::Context) -> Oracle

{ + // let (network, oracle) = Network::new( + // context.with_label("network"), + // simulated::Config { + // max_size: 1024 * 1024, + // disconnect_on_block: true, + // }, + // ); + // network.start(); + // oracle + // } + // + // fn setup_validators_and_shares( + // context: &mut deterministic::Context, + // ) -> (Vec, Vec

, ::Public, Vec) { + // let mut schemes = (0..NUM_VALIDATORS) + // .map(|i| PrivateKey::from_seed(i as u64)) + // .collect::>(); + // schemes.sort_by_key(|s| s.public_key()); + // let peers: Vec = schemes.iter().map(|s| s.public_key()).collect(); + // + // let (identity, shares) = generate_shares::<_, V>(context, None, NUM_VALIDATORS, QUORUM); + // let identity = *poly::public::(&identity); + // + // (schemes, peers, identity, shares) + // } + // + // async fn setup_network_links(oracle: &mut Oracle

, peers: &[P], link: Link) { + // for p1 in peers.iter() { + // for p2 in peers.iter() { + // if p2 == p1 { + // continue; + // } + // oracle + // .add_link(p1.clone(), p2.clone(), link.clone()) + // .await + // .unwrap(); + // } + // } + // } + // + // #[test_traced("WARN")] + // fn test_finalize_good_links() { + // for seed in 0..5 { + // let result1 = finalize(seed, LINK); + // let result2 = finalize(seed, LINK); + // + // // Ensure determinism + // assert_eq!(result1, result2); + // } + // } + // + // #[test_traced("WARN")] + // fn test_finalize_bad_links() { + // for seed in 0..5 { + // let result1 = finalize(seed, UNRELIABLE_LINK); + // let result2 = finalize(seed, UNRELIABLE_LINK); + // + // // Ensure determinism + // assert_eq!(result1, result2); + // } + // } + // + // fn finalize(seed: u64, link: Link) -> String { + // let runner = deterministic::Runner::new( + // deterministic::Config::new() + // .with_seed(seed) + // .with_timeout(Some(Duration::from_secs(400))), + // ); + // runner.start(|mut context| async move { + // let mut oracle = setup_network(context.clone()); + // let (schemes, peers, identity, shares) = setup_validators_and_shares(&mut context); + // + // // Initialize applications and actors + // let mut applications = BTreeMap::new(); + // let mut actors = Vec::new(); + // + // for (i, secret) in schemes.iter().enumerate() { + // let (application, actor) = setup_validator( + // context.with_label(&format!("validator-{i}")), + // &mut oracle, + // p2p::mocks::Coordinator::new(peers.clone()), + // secret.clone(), + // identity, + // ) + // .await; + // applications.insert(peers[i].clone(), application); + // actors.push(actor); + // } + // + // // Add links between all peers + // setup_network_links(&mut oracle, &peers, link.clone()).await; + // + // let coding_config = commonware_coding::Config { + // minimum_shards: peers.len().div_ceil(2) as u16, + // extra_shards: (peers.len() / 2) as u16, + // }; + // + // // Generate blocks, skipping the genesis block. + // let mut blocks = Vec::>::new(); + // let mut parent = CodingCommitment::default(); + // for i in 1..=NUM_BLOCKS { + // let block = B::new::(parent, i, i); + // let coded_block = CodedBlock::new(block, coding_config); + // parent = coded_block.commitment(); + // blocks.push(coded_block); + // } + // + // // Broadcast and finalize blocks in random order + // blocks.shuffle(&mut context); + // for block in blocks.iter() { + // // Skip genesis block + // let height = block.height(); + // assert!(height > 0, "genesis block should not have been generated"); + // + // // Calculate the epoch and round for the block + // let epoch = height / BLOCKS_PER_EPOCH; + // let round = Round::new(epoch, height); + // + // // Broadcast block by one validator + // let actor_index: usize = (height % (NUM_VALIDATORS as u64)) as usize; + // let mut actor = actors[actor_index].clone(); + // + // actor.broadcast(block.clone(), peers.clone()).await; + // + // // Wait for the block chunks to be delivered. + // context.sleep(link.latency + link.jitter).await; + // + // // Notarize block by the validator that broadcasted it + // let proposal = Proposal { + // round, + // parent: height.checked_sub(1).unwrap(), + // payload: block.commitment(), + // }; + // let notarization = make_notarization(proposal.clone(), &shares, QUORUM); + // actor + // .report(Activity::Notarization(notarization.clone())) + // .await; + // + // // Finalize block by all validators + // let fin = make_finalization(proposal, &shares, QUORUM); + // for actor in actors.iter_mut() { + // // Always finalize 1) the last block in each epoch 2) the last block in the chain. + // // Otherwise, finalize randomly. + // if height == NUM_BLOCKS + // || height % BLOCKS_PER_EPOCH == 0 + // || context.gen_bool(0.2) + // // 20% chance to finalize randomly + // { + // actor.report(Activity::Finalization(fin.clone())).await; + // } + // } + // } + // + // // Check that all applications received all blocks. + // let mut finished = false; + // while !finished { + // // Avoid a busy loop + // context.sleep(Duration::from_secs(1)).await; + // + // // If not all validators have finished, try again + // if applications.len() != NUM_VALIDATORS as usize { + // continue; + // } + // finished = true; + // for app in applications.values() { + // if app.blocks().len() != NUM_BLOCKS as usize { + // finished = false; + // break; + // } + // } + // } + // + // // Return state + // context.auditor().state() + // }) + // } + // + // #[test_traced("WARN")] + // fn test_subscribe_basic_block_delivery() { + // let runner = deterministic::Runner::timed(Duration::from_secs(60)); + // runner.start(|mut context| async move { + // let mut oracle = setup_network(context.clone()); + // let (schemes, peers, identity, shares) = setup_validators_and_shares(&mut context); + // + // let mut actors = Vec::new(); + // for (i, secret) in schemes.iter().enumerate() { + // let (_application, actor) = setup_validator( + // context.with_label(&format!("validator-{i}")), + // &mut oracle, + // p2p::mocks::Coordinator::new(vec![]), + // secret.clone(), + // identity, + // ) + // .await; + // actors.push(actor); + // } + // let mut actor = actors[0].clone(); + // + // setup_network_links(&mut oracle, &peers, LINK).await; + // + // let coding_config = commonware_coding::Config { + // minimum_shards: peers.len().div_ceil(2) as u16, + // extra_shards: (peers.len() / 2) as u16, + // }; + // + // let inner = B::new::(Default::default(), 1, 1); + // let block = CodedBlock::new(inner, coding_config); + // let commitment = block.commitment(); + // + // let subscription_rx = actor.subscribe(Some(Round::from((0, 1))), commitment).await; + // + // actor.broadcast(block.clone(), peers).await; + // + // let proposal = Proposal { + // round: Round::new(0, 1), + // parent: 0, + // payload: commitment, + // }; + // + // let notarization = make_notarization(proposal.clone(), &shares, QUORUM); + // actor.report(Activity::Notarization(notarization)).await; + // + // let finalization = make_finalization(proposal, &shares, QUORUM); + // actor.report(Activity::Finalization(finalization)).await; + // + // let received_block = subscription_rx.await.unwrap(); + // assert_eq!(received_block.digest(), block.digest()); + // assert_eq!(received_block.height(), 1); + // }) + // } + // + // #[test_traced("WARN")] + // fn test_subscribe_multiple_subscriptions() { + // let runner = deterministic::Runner::timed(Duration::from_secs(60)); + // runner.start(|mut context| async move { + // let mut oracle = setup_network(context.clone()); + // let (schemes, peers, identity, shares) = setup_validators_and_shares(&mut context); + // + // let mut actors = Vec::new(); + // for (i, secret) in schemes.iter().enumerate() { + // let (_application, actor) = setup_validator( + // context.with_label(&format!("validator-{i}")), + // &mut oracle, + // p2p::mocks::Coordinator::new(peers.clone()), + // secret.clone(), + // identity, + // ) + // .await; + // actors.push(actor); + // } + // let mut actor = actors[0].clone(); + // + // setup_network_links(&mut oracle, &peers, LINK).await; + // + // let coding_config = commonware_coding::Config { + // minimum_shards: peers.len().div_ceil(2) as u16, + // extra_shards: (peers.len() / 2) as u16, + // }; + // + // let parent = CodingCommitment::default(); + // let inner1 = B::new::(parent, 1, 1); + // let block1 = CodedBlock::new(inner1, coding_config); + // let inner2 = B::new::(block1.commitment(), 2, 2); + // let block2 = CodedBlock::new(inner2, coding_config); + // let commitment1 = block1.commitment(); + // let commitment2 = block2.commitment(); + // + // let sub1_rx = actor + // .subscribe(Some(Round::from((0, 1))), commitment1) + // .await; + // let sub2_rx = actor + // .subscribe(Some(Round::from((0, 2))), commitment2) + // .await; + // let sub3_rx = actor + // .subscribe(Some(Round::from((0, 1))), commitment1) + // .await; + // + // actor.broadcast(block1.clone(), peers.clone()).await; + // actor.broadcast(block2.clone(), peers).await; + // + // for (view, block) in [(1, block1.clone()), (2, block2.clone())] { + // let proposal = Proposal { + // round: Round::new(0, view), + // parent: view.checked_sub(1).unwrap(), + // payload: block.commitment(), + // }; + // + // let notarization = make_notarization(proposal.clone(), &shares, QUORUM); + // actor.report(Activity::Notarization(notarization)).await; + // + // let finalization = make_finalization(proposal, &shares, QUORUM); + // actor.report(Activity::Finalization(finalization)).await; + // } + // + // let received1_sub1 = sub1_rx.await.unwrap(); + // let received2 = sub2_rx.await.unwrap(); + // let received1_sub3 = sub3_rx.await.unwrap(); + // + // assert_eq!(received1_sub1.digest(), block1.digest()); + // assert_eq!(received2.digest(), block2.digest()); + // assert_eq!(received1_sub3.digest(), block1.digest()); + // assert_eq!(received1_sub1.height(), 1); + // assert_eq!(received2.height(), 2); + // assert_eq!(received1_sub3.height(), 1); + // }) + // } + // + // #[test_traced("WARN")] + // fn test_subscribe_canceled_subscriptions() { + // let runner = deterministic::Runner::timed(Duration::from_secs(60)); + // runner.start(|mut context| async move { + // let mut oracle = setup_network(context.clone()); + // let (schemes, peers, identity, shares) = setup_validators_and_shares(&mut context); + // + // let mut actors = Vec::new(); + // for (i, secret) in schemes.iter().enumerate() { + // let (_application, actor) = setup_validator( + // context.with_label(&format!("validator-{i}")), + // &mut oracle, + // p2p::mocks::Coordinator::new(peers.clone()), + // secret.clone(), + // identity, + // ) + // .await; + // actors.push(actor); + // } + // let mut actor = actors[0].clone(); + // + // setup_network_links(&mut oracle, &peers, LINK).await; + // + // let coding_config = commonware_coding::Config { + // minimum_shards: peers.len().div_ceil(2) as u16, + // extra_shards: (peers.len() / 2) as u16, + // }; + // + // let parent = CodingCommitment::default(); + // let inner1 = B::new::(parent, 1, 1); + // let block1 = CodedBlock::new(inner1, coding_config); + // let inner2 = B::new::(block1.commitment(), 2, 2); + // let block2 = CodedBlock::new(inner2, coding_config); + // let commitment1 = block1.commitment(); + // let commitment2 = block2.commitment(); + // + // let sub1_rx = actor + // .subscribe(Some(Round::from((0, 1))), commitment1) + // .await; + // let sub2_rx = actor + // .subscribe(Some(Round::from((0, 2))), commitment2) + // .await; + // + // drop(sub1_rx); + // + // actor.broadcast(block1.clone(), peers.clone()).await; + // actor.broadcast(block2.clone(), peers).await; + // + // for (view, block) in [(1, block1.clone()), (2, block2.clone())] { + // let proposal = Proposal { + // round: Round::new(0, view), + // parent: view.checked_sub(1).unwrap(), + // payload: block.commitment(), + // }; + // + // let notarization = make_notarization(proposal.clone(), &shares, QUORUM); + // actor.report(Activity::Notarization(notarization)).await; + // + // let finalization = make_finalization(proposal, &shares, QUORUM); + // actor.report(Activity::Finalization(finalization)).await; + // } + // + // let received2 = sub2_rx.await.unwrap(); + // assert_eq!(received2.digest(), block2.digest()); + // assert_eq!(received2.height(), 2); + // }) + // } + // + // #[test_traced("WARN")] + // fn test_subscribe_blocks_from_different_sources() { + // let runner = deterministic::Runner::default(); + // runner.start(|mut context| async move { + // let mut oracle = setup_network(context.clone()); + // let (schemes, peers, identity, shares) = setup_validators_and_shares(&mut context); + // + // let mut actors = Vec::new(); + // for (i, secret) in schemes.iter().enumerate() { + // let (_application, actor) = setup_validator( + // context.with_label(&format!("validator-{i}")), + // &mut oracle, + // p2p::mocks::Coordinator::new(peers.clone()), + // secret.clone(), + // identity, + // ) + // .await; + // actors.push(actor); + // } + // let mut actor = actors[0].clone(); + // + // setup_network_links(&mut oracle, &peers, LINK).await; + // + // let coding_config = commonware_coding::Config { + // minimum_shards: peers.len().div_ceil(2) as u16, + // extra_shards: (peers.len() / 2) as u16, + // }; + // + // let parent = CodingCommitment::from((Sha256::hash(b""), Default::default())); + // let inner1 = B::new::(parent, 1, 1); + // let block1 = CodedBlock::new(inner1, coding_config); + // let inner2 = B::new::(block1.commitment(), 2, 2); + // let block2 = CodedBlock::new(inner2, coding_config); + // + // // Block1: Broadcasted by self + // actor.broadcast(block1.clone(), peers.clone()).await; + // context.sleep(Duration::from_millis(20)).await; + // + // let sub1_rx = actor + // .subscribe(Some(Round::from((0, 1))), block1.commitment()) + // .await; + // + // let proposal1 = Proposal { + // round: Round::new(0, 1), + // parent: 0, + // payload: block1.commitment(), + // }; + // let notarization1 = make_notarization(proposal1.clone(), &shares, QUORUM); + // actor.report(Activity::Notarization(notarization1)).await; + // + // // Block1: delivered + // let received1 = sub1_rx.await.unwrap(); + // assert_eq!(received1.digest(), block1.digest()); + // assert_eq!(received1.height(), 1); + // + // // Block2: Broadcasted by a remote node (different actor) + // let remote_actor = &mut actors[1].clone(); + // remote_actor.broadcast(block2.clone(), peers).await; + // context.sleep(Duration::from_millis(40)).await; + // + // // Have the remote actor verify their shard. This will broadcast their shard + // // to the network, meeting the minimum reconstruction threshold for actor #1. + // assert!(remote_actor + // .verify_shard(block2.commitment(), 1) + // .await + // .await + // .unwrap()); + // + // let sub2_rx = actor + // .subscribe(Some(Round::from((0, 2))), block2.commitment()) + // .await; + // + // let proposal2 = Proposal { + // round: Round::new(0, 2), + // parent: 1, + // payload: block2.commitment(), + // }; + // + // context.sleep(Duration::from_millis(40)).await; + // + // let notarization2 = make_notarization(proposal2.clone(), &shares, QUORUM); + // actor.report(Activity::Notarization(notarization2)).await; + // + // // Block2: delivered + // let received2 = sub2_rx.await.unwrap(); + // assert_eq!(received2.digest(), block2.digest()); + // assert_eq!(received2.height(), 2); + // }) + // } + // + // #[test_traced("WARN")] + // fn test_get_info_basic_queries_present_and_missing() { + // let runner = deterministic::Runner::timed(Duration::from_secs(60)); + // runner.start(|mut context| async move { + // let mut oracle = setup_network(context.clone()); + // let (schemes, peers, identity, shares) = setup_validators_and_shares(&mut context); + // + // // Single validator actor + // let secret = schemes[0].clone(); + // let (_application, mut actor) = setup_validator( + // context.with_label("validator-0"), + // &mut oracle, + // p2p::mocks::Coordinator::new(vec![]), + // secret, + // identity, + // ) + // .await; + // + // let coding_config = commonware_coding::Config { + // minimum_shards: peers.len().div_ceil(2) as u16, + // extra_shards: (peers.len() / 2) as u16, + // }; + // + // // Initially, no latest + // assert!(actor.get_info(Identifier::Latest).await.is_none()); + // + // // Before finalization, specific height returns None + // assert!(actor.get_info(1).await.is_none()); + // + // // Create and broadcast a block, then finalize it + // let parent = CodingCommitment::from((Sha256::hash(b""), Default::default())); + // let inner = B::new::(parent, 1, 1); + // let block = CodedBlock::::new(inner, coding_config); + // let commitment = block.commitment(); + // let round = Round::new(0, 1); + // + // actor.broadcast(block.clone(), peers).await; + // + // let proposal = Proposal { + // round, + // parent: 0, + // payload: commitment, + // }; + // let finalization = make_finalization(proposal, &shares, QUORUM); + // actor.report(Activity::Finalization(finalization)).await; + // + // // Latest should now be the finalized block + // assert_eq!( + // actor.get_info(Identifier::Latest).await, + // Some((1, commitment)) + // ); + // + // // Height 1 now present + // assert_eq!(actor.get_info(1).await, Some((1, commitment))); + // + // // Commitment should map to its height + // assert_eq!(actor.get_info(&commitment).await, Some((1, commitment))); + // + // // Missing height + // assert!(actor.get_info(2).await.is_none()); + // + // // Missing commitment + // let missing = CodingCommitment::from((Sha256::hash(b"missing"), Default::default())); + // assert!(actor.get_info(&missing).await.is_none()); + // }) + // } + // + // #[test_traced("WARN")] + // fn test_get_info_latest_progression_multiple_finalizations() { + // let runner = deterministic::Runner::timed(Duration::from_secs(60)); + // runner.start(|mut context| async move { + // let mut oracle = setup_network(context.clone()); + // let (schemes, peers, identity, shares) = setup_validators_and_shares(&mut context); + // + // // Single validator actor + // let secret = schemes[0].clone(); + // let (_application, mut actor) = setup_validator( + // context.with_label("validator-0"), + // &mut oracle, + // p2p::mocks::Coordinator::new(vec![]), + // secret, + // identity, + // ) + // .await; + // + // let coding_config = commonware_coding::Config { + // minimum_shards: peers.len().div_ceil(2) as u16, + // extra_shards: (peers.len() / 2) as u16, + // }; + // + // // Initially none + // assert!(actor.get_info(Identifier::Latest).await.is_none()); + // + // // Build and finalize heights 1..=3 + // let parent0 = CodingCommitment::from((Sha256::hash(b""), Default::default())); + // let inner1 = B::new::(parent0, 1, 1); + // let block1 = CodedBlock::new(inner1, coding_config); + // let d1 = block1.commitment(); + // actor.broadcast(block1.clone(), peers.clone()).await; + // let f1 = make_finalization( + // Proposal { + // round: Round::new(0, 1), + // parent: 0, + // payload: d1, + // }, + // &shares, + // QUORUM, + // ); + // actor.report(Activity::Finalization(f1)).await; + // let latest = actor.get_info(Identifier::Latest).await; + // assert_eq!(latest, Some((1, d1))); + // + // let inner2 = B::new::(d1, 2, 2); + // let block2 = CodedBlock::new(inner2, coding_config); + // let d2 = block2.commitment(); + // actor.broadcast(block2.clone(), peers.clone()).await; + // let f2 = make_finalization( + // Proposal { + // round: Round::new(0, 2), + // parent: 1, + // payload: d2, + // }, + // &shares, + // QUORUM, + // ); + // actor.report(Activity::Finalization(f2)).await; + // let latest = actor.get_info(Identifier::Latest).await; + // assert_eq!(latest, Some((2, d2))); + // + // let inner3 = B::new::(d2, 3, 3); + // let block3 = CodedBlock::new(inner3, coding_config); + // let d3 = block3.commitment(); + // actor.broadcast(block3.clone(), peers).await; + // let f3 = make_finalization( + // Proposal { + // round: Round::new(0, 3), + // parent: 2, + // payload: d3, + // }, + // &shares, + // QUORUM, + // ); + // actor.report(Activity::Finalization(f3)).await; + // let latest = actor.get_info(Identifier::Latest).await; + // assert_eq!(latest, Some((3, d3))); + // }) + // } + // + // #[test_traced("WARN")] + // fn test_get_block_by_height_and_latest() { + // let runner = deterministic::Runner::timed(Duration::from_secs(60)); + // runner.start(|mut context| async move { + // let mut oracle = setup_network(context.clone()); + // let (schemes, peers, identity, shares) = setup_validators_and_shares(&mut context); + // + // let secret = schemes[0].clone(); + // let (_application, mut actor) = setup_validator( + // context.with_label("validator-0"), + // &mut oracle, + // p2p::mocks::Coordinator::new(vec![]), + // secret, + // identity, + // ) + // .await; + // + // let coding_config = commonware_coding::Config { + // minimum_shards: peers.len().div_ceil(2) as u16, + // extra_shards: (peers.len() / 2) as u16, + // }; + // + // // Before any finalization, GetBlock::Latest should be None + // let latest_block = actor.get_block(Identifier::Latest).await; + // assert!(latest_block.is_none()); + // + // // Finalize a block at height 1 + // let parent = CodingCommitment::from((Sha256::hash(b""), Default::default())); + // let inner = B::new::(parent, 1, 1); + // let block = CodedBlock::new(inner, coding_config); + // let commitment = block.commitment(); + // let round = Round::new(0, 1); + // actor.broadcast(block.clone(), peers).await; + // let proposal = Proposal { + // round, + // parent: 0, + // payload: commitment, + // }; + // let finalization = make_finalization(proposal, &shares, QUORUM); + // actor.report(Activity::Finalization(finalization)).await; + // + // // Get by height + // let by_height = actor.get_block(1).await.expect("missing block by height"); + // assert_eq!(by_height.height(), 1); + // assert_eq!(by_height.digest(), block.digest()); + // + // // Get by latest + // let by_latest = actor + // .get_block(Identifier::Latest) + // .await + // .expect("missing block by latest"); + // assert_eq!(by_latest.height(), 1); + // assert_eq!(by_latest.digest(), block.digest()); + // + // // Missing height + // let by_height = actor.get_block(2).await; + // assert!(by_height.is_none()); + // }) + // } + // + // #[test_traced("WARN")] + // fn test_get_block_by_commitment_from_sources_and_missing() { + // let runner = deterministic::Runner::timed(Duration::from_secs(60)); + // runner.start(|mut context| async move { + // let mut oracle = setup_network(context.clone()); + // let (schemes, peers, identity, shares) = setup_validators_and_shares(&mut context); + // + // let secret = schemes[0].clone(); + // let (_application, mut actor) = setup_validator( + // context.with_label("validator-0"), + // &mut oracle, + // p2p::mocks::Coordinator::new(peers.clone()), + // secret, + // identity, + // ) + // .await; + // + // let coding_config = commonware_coding::Config { + // minimum_shards: peers.len().div_ceil(2) as u16, + // extra_shards: (peers.len() / 2) as u16, + // }; + // + // // 1) From cache via broadcast + // let parent = CodingCommitment::from((Sha256::hash(b""), Default::default())); + // let b_inner = B::new::(parent, 1, 1); + // let b_block = CodedBlock::new(b_inner, coding_config); + // actor.broadcast(b_block.clone(), peers.clone()).await; + // let got = actor + // .get_block(&b_block.commitment()) + // .await + // .expect("missing block from cache"); + // assert_eq!(got.digest(), b_block.digest()); + // + // // 2) From finalized archive + // let fin_inner = B::new::(b_block.commitment(), 2, 2); + // let fin_block = CodedBlock::new(fin_inner, coding_config); + // let round2 = Round::new(0, 2); + // actor.broadcast(fin_block.clone(), peers).await; + // let proposal = Proposal { + // round: round2, + // parent: 1, + // payload: fin_block.commitment(), + // }; + // let finalization = make_finalization(proposal, &shares, QUORUM); + // actor.report(Activity::Finalization(finalization)).await; + // let got = actor + // .get_block(&fin_block.commitment()) + // .await + // .expect("missing block from finalized archive"); + // assert_eq!(got.digest(), fin_block.digest()); + // assert_eq!(got.height(), 2); + // + // // 3) Missing commitment + // let missing = CodingCommitment::from((Sha256::hash(b"missing"), Default::default())); + // let missing_block = actor.get_block(&missing).await; + // assert!(missing_block.is_none()); + // }) + // } } diff --git a/consensus/src/types.rs b/consensus/src/types.rs index c0159cca60..0118af6da4 100644 --- a/consensus/src/types.rs +++ b/consensus/src/types.rs @@ -1,8 +1,12 @@ //! Consensus types shared across the crate. use bytes::{Buf, BufMut}; -use commonware_codec::{EncodeSize, Error, Read, ReadExt, Write}; -use std::fmt::Display; +use commonware_codec::{Encode, EncodeSize, Error, FixedSize, Read, ReadExt, Write}; +use commonware_coding::Config as CodingConfig; +use commonware_cryptography::Digest; +use commonware_utils::{Array, Span}; +use rand_core::CryptoRngCore; +use std::{fmt::Display, ops::Deref}; /// Epoch is the type used to represent a distinct set of validators. /// @@ -71,6 +75,117 @@ impl Display for Round { } } +const CODING_COMMITMENT_SIZE: usize = 32 + CodingConfig::SIZE; + +/// A [Digest] containing a coding commitment and encoded [CodingConfig]. +#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct CodingCommitment([u8; CODING_COMMITMENT_SIZE]); + +impl CodingCommitment { + /// Extracts the [CodingConfig] from this [CodingCommitment]. + pub fn config(&self) -> CodingConfig { + let mut buf = &self.0[32..]; + CodingConfig::read(&mut buf).expect("CodingCommitment always contains a valid config") + } + + /// Extracts a [Digest] from this [CodingCommitment]. + /// + /// ## Panics + /// + /// Panics if the [Digest]'s [FixedSize::SIZE] is > 32 bytes. + pub fn inner(&self) -> D { + const { + if D::SIZE > 32 { + panic!("Cannot extract Digest with size > 32 from CodingCommitment"); + } + } + + D::read(&mut self.0[..D::SIZE].as_ref()) + .expect("CodingCommitment always contains a valid digest") + } +} + +impl Digest for CodingCommitment { + fn random(rng: &mut R) -> Self { + let mut buf = [0u8; CODING_COMMITMENT_SIZE]; + rng.fill_bytes(&mut buf); + Self(buf) + } +} + +impl Write for CodingCommitment { + fn write(&self, buf: &mut impl bytes::BufMut) { + buf.put_slice(&self.0); + } +} + +impl FixedSize for CodingCommitment { + const SIZE: usize = CODING_COMMITMENT_SIZE; +} + +impl Read for CodingCommitment { + type Cfg = (); + + fn read_cfg( + buf: &mut impl bytes::Buf, + _cfg: &Self::Cfg, + ) -> Result { + let mut arr = [0u8; CODING_COMMITMENT_SIZE]; + buf.copy_to_slice(&mut arr); + Ok(CodingCommitment(arr)) + } +} + +impl AsRef<[u8]> for CodingCommitment { + fn as_ref(&self) -> &[u8] { + &self.0 + } +} + +impl Deref for CodingCommitment { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl std::fmt::Display for CodingCommitment { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", commonware_utils::hex(self.as_ref())) + } +} + +impl std::fmt::Debug for CodingCommitment { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", commonware_utils::hex(self.as_ref())) + } +} + +impl Default for CodingCommitment { + fn default() -> Self { + Self([0u8; CODING_COMMITMENT_SIZE]) + } +} + +impl From<(D, CodingConfig)> for CodingCommitment { + fn from((digest, config): (D, CodingConfig)) -> Self { + const { + if D::SIZE > 32 { + panic!("Cannot create CodingCommitment from Digest with size > 32"); + } + } + + let mut buf = [0u8; CODING_COMMITMENT_SIZE]; + buf[..D::SIZE].copy_from_slice(&digest); + buf[32..].copy_from_slice(&config.encode()); + Self(buf) + } +} + +impl Span for CodingCommitment {} +impl Array for CodingCommitment {} + #[cfg(test)] mod tests { use super::*;