Skip to content

Commit dcae530

Browse files
committed
api cleanup - verify shard directly
1 parent 2edcf84 commit dcae530

File tree

3 files changed

+145
-131
lines changed

3 files changed

+145
-131
lines changed

consensus/src/marshal/actor.rs

Lines changed: 114 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use super::{
99
},
1010
};
1111
use crate::{
12-
marshal::ingress::coding::{CodedBlock, Shard, ShardLayer},
12+
marshal::ingress::coding::{CodedBlock, ShardLayer},
1313
threshold_simplex::types::{Finalization, Notarization},
1414
types::Round,
1515
Block, Reporter,
@@ -45,14 +45,10 @@ struct BlockSubscription<B: Block> {
4545
_aborter: Aborter,
4646
}
4747

48-
/// A struct that holds multiple subscriptions for a chunk.
49-
struct ChunkSubscription<B, H>
50-
where
51-
B: Block<Digest = H::Digest, Commitment = H::Digest>,
52-
H: Hasher,
53-
{
48+
/// A struct that holds multiple subscriptions for a shard's validity check.
49+
struct ShardValiditySubscription {
5450
/// The subscribers that are waiting for the chunk
55-
subscribers: Vec<oneshot::Sender<Shard<CodedBlock<B, H>, H>>>,
51+
subscribers: Vec<oneshot::Sender<bool>>,
5652
/// Aborter that aborts the waiter future when dropped
5753
_aborter: Aborter,
5854
}
@@ -106,8 +102,8 @@ where
106102

107103
// Outstanding subscriptions for blocks
108104
block_subscriptions: BTreeMap<B::Commitment, BlockSubscription<B>>,
109-
// Outstanding subscriptions for chunks
110-
chunk_subscriptions: BTreeMap<B::Commitment, ChunkSubscription<B, H>>,
105+
// Outstanding subscriptions for shard validity checks
106+
shard_validity_subscriptions: BTreeMap<(B::Commitment, u16), ShardValiditySubscription>,
111107

112108
// ---------- Storage ----------
113109
// Prunable cache
@@ -248,7 +244,7 @@ where
248244
codec_config: config.codec_config,
249245
last_processed_round: Round::new(0, 0),
250246
block_subscriptions: BTreeMap::new(),
251-
chunk_subscriptions: BTreeMap::new(),
247+
shard_validity_subscriptions: BTreeMap::new(),
252248
cache,
253249
finalizations_by_height,
254250
finalized_blocks,
@@ -300,8 +296,7 @@ where
300296

301297
// Create a local pool for waiter futures
302298
let mut block_waiters = AbortablePool::<(B::Commitment, CodedBlock<B, H>)>::default();
303-
let mut chunk_waiters =
304-
AbortablePool::<((B::Commitment, u16), Shard<CodedBlock<B, H>, H>)>::default();
299+
let mut chunk_waiters = AbortablePool::<((B::Commitment, u16), bool)>::default();
305300

306301
// Handle messages
307302
loop {
@@ -310,7 +305,7 @@ where
310305
bs.subscribers.retain(|tx| !tx.is_canceled());
311306
!bs.subscribers.is_empty()
312307
});
313-
self.chunk_subscriptions.retain(|_, cs| {
308+
self.shard_validity_subscriptions.retain(|_, cs| {
314309
cs.subscribers.retain(|tx| !tx.is_canceled());
315310
!cs.subscribers.is_empty()
316311
});
@@ -322,7 +317,13 @@ where
322317
let Ok((commitment, block)) = result else {
323318
continue; // Aborted future
324319
};
325-
self.notify_subscribers(commitment, &block).await;
320+
self.notify_block_subscribers(commitment, &block).await;
321+
},
322+
result = chunk_waiters.next_completed() => {
323+
let Ok(((commitment, index), valid)) = result else {
324+
continue; // Aborted future
325+
};
326+
self.notify_shard_validity_subscribers(commitment, index, valid).await;
326327
},
327328
// Handle consensus before finalizer or backfiller
328329
mailbox_message = self.mailbox.next() => {
@@ -331,83 +332,11 @@ where
331332
return;
332333
};
333334
match message {
334-
Message::Broadcast { coding_commitment, config, chunks } => {
335-
shard_layer.broadcast_chunks(coding_commitment, config, chunks).await;
336-
}
337-
Message::Notarize { notarization } => {
338-
let commitment = notarization.proposal.payload;
339-
let index = notarization.proposal_signature.index as u16;
340-
shard_layer.try_broadcast_shard(commitment, index).await;
341-
}
342-
Message::Notarization { notarization } => {
343-
let round = notarization.round();
344-
let commitment = notarization.proposal.payload;
345-
346-
// Store notarization by view
347-
self.cache.put_notarization(round, commitment, notarization.clone()).await;
348-
349-
// Search for block locally, otherwise fetch it remotely
350-
if let Some(block) = self.find_block(&mut shard_layer, commitment).await {
351-
// If found, persist the block
352-
self.cache_block(round, commitment, block).await;
353-
} else {
354-
debug!(?round, "notarized block missing");
355-
resolver.fetch(Request::<CodedBlock<B, H>>::Notarized { round }).await;
356-
}
357-
}
358-
Message::Finalize { finalization } => {
359-
let commitment = finalization.proposal.payload;
360-
let index = finalization.proposal_signature.index as u16;
361-
shard_layer.try_broadcast_shard(commitment, index).await;
362-
}
363-
Message::Finalization { finalization } => {
364-
// Cache finalization by round
365-
let round = finalization.round();
366-
let commitment = finalization.proposal.payload;
367-
368-
self.cache.put_finalization(round, commitment, finalization.clone()).await;
369-
370-
// Search for block locally, otherwise fetch it remotely
371-
if let Some(block) = self.find_block(&mut shard_layer, commitment).await {
372-
// If found, persist the block
373-
let height = block.height();
374-
self.finalize(height, commitment, block, Some(finalization), &mut notifier_tx).await;
375-
debug!(?round, height, "finalized block stored");
376-
} else {
377-
// Otherwise, fetch the block from the network.
378-
debug!(?round, ?commitment, "finalized block missing");
379-
resolver.fetch(Request::<CodedBlock<B, H>>::Block(commitment)).await;
380-
}
381-
}
382335
Message::Get { commitment, response } => {
383336
// Check for block locally
384337
let result = self.find_block(&mut shard_layer, commitment).await;
385338
let _ = response.send(result.map(CodedBlock::take_inner));
386339
}
387-
Message::SubscribeChunk { commitment, index, response } => {
388-
// Check for chunk locally
389-
if let Some(shard) = shard_layer.get_chunk(commitment, index).await {
390-
let _ = response.send(shard);
391-
continue;
392-
}
393-
394-
match self.chunk_subscriptions.entry(commitment) {
395-
Entry::Occupied(mut entry) => {
396-
entry.get_mut().subscribers.push(response);
397-
}
398-
Entry::Vacant(entry) => {
399-
let (tx, rx) = oneshot::channel();
400-
shard_layer.subscribe_chunk(commitment, index, tx).await;
401-
let aborter = chunk_waiters.push(async move {
402-
((commitment, index), rx.await.expect("shard subscriber closed"))
403-
});
404-
entry.insert(ChunkSubscription {
405-
subscribers: vec![response],
406-
_aborter: aborter,
407-
});
408-
}
409-
}
410-
}
411340
Message::Subscribe { round, commitment, response } => {
412341
// Check for block locally
413342
if let Some(block) = self.find_block(&mut shard_layer, commitment).await {
@@ -454,6 +383,80 @@ where
454383
}
455384
}
456385
}
386+
Message::Broadcast { coding_commitment, config, chunks } => {
387+
shard_layer.broadcast_chunks(coding_commitment, config, chunks).await;
388+
}
389+
Message::VerifyShard { commitment, index, response } => {
390+
// Check for chunk locally
391+
if let Some(shard) = shard_layer.get_chunk(commitment, index).await {
392+
let _ = response.send(shard.verify(index, &commitment));
393+
continue;
394+
}
395+
396+
match self.shard_validity_subscriptions.entry((commitment, index)) {
397+
Entry::Occupied(mut entry) => {
398+
entry.get_mut().subscribers.push(response);
399+
}
400+
Entry::Vacant(entry) => {
401+
let (tx, rx) = oneshot::channel();
402+
shard_layer.subscribe_chunk(commitment, index, tx).await;
403+
let aborter = chunk_waiters.push(async move {
404+
let shard = rx.await.expect("shard subscriber closed");
405+
let valid = shard.verify(index, &commitment);
406+
((commitment, index), valid)
407+
});
408+
entry.insert(ShardValiditySubscription {
409+
subscribers: vec![response],
410+
_aborter: aborter,
411+
});
412+
}
413+
}
414+
}
415+
Message::Notarize { notarization } => {
416+
let commitment = notarization.proposal.payload;
417+
let index = notarization.proposal_signature.index as u16;
418+
shard_layer.try_broadcast_shard(commitment, index).await;
419+
}
420+
Message::Notarization { notarization } => {
421+
let round = notarization.round();
422+
let commitment = notarization.proposal.payload;
423+
424+
// Store notarization by view
425+
self.cache.put_notarization(round, commitment, notarization.clone()).await;
426+
427+
// Search for block locally, otherwise fetch it remotely
428+
if let Some(block) = self.find_block(&mut shard_layer, commitment).await {
429+
// If found, persist the block
430+
self.cache_block(round, commitment, block).await;
431+
} else {
432+
debug!(?round, "notarized block missing");
433+
resolver.fetch(Request::<CodedBlock<B, H>>::Notarized { round }).await;
434+
}
435+
}
436+
Message::Finalize { finalization } => {
437+
let commitment = finalization.proposal.payload;
438+
let index = finalization.proposal_signature.index as u16;
439+
shard_layer.try_broadcast_shard(commitment, index).await;
440+
}
441+
Message::Finalization { finalization } => {
442+
// Cache finalization by round
443+
let round = finalization.round();
444+
let commitment = finalization.proposal.payload;
445+
446+
self.cache.put_finalization(round, commitment, finalization.clone()).await;
447+
448+
// Search for block locally, otherwise fetch it remotely
449+
if let Some(block) = self.find_block(&mut shard_layer, commitment).await {
450+
// If found, persist the block
451+
let height = block.height();
452+
self.finalize(height, commitment, block, Some(finalization), &mut notifier_tx).await;
453+
debug!(?round, height, "finalized block stored");
454+
} else {
455+
// Otherwise, fetch the block from the network.
456+
debug!(?round, ?commitment, "finalized block missing");
457+
resolver.fetch(Request::<CodedBlock<B, H>>::Block(commitment)).await;
458+
}
459+
}
457460
}
458461
},
459462
// Handle finalizer messages next
@@ -675,14 +678,35 @@ where
675678
// -------------------- Waiters --------------------
676679

677680
/// Notify any subscribers for the given commitment with the provided block.
678-
async fn notify_subscribers(&mut self, commitment: B::Commitment, block: &CodedBlock<B, H>) {
681+
async fn notify_block_subscribers(
682+
&mut self,
683+
commitment: B::Commitment,
684+
block: &CodedBlock<B, H>,
685+
) {
679686
if let Some(mut bs) = self.block_subscriptions.remove(&commitment) {
680687
for subscriber in bs.subscribers.drain(..) {
681688
let _ = subscriber.send(block.clone().take_inner());
682689
}
683690
}
684691
}
685692

693+
// Notify any subscribers waiting for shard validity.
694+
async fn notify_shard_validity_subscribers(
695+
&mut self,
696+
commitment: B::Commitment,
697+
index: u16,
698+
valid: bool,
699+
) {
700+
if let Some(mut cs) = self
701+
.shard_validity_subscriptions
702+
.remove(&(commitment, index))
703+
{
704+
for subscriber in cs.subscribers.drain(..) {
705+
let _ = subscriber.send(valid);
706+
}
707+
}
708+
}
709+
686710
// -------------------- Prunable Storage --------------------
687711

688712
/// Add a notarized block to the prunable archive.
@@ -692,7 +716,7 @@ where
692716
commitment: B::Commitment,
693717
block: CodedBlock<B, H>,
694718
) {
695-
self.notify_subscribers(commitment, &block).await;
719+
self.notify_block_subscribers(commitment, &block).await;
696720
self.cache.put_block(round, commitment, block).await;
697721
}
698722

@@ -733,7 +757,7 @@ where
733757
finalization: Option<Finalization<V, B::Commitment>>,
734758
notifier: &mut mpsc::Sender<()>,
735759
) {
736-
self.notify_subscribers(commitment, &block).await;
760+
self.notify_block_subscribers(commitment, &block).await;
737761

738762
// In parallel, update the finalized blocks and finalizations archives
739763
if let Err(e) = try_join!(

0 commit comments

Comments
 (0)