Skip to content

Commit 2ee3517

Browse files
committed
feat(consensus): define marshal-specific reporter trait
1 parent fe0dcf4 commit 2ee3517

File tree

1 file changed

+63
-2
lines changed

1 file changed

+63
-2
lines changed

consensus/src/marshal/mod.rs

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,71 @@ pub mod ingress;
6262
pub use ingress::mailbox::Mailbox;
6363
pub mod resolver;
6464

65-
use crate::{simplex::signing_scheme::Scheme, types::Epoch, Block};
66-
use futures::channel::oneshot;
65+
use crate::{simplex::signing_scheme::Scheme, types::Epoch, Block, Reporter};
66+
use futures::{
67+
channel::oneshot,
68+
future::{self, join, Either},
69+
};
6770
use std::sync::Arc;
6871

72+
#[derive(Clone)]
73+
pub struct Reporters<L, R> {
74+
left: L,
75+
right: R,
76+
}
77+
78+
impl<L, R> From<(L, R)> for Reporters<L, R>
79+
where
80+
L: Reporter,
81+
R: Reporter,
82+
{
83+
fn from((left, right): (L, R)) -> Self {
84+
Self { left, right }
85+
}
86+
}
87+
88+
impl<B, L, R> Reporter for Reporters<L, R>
89+
where
90+
B: Block,
91+
L: Reporter<Activity = Update<B>>,
92+
R: Reporter<Activity = Update<B>>,
93+
{
94+
type Activity = Update<B>;
95+
96+
async fn report(&mut self, activity: Self::Activity) {
97+
let (to_left, to_right, to_await) = match activity {
98+
Update::Tip(height, digest) => {
99+
let to_left = Update::Tip(height, digest.clone());
100+
let to_right = Update::Tip(height, digest.clone());
101+
let to_await = Either::Left(future::ready(()));
102+
(to_left, to_right, to_await)
103+
}
104+
Update::Block(block, report_ack) => {
105+
let (to_left, left_ack) = {
106+
let (tx, rx) = oneshot::channel();
107+
let msg = Update::Block(block.clone(), tx);
108+
(msg, rx)
109+
};
110+
let (to_right, right_ack) = {
111+
let (tx, rx) = oneshot::channel();
112+
let msg = Update::Block(block.clone(), tx);
113+
(msg, rx)
114+
};
115+
let to_await = Either::Right(async move {
116+
if let Ok(((), ())) = future::try_join(left_ack, right_ack).await {
117+
let _ = report_ack.send(());
118+
} else {
119+
drop(report_ack);
120+
}
121+
});
122+
(to_left, to_right, to_await)
123+
}
124+
};
125+
join(self.left.report(to_left), self.right.report(to_right)).await;
126+
to_await.await
127+
}
128+
}
129+
69130
/// Supplies the signing scheme the marshal should use for a given epoch.
70131
pub trait SchemeProvider: Clone + Send + Sync + 'static {
71132
/// The signing scheme to provide.

0 commit comments

Comments
 (0)