Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Make the consensus manager API resemble a classic channel interface #3233

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
65 changes: 25 additions & 40 deletions rs/p2p/artifact_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,11 @@ use std::{
time::Duration,
};
use tokio::{
sync::mpsc::{unbounded_channel, Sender, UnboundedReceiver, UnboundedSender},
sync::mpsc::{Sender, UnboundedReceiver},
time::timeout,
};
use tracing::instrument;

type ArtifactEventSender<Artifact> = UnboundedSender<UnvalidatedArtifactMutation<Artifact>>;

/// Metrics for a client artifact processor.
struct ArtifactProcessorMetrics {
/// The processing time histogram.
Expand Down Expand Up @@ -136,45 +134,33 @@ pub fn run_artifact_processor<Artifact: IdentifiableArtifact>(
time_source: Arc<dyn TimeSource>,
metrics_registry: MetricsRegistry,
client: Box<dyn ArtifactProcessor<Artifact>>,
send_advert: Sender<ArtifactTransmit<Artifact>>,
outbound_tx: Sender<ArtifactTransmit<Artifact>>,
inbound_tx: UnboundedReceiver<UnvalidatedArtifactMutation<Artifact>>,
initial_artifacts: Vec<Artifact>,
) -> (Box<dyn JoinGuard>, ArtifactEventSender<Artifact>) {
// Making this channel bounded can be problematic since we don't have true multiplexing
// of P2P messages.
// Possible scenario is - adverts+chunks arrive on the same channel, slow consensus
// will result on slow consuption of chunks. Slow consumption of chunks will in turn
// result in slower consumptions of adverts. Ideally adverts are consumed at rate
// independent of consensus.
#[allow(clippy::disallowed_methods)]
let (sender, receiver) = unbounded_channel();
) -> Box<dyn JoinGuard> {
let shutdown = Arc::new(AtomicBool::new(false));

// Spawn the processor thread
let shutdown_cl = shutdown.clone();
let handle = ThreadBuilder::new()
.name(format!("{}_Processor", Artifact::NAME))
.spawn(move || {
for artifact in initial_artifacts {
let _ = send_advert.blocking_send(ArtifactTransmit::Deliver(ArtifactWithOpt {
let _ = outbound_tx.blocking_send(ArtifactTransmit::Deliver(ArtifactWithOpt {
artifact,
is_latency_sensitive: false,
}));
}
process_messages(
time_source,
client,
send_advert,
receiver,
outbound_tx,
inbound_tx,
ArtifactProcessorMetrics::new(metrics_registry, Artifact::NAME.to_string()),
shutdown_cl,
);
})
.unwrap();

(
Box::new(ArtifactProcessorJoinGuard::new(handle, shutdown)),
sender,
)
Box::new(ArtifactProcessorJoinGuard::new(handle, shutdown))
}

// The artifact processor thread loop
Expand Down Expand Up @@ -243,7 +229,8 @@ const ARTIFACT_MANAGER_TIMER_DURATION_MSEC: u64 = 200;
pub fn create_ingress_handlers<
PoolIngress: MutablePool<SignedIngress> + Send + Sync + ValidatedPoolReader<SignedIngress> + 'static,
>(
send_advert: Sender<ArtifactTransmit<SignedIngress>>,
outbound_tx: Sender<ArtifactTransmit<SignedIngress>>,
inbound_tx: UnboundedReceiver<UnvalidatedArtifactMutation<SignedIngress>>,
time_source: Arc<dyn TimeSource>,
ingress_pool: Arc<RwLock<PoolIngress>>,
ingress_handler: Arc<
Expand All @@ -254,19 +241,16 @@ pub fn create_ingress_handlers<
+ Sync,
>,
metrics_registry: MetricsRegistry,
) -> (
UnboundedSender<UnvalidatedArtifactMutation<SignedIngress>>,
Box<dyn JoinGuard>,
) {
) -> Box<dyn JoinGuard> {
let client = IngressProcessor::new(ingress_pool.clone(), ingress_handler);
let (jh, sender) = run_artifact_processor(
run_artifact_processor(
time_source.clone(),
metrics_registry,
Box::new(client),
send_advert,
outbound_tx,
inbound_tx,
vec![],
);
(sender, jh)
)
}

/// Starts the event loop that pools consensus for updates on what needs to be replicated.
Expand All @@ -275,25 +259,23 @@ pub fn create_artifact_handler<
Pool: MutablePool<Artifact> + Send + Sync + ValidatedPoolReader<Artifact> + 'static,
C: PoolMutationsProducer<Pool, Mutations = <Pool as MutablePool<Artifact>>::Mutations> + 'static,
>(
send_advert: Sender<ArtifactTransmit<Artifact>>,
outbound_tx: Sender<ArtifactTransmit<Artifact>>,
inbound_rx: UnboundedReceiver<UnvalidatedArtifactMutation<Artifact>>,
change_set_producer: C,
time_source: Arc<dyn TimeSource>,
pool: Arc<RwLock<Pool>>,
metrics_registry: MetricsRegistry,
) -> (
UnboundedSender<UnvalidatedArtifactMutation<Artifact>>,
Box<dyn JoinGuard>,
) {
) -> Box<dyn JoinGuard> {
let inital_artifacts: Vec<_> = pool.read().unwrap().get_all_for_broadcast().collect();
let client = Processor::new(pool, change_set_producer);
let (jh, sender) = run_artifact_processor(
run_artifact_processor(
time_source.clone(),
metrics_registry,
Box::new(client),
send_advert,
outbound_tx,
inbound_rx,
inital_artifacts,
);
(sender, jh)
)
}

// TODO: make it private, it is used only for tests outside of this crate
Expand Down Expand Up @@ -472,11 +454,14 @@ mod tests {

let time_source = Arc::new(SysTimeSource::new());
let (send_tx, mut send_rx) = tokio::sync::mpsc::channel(100);
#[allow(clippy::disallowed_methods)]
let (_, inbound_rx) = tokio::sync::mpsc::unbounded_channel();
run_artifact_processor::<DummyArtifact>(
time_source,
MetricsRegistry::default(),
Box::new(DummyProcessor),
send_tx,
inbound_rx,
(0..10).map(Into::into).collect(),
);

Expand Down
37 changes: 29 additions & 8 deletions rs/p2p/consensus_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use phantom_newtype::AmountOf;
use tokio::{
runtime::Handle,
sync::{
mpsc::{Receiver, UnboundedSender},
mpsc::{Receiver, Sender, UnboundedReceiver, UnboundedSender},
watch,
},
};
Expand All @@ -28,15 +28,20 @@ mod sender;
type StartConsensusManagerFn =
Box<dyn FnOnce(Arc<dyn Transport>, watch::Receiver<SubnetTopology>) -> Vec<Shutdown>>;

pub struct ConsensusManagerBuilder {
const MAX_OUTBOUND_CHANNEL_SIZE: usize = 100_000;

pub type AbortableBroadcastSender<T> = Sender<ArtifactTransmit<T>>;
pub type AbortableBroadcastReceiver<T> = UnboundedReceiver<UnvalidatedArtifactMutation<T>>;

pub struct AbortableBroadcastChannelBuilder {
log: ReplicaLogger,
metrics_registry: MetricsRegistry,
rt_handle: Handle,
clients: Vec<StartConsensusManagerFn>,
router: Option<Router>,
}

impl ConsensusManagerBuilder {
impl AbortableBroadcastChannelBuilder {
pub fn new(log: ReplicaLogger, rt_handle: Handle, metrics_registry: MetricsRegistry) -> Self {
Self {
log,
Expand All @@ -47,18 +52,32 @@ impl ConsensusManagerBuilder {
}
}

pub fn add_client<
rumenov marked this conversation as resolved.
Show resolved Hide resolved
/// Creates a channel for the corresponding artifact. Tha channel is used to broadcast artifacts within the subnet.
pub fn abortable_broadcast_channel<
Artifact: IdentifiableArtifact,
WireArtifact: PbArtifact,
F: FnOnce(Arc<dyn Transport>) -> D + 'static,
D: ArtifactAssembler<Artifact, WireArtifact>,
>(
&mut self,
outbound_artifacts_rx: Receiver<ArtifactTransmit<Artifact>>,
inbound_artifacts_tx: UnboundedSender<UnvalidatedArtifactMutation<Artifact>>,
(assembler, assembler_router): (F, Router),
slot_limit: usize,
) -> (
AbortableBroadcastSender<Artifact>,
AbortableBroadcastReceiver<Artifact>,
// TODO: remove this by introducing a new channel from the http handler into the processor
UnboundedSender<UnvalidatedArtifactMutation<Artifact>>,
) {
let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(MAX_OUTBOUND_CHANNEL_SIZE);
// Making this channel bounded can be problematic since we don't have true multiplexing
// of P2P messages.
// Possible scenario is - adverts+chunks arrive on the same channel, slow consensus
// will result on slow consuption of chunks. Slow consumption of chunks will in turn
// result in slower consumptions of adverts. Ideally adverts are consumed at rate
// independent of consensus.
#[allow(clippy::disallowed_methods)]
let (inbound_tx, inbound_rx) = tokio::sync::mpsc::unbounded_channel();

assert!(uri_prefix::<WireArtifact>()
.chars()
.all(char::is_alphabetic));
Expand All @@ -68,14 +87,15 @@ impl ConsensusManagerBuilder {
let rt_handle = self.rt_handle.clone();
let metrics_registry = self.metrics_registry.clone();

let inbound_tx_c = inbound_tx.clone();
let builder = move |transport: Arc<dyn Transport>, topology_watcher| {
start_consensus_manager(
log,
&metrics_registry,
rt_handle,
outbound_artifacts_rx,
outbound_rx,
adverts_from_peers_rx,
inbound_artifacts_tx,
inbound_tx,
assembler(transport.clone()),
transport,
topology_watcher,
Expand All @@ -92,6 +112,7 @@ impl ConsensusManagerBuilder {
);

self.clients.push(Box::new(builder));
(outbound_tx, inbound_rx, inbound_tx_c)
}

pub fn router(&mut self) -> Router {
Expand Down
1 change: 1 addition & 0 deletions rs/p2p/consensus_manager/src/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,7 @@ mod tests {

fn new() -> Self {
let (_, adverts_received) = tokio::sync::mpsc::channel(100);
#[allow(clippy::disallowed_methods)]
let (sender, unvalidated_artifact_receiver) = tokio::sync::mpsc::unbounded_channel();
let (_, topology_watcher) = watch::channel(SubnetTopology::default());
let artifact_assembler =
Expand Down
29 changes: 15 additions & 14 deletions rs/p2p/test_utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,30 +449,31 @@ pub fn start_consensus_manager(
processor: TestConsensus<U64Artifact>,
) -> (
Box<dyn JoinGuard>,
ic_consensus_manager::ConsensusManagerBuilder,
ic_consensus_manager::AbortableBroadcastChannelBuilder,
) {
let _enter = rt_handle.enter();
let pool = Arc::new(RwLock::new(processor));
let (artifact_processor_jh, artifact_manager_event_rx, artifact_sender) =
start_test_processor(pool.clone(), pool.clone().read().unwrap().clone());
let bouncer_factory = Arc::new(pool.clone().read().unwrap().clone());
let mut cm1 = ic_consensus_manager::ConsensusManagerBuilder::new(
let downloader = FetchArtifact::new(
log.clone(),
rt_handle.clone(),
pool.clone(),
bouncer_factory,
MetricsRegistry::default(),
);
let downloader = FetchArtifact::new(
log,
rt_handle,
pool,
bouncer_factory,

let mut cm1 = ic_consensus_manager::AbortableBroadcastChannelBuilder::new(
log.clone(),
rt_handle.clone(),
MetricsRegistry::default(),
);
cm1.add_client(
artifact_manager_event_rx,
artifact_sender,
downloader,
usize::MAX,
let (outbound_tx, inbound_rx, _) = cm1.abortable_broadcast_channel(downloader, usize::MAX);

let artifact_processor_jh = start_test_processor(
outbound_tx,
inbound_rx,
pool.clone(),
pool.clone().read().unwrap().clone(),
);
(artifact_processor_jh, cm1)
}
Expand Down
40 changes: 17 additions & 23 deletions rs/p2p/test_utils/src/turmoil.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ pub fn add_transport_to_sim<F>(

async move {
let metrics_registry = MetricsRegistry::default();
let mut consensus_builder = ic_consensus_manager::ConsensusManagerBuilder::new(
let mut consensus_builder = ic_consensus_manager::AbortableBroadcastChannelBuilder::new(
log.clone(),
tokio::runtime::Handle::current(),
metrics_registry,
Expand All @@ -370,25 +370,22 @@ pub fn add_transport_to_sim<F>(
};

let _artifact_processor_jh = if let Some(consensus) = consensus_manager_clone {
let (artifact_processor_jh, artifact_manager_event_rx, artifact_sender) =
start_test_processor(
consensus.clone(),
consensus.clone().read().unwrap().clone(),
);
let bouncer_factory = Arc::new(consensus.clone().read().unwrap().clone());

let downloader = FetchArtifact::new(
log.clone(),
tokio::runtime::Handle::current(),
consensus,
consensus.clone(),
bouncer_factory,
MetricsRegistry::default(),
);
consensus_builder.add_client(
artifact_manager_event_rx,
artifact_sender,
downloader,
usize::MAX,
let (outbound_tx, inbound_tx, _) =
consensus_builder.abortable_broadcast_channel(downloader, usize::MAX);

let artifact_processor_jh = start_test_processor(
outbound_tx,
inbound_tx,
consensus.clone(),
consensus.clone().read().unwrap().clone(),
);
router = Some(router.unwrap_or_default().merge(consensus_builder.router()));

Expand Down Expand Up @@ -442,22 +439,19 @@ pub fn waiter_fut(

#[allow(clippy::type_complexity)]
pub fn start_test_processor(
outbound_tx: mpsc::Sender<ArtifactTransmit<U64Artifact>>,
inbound_rx: mpsc::UnboundedReceiver<UnvalidatedArtifactMutation<U64Artifact>>,
pool: Arc<RwLock<TestConsensus<U64Artifact>>>,
change_set_producer: TestConsensus<U64Artifact>,
) -> (
Box<dyn JoinGuard>,
mpsc::Receiver<ArtifactTransmit<U64Artifact>>,
mpsc::UnboundedSender<UnvalidatedArtifactMutation<U64Artifact>>,
) {
let (tx, rx) = tokio::sync::mpsc::channel(1000);
) -> Box<dyn JoinGuard> {
let time_source = Arc::new(SysTimeSource::new());
let client = ic_artifact_manager::Processor::new(pool, change_set_producer);
let (jh, sender) = run_artifact_processor(
run_artifact_processor(
time_source,
MetricsRegistry::default(),
Box::new(client),
tx,
outbound_tx,
inbound_rx,
vec![],
);
(jh, rx, sender)
)
}
Loading
Loading