feat(consensus): smooth transition to p2p syncing (BFT-515) (#3075)
pompon0 authored Oct 11, 2024
1 parent 038c397 commit 5d339b4
Showing 4 changed files with 249 additions and 4 deletions.
58 changes: 55 additions & 3 deletions core/node/consensus/src/en.rs
@@ -4,7 +4,7 @@ use anyhow::Context as _;
use zksync_concurrency::{ctx, error::Wrap as _, scope, time};
use zksync_consensus_executor::{self as executor, attestation};
use zksync_consensus_roles::{attester, validator};
use zksync_consensus_storage::BlockStore;
use zksync_consensus_storage::{BlockStore, PersistentBlockStore as _};
use zksync_dal::consensus_dal;
use zksync_node_sync::{fetcher::FetchedBlock, sync_action::ActionQueueSender, SyncState};
use zksync_types::L2BlockNumber;
@@ -21,6 +21,10 @@ use crate::{
storage::{self, ConnectionPool},
};

/// If fewer than TEMPORARY_FETCHER_THRESHOLD certificates are missing,
/// the temporary fetcher will stop fetching blocks.
pub(crate) const TEMPORARY_FETCHER_THRESHOLD: u64 = 10;

/// External node.
pub(super) struct EN {
pub(super) pool: ConnectionPool,
@@ -120,6 +124,20 @@ impl EN {
.wrap("Store::new()")?;
s.spawn_bg(async { Ok(runner.run(ctx).await?) });

// Run the temporary fetcher until the certificates are backfilled.
// The temporary fetcher should be removed once JSON-RPC syncing is fully deprecated.
s.spawn_bg({
let store = store.clone();
async {
let store = store;
self.temporary_block_fetcher(ctx, &store).await?;
tracing::info!(
"temporary block fetcher finished, switching to p2p fetching only"
);
Ok(())
}
});

let (block_store, runner) = BlockStore::new(ctx, Box::new(store.clone()))
.await
.wrap("BlockStore::new()")?;
@@ -358,8 +376,42 @@ impl EN {
}
}

/// Fetches blocks from the main node directly until the certificates
/// are backfilled. This allows for a smooth transition from JSON-RPC to p2p block syncing.
pub(crate) async fn temporary_block_fetcher(
&self,
ctx: &ctx::Ctx,
store: &Store,
) -> ctx::Result<()> {
const MAX_CONCURRENT_REQUESTS: usize = 30;
scope::run!(ctx, |ctx, s| async {
let (send, mut recv) = ctx::channel::bounded(MAX_CONCURRENT_REQUESTS);
s.spawn(async {
let Some(mut next) = store.next_block(ctx).await? else {
return Ok(());
};
while store.persisted().borrow().next().0 + TEMPORARY_FETCHER_THRESHOLD < next.0 {
let n = L2BlockNumber(next.0.try_into().context("overflow")?);
self.sync_state.wait_for_main_node_block(ctx, n).await?;
send.send(ctx, s.spawn(self.fetch_block(ctx, n))).await?;
next = next.next();
}
drop(send);
Ok(())
});
while let Ok(block) = recv.recv_or_disconnected(ctx).await? {
store
.queue_next_fetched_block(ctx, block.join(ctx).await?)
.await
.wrap("queue_next_fetched_block()")?;
}
Ok(())
})
.await
}

/// Fetches blocks from the main node in range `[cursor.next()..end)`.
pub(super) async fn fetch_blocks(
async fn fetch_blocks(
&self,
ctx: &ctx::Ctx,
queue: &mut storage::PayloadQueue,
@@ -373,7 +425,7 @@
s.spawn(async {
let send = send;
while end.map_or(true, |end| next < end) {
let n = L2BlockNumber(next.0.try_into().unwrap());
let n = L2BlockNumber(next.0.try_into().context("overflow")?);
self.sync_state.wait_for_main_node_block(ctx, n).await?;
send.send(ctx, s.spawn(self.fetch_block(ctx, n))).await?;
next = next.next();
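The stop condition in temporary_block_fetcher reduces to a single comparison between the next block to queue and the next block that already has a persisted certificate. The helper below is an illustrative sketch of that condition only; it is not code from this commit, and the parameter names are made up:

    /// Illustrative only: mirrors the `while` condition in `temporary_block_fetcher`.
    fn should_keep_fetching(
        next_persisted_cert: u64, // next block number with a persisted certificate
        next_to_queue: u64,       // next block number the payload queue expects
        threshold: u64,           // TEMPORARY_FETCHER_THRESHOLD
    ) -> bool {
        // Keep fetching from the main node while more than `threshold` certificates
        // are still missing; once the gap shrinks below that, p2p syncing takes over.
        next_persisted_cert + threshold < next_to_queue
    }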
24 changes: 24 additions & 0 deletions core/node/consensus/src/storage/store.rs
@@ -111,6 +111,30 @@ impl Store {
async fn conn(&self, ctx: &ctx::Ctx) -> ctx::Result<Connection> {
self.pool.connection(ctx).await.wrap("connection")
}

/// Number of the next block to queue.
pub(crate) async fn next_block(
&self,
ctx: &ctx::Ctx,
) -> ctx::OrCanceled<Option<validator::BlockNumber>> {
Ok(sync::lock(ctx, &self.block_payloads)
.await?
.as_ref()
.map(|p| p.next()))
}

/// Queues the next block.
pub(crate) async fn queue_next_fetched_block(
&self,
ctx: &ctx::Ctx,
block: FetchedBlock,
) -> ctx::Result<()> {
let mut payloads = sync::lock(ctx, &self.block_payloads).await?.into_async();
if let Some(payloads) = &mut *payloads {
payloads.send(block).await.context("payloads.send()")?;
}
Ok(())
}
}

impl PersistedBlockState {
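The two new Store methods are meant to be used together by a fetcher loop: peek at the number of the next block the payload queue expects, fetch it from the main node, and queue it back. The helper below is a hypothetical sketch of such a caller (it assumes a FetchedBlock is already in hand and that the code lives inside this crate); it is not code from this commit:

    // Hypothetical helper, not code from the repository.
    use zksync_concurrency::{ctx, error::Wrap as _};
    use zksync_node_sync::fetcher::FetchedBlock;

    use crate::storage::Store;

    async fn queue_if_pending(ctx: &ctx::Ctx, store: &Store, block: FetchedBlock) -> ctx::Result<()> {
        // next_block() returns None when no payload queue is attached to this Store,
        // in which case there is nothing to queue into.
        if store.next_block(ctx).await?.is_some() {
            store
                .queue_next_fetched_block(ctx, block)
                .await
                .wrap("queue_next_fetched_block()")?;
        }
        Ok(())
    }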
39 changes: 38 additions & 1 deletion core/node/consensus/src/testonly.rs
@@ -45,7 +45,10 @@ use zksync_types::{
};
use zksync_web3_decl::client::{Client, DynClient, L2};

use crate::{en, storage::ConnectionPool};
use crate::{
en,
storage::{ConnectionPool, Store},
};

/// Fake StateKeeper for tests.
#[derive(Debug)]
@@ -417,6 +420,40 @@ impl StateKeeper {
.await
}

pub async fn run_temporary_fetcher(
self,
ctx: &ctx::Ctx,
client: Box<DynClient<L2>>,
) -> ctx::Result<()> {
scope::run!(ctx, |ctx, s| async {
let payload_queue = self
.pool
.connection(ctx)
.await
.wrap("connection()")?
.new_payload_queue(ctx, self.actions_sender, self.sync_state.clone())
.await
.wrap("new_payload_queue()")?;
let (store, runner) = Store::new(
ctx,
self.pool.clone(),
Some(payload_queue),
Some(client.clone()),
)
.await
.wrap("Store::new()")?;
s.spawn_bg(async { Ok(runner.run(ctx).await?) });
en::EN {
pool: self.pool.clone(),
client,
sync_state: self.sync_state.clone(),
}
.temporary_block_fetcher(ctx, &store)
.await
})
.await
}

/// Runs consensus node for the external node.
pub async fn run_consensus(
self,
132 changes: 132 additions & 0 deletions core/node/consensus/src/tests/mod.rs
@@ -13,8 +13,10 @@ use zksync_consensus_storage::{BlockStore, PersistentBlockStore};
use zksync_dal::consensus_dal;
use zksync_test_account::Account;
use zksync_types::ProtocolVersionId;
use zksync_web3_decl::namespaces::EnNamespaceClient as _;

use crate::{
en::TEMPORARY_FETCHER_THRESHOLD,
mn::run_main_node,
storage::{ConnectionPool, Store},
testonly,
@@ -665,6 +667,136 @@ async fn test_p2p_fetcher_backfill_certs(
.unwrap();
}

// Test that the temporary fetcher fetches blocks when a lot of certs are missing.
#[test_casing(8, Product((FROM_SNAPSHOT,VERSIONS,PREGENESIS)))]
#[tokio::test]
async fn test_temporary_fetcher(from_snapshot: bool, version: ProtocolVersionId, pregenesis: bool) {
zksync_concurrency::testonly::abort_on_panic();
let ctx = &ctx::test_root(&ctx::AffineClock::new(10.));
let rng = &mut ctx.rng();
// We force certs to be missing on EN by having 1 of the validators permanently offline.
// This way no blocks will be finalized at all, so no one will have certs.
let setup = Setup::new(rng, 2);
let validator_cfg = testonly::new_configs(rng, &setup, 0, pregenesis)[0].clone();
let node_cfg = validator_cfg.new_fullnode(rng);
let account = &mut Account::random();

scope::run!(ctx, |ctx, s| async {
tracing::info!("Spawn validator.");
let validator_pool = ConnectionPool::test(from_snapshot, version).await;
let (mut validator, runner) =
testonly::StateKeeper::new(ctx, validator_pool.clone()).await?;
s.spawn_bg(runner.run(ctx));
s.spawn_bg(run_main_node(
ctx,
validator_cfg.config.clone(),
validator_cfg.secrets.clone(),
validator_pool.clone(),
));
// API server needs at least 1 L1 batch to start.
validator.seal_batch().await;
let client = validator.connect(ctx).await?;

// Wait for the consensus to be initialized.
while ctx.wait(client.consensus_global_config()).await??.is_none() {
ctx.sleep(time::Duration::milliseconds(100)).await?;
}

let node_pool = ConnectionPool::test(from_snapshot, version).await;

tracing::info!("Run centralized fetcher, so that there is a lot of certs missing.");
scope::run!(ctx, |ctx, s| async {
let (node, runner) = testonly::StateKeeper::new(ctx, node_pool.clone()).await?;
s.spawn_bg(runner.run(ctx));
s.spawn_bg(node.run_fetcher(ctx, client.clone()));
validator
.push_random_blocks(rng, account, TEMPORARY_FETCHER_THRESHOLD as usize + 1)
.await;
node_pool
.wait_for_payload(ctx, validator.last_block())
.await?;
Ok(())
})
.await
.unwrap();

tracing::info!(
"Run p2p fetcher. Blocks should be fetched by the temporary fetcher anyway."
);
scope::run!(ctx, |ctx, s| async {
let (node, runner) = testonly::StateKeeper::new(ctx, node_pool.clone()).await?;
s.spawn_bg(runner.run(ctx));
s.spawn_bg(node.run_consensus(ctx, client.clone(), node_cfg.clone()));
validator.push_random_blocks(rng, account, 5).await;
node_pool
.wait_for_payload(ctx, validator.last_block())
.await?;
Ok(())
})
.await
.unwrap();
Ok(())
})
.await
.unwrap();
}

// Test that the temporary fetcher terminates once enough blocks have certs.
#[test_casing(4, Product((FROM_SNAPSHOT,VERSIONS)))]
#[tokio::test]
async fn test_temporary_fetcher_termination(from_snapshot: bool, version: ProtocolVersionId) {
zksync_concurrency::testonly::abort_on_panic();
let ctx = &ctx::test_root(&ctx::AffineClock::new(10.));
let rng = &mut ctx.rng();
let setup = Setup::new(rng, 1);
let pregenesis = true;
let validator_cfg = testonly::new_configs(rng, &setup, 0, pregenesis)[0].clone();
let node_cfg = validator_cfg.new_fullnode(rng);
let account = &mut Account::random();

scope::run!(ctx, |ctx, s| async {
tracing::info!("Spawn validator.");
let validator_pool = ConnectionPool::test(from_snapshot, version).await;
let (mut validator, runner) =
testonly::StateKeeper::new(ctx, validator_pool.clone()).await?;
s.spawn_bg(runner.run(ctx));
s.spawn_bg(run_main_node(
ctx,
validator_cfg.config.clone(),
validator_cfg.secrets.clone(),
validator_pool.clone(),
));
// API server needs at least 1 L1 batch to start.
validator.seal_batch().await;
let client = validator.connect(ctx).await?;

let node_pool = ConnectionPool::test(from_snapshot, version).await;

// Run the EN so that consensus is initialized on the EN, and wait for it to sync.
scope::run!(ctx, |ctx, s| async {
let (node, runner) = testonly::StateKeeper::new(ctx, node_pool.clone()).await?;
s.spawn_bg(runner.run(ctx));
s.spawn_bg(node.run_consensus(ctx, client.clone(), node_cfg.clone()));
validator.push_random_blocks(rng, account, 5).await;
node_pool
.wait_for_payload(ctx, validator.last_block())
.await?;
Ok(())
})
.await
.unwrap();

// Run the temporary fetcher. It should terminate immediately, since the EN is synced.
let (node, runner) = testonly::StateKeeper::new(ctx, node_pool.clone()).await?;
s.spawn_bg(runner.run(ctx));
node.run_temporary_fetcher(ctx, client).await?;

Ok(())
})
.await
.unwrap();
}

#[test_casing(4, Product((VERSIONS,PREGENESIS)))]
#[tokio::test]
async fn test_with_pruning(version: ProtocolVersionId, pregenesis: bool) {