WIP: DataColumnsByRange RPC
Tumas committed May 17, 2024
1 parent 038b0b5 commit 02a186d
Showing 5 changed files with 134 additions and 16 deletions.
2 changes: 1 addition & 1 deletion eth2_libp2p
48 changes: 42 additions & 6 deletions p2p/src/block_sync_service.rs
@@ -359,10 +359,9 @@ impl<P: Preset> BlockSyncService<P> {
self.sync_manager.block_by_root_request_finished(block_root);
self.request_blobs_and_blocks_if_ready()?;
}
//TODO(feature/eip-7594)
P2pToSync::DataColumnsByRangeRequestFinished(request_id) => {
// self.sync_manager.blobs_by_range_request_finished(request_id);
// self.request_blobs_and_blocks_if_ready()?;
self.sync_manager.data_columns_by_range_request_finished(request_id);
self.request_blobs_and_blocks_if_ready()?;
}
}
}
@@ -391,15 +390,26 @@ impl<P: Preset> BlockSyncService<P> {
target,
start_slot,
count,
ref data_columns,
..
} = batch;

let peer = self.sync_manager.retry_batch(request_id, &batch);

if let Some(peer_id) = peer {
match target {
// todo!(feature/eip7594)
SyncTarget::DataColumnSidecar => {}
SyncTarget::DataColumnSidecar => {
let data_columns = data_columns.clone().unwrap_or_default();

SyncToP2p::RequestDataColumnsByRange(
request_id,
peer_id,
start_slot,
count,
data_columns,
)
.send(&self.sync_to_p2p_tx);
}
SyncTarget::BlobSidecar => {
SyncToP2p::RequestBlobsByRange(request_id, peer_id, start_slot, count)
.send(&self.sync_to_p2p_tx);
@@ -435,9 +445,20 @@ impl<P: Preset> BlockSyncService<P> {
self.retry_sync_batches(expired_batches)
}

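// Retry data column range batches whose requests have expired, analogous to the blob and block variants.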
fn request_expired_data_column_range_requests(&mut self) -> Result<()> {
let expired_batches = self
.sync_manager
.expired_data_column_range_batches()
.map(|(batch, _)| batch)
.collect();

self.retry_sync_batches(expired_batches)
}

fn request_blobs_and_blocks_if_ready(&mut self) -> Result<()> {
self.request_expired_blob_range_requests()?;
self.request_expired_block_range_requests()?;
self.request_expired_data_column_range_requests()?;

if !self.sync_manager.ready_to_request_blocks_by_range() {
return Ok(());
@@ -493,12 +514,27 @@ impl<P: Preset> BlockSyncService<P> {
start_slot,
count,
target,
ref data_columns,
..
} = batch;

match target {
//TODO(feature/eip-7594)
SyncTarget::DataColumnSidecar => {}
SyncTarget::DataColumnSidecar => {
let data_columns = data_columns.clone().unwrap_or_default();

self.sync_manager
.add_data_columns_request_by_range(request_id, batch);

SyncToP2p::RequestDataColumnsByRange(
request_id,
peer_id,
start_slot,
count,
data_columns,
)
.send(&self.sync_to_p2p_tx);
}
SyncTarget::BlobSidecar => {
self.sync_manager
.add_blob_request_by_range(request_id, batch);
11 changes: 9 additions & 2 deletions p2p/src/messages.rs
@@ -12,11 +12,12 @@ use futures::channel::{mpsc::UnboundedSender, oneshot::Sender};
use log::debug;
use operation_pools::PoolRejectionReason;
use serde::Serialize;
use ssz::ContiguousList;
use types::{
altair::containers::{SignedContributionAndProof, SyncCommitteeMessage},
combined::{BeaconState, SignedBeaconBlock},
deneb::containers::{BlobIdentifier, BlobSidecar},
eip7594::{DataColumnIdentifier, DataColumnSidecar},
eip7594::{ColumnIndex, DataColumnIdentifier, DataColumnSidecar, NumberOfColumns},
nonstandard::Phase,
phase0::{
containers::{
@@ -127,7 +128,13 @@ impl SyncToMetrics {

pub enum SyncToP2p {
PruneReceivedBlocks,
// RequestDataColumnsByRange(RequestId, PeerId, Slot, u64),
RequestDataColumnsByRange(
RequestId,
PeerId,
Slot,
u64,
Arc<ContiguousList<ColumnIndex, NumberOfColumns>>,
),
RequestDataColumnsByRoot(RequestId, PeerId, Vec<DataColumnIdentifier>),
RequestBlobsByRange(RequestId, PeerId, Slot, u64),
RequestBlobsByRoot(RequestId, PeerId, Vec<BlobIdentifier>),
35 changes: 33 additions & 2 deletions p2p/src/network.rs
@@ -37,7 +37,7 @@ use prometheus_client::registry::Registry;
use prometheus_metrics::Metrics;
use slog::{o, Drain as _, Logger};
use slog_stdlog::StdLog;
use ssz::SszHash as _;
use ssz::{ContiguousList, SszHash as _};
use std_ext::ArcExt as _;
use thiserror::Error;
use typenum::Unsigned as _;
@@ -46,7 +46,10 @@ use types::{
capella::containers::SignedBlsToExecutionChange,
combined::SignedBeaconBlock,
deneb::containers::{BlobIdentifier, BlobSidecar},
eip7594::{DataColumnIdentifier, DataColumnSidecar, DATA_COLUMN_SIDECAR_SUBNET_COUNT},
eip7594::{
ColumnIndex, DataColumnIdentifier, DataColumnSidecar, NumberOfColumns,
DATA_COLUMN_SIDECAR_SUBNET_COUNT,
},
nonstandard::{Phase, WithStatus},
phase0::{
consts::{FAR_FUTURE_EPOCH, GENESIS_EPOCH},
@@ -441,6 +444,9 @@ impl<P: Preset> Network<P> {
SyncToP2p::RequestDataColumnsByRoot(request_id, peer_id, identifiers) => {
self.request_data_columns_by_root(request_id, peer_id, identifiers);
}
SyncToP2p::RequestDataColumnsByRange(request_id, peer_id, start_slot, count, columns) => {
self.request_data_columns_by_range(request_id, peer_id, start_slot, count, columns);
}
SyncToP2p::RequestPeerStatus(request_id, peer_id) => {
self.request_peer_status(request_id, peer_id);
}
@@ -2196,6 +2202,31 @@ impl<P: Preset> Network<P> {
self.request(peer_id, request_id, Request::BlocksByRoot(request));
}

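// Send a DataColumnsByRange request to `peer_id` for `count` slots starting at `start_slot`, limited to the given column indices.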
fn request_data_columns_by_range(
&mut self,
request_id: RequestId,
peer_id: PeerId,
start_slot: Slot,
count: u64,
columns: Arc<ContiguousList<ColumnIndex, NumberOfColumns>>,
) {
// TODO: is count capped in eth2_libp2p?
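// If it is not, `count` presumably needs to be clamped to the protocol's maximum request size here.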
let request = DataColumnsByRangeRequest {
start_slot,
count,
columns,
};

self.log(
Level::Debug,
format_args!(
"sending DataColumnsByRange request (request_id: {request_id}, peer_id: {peer_id}, request: {request:?})",
),
);

self.request(peer_id, request_id, Request::DataColumnsByRange(request));
}

fn request_data_columns_by_root(
&self,
request_id: RequestId,
54 changes: 49 additions & 5 deletions p2p/src/sync_manager.rs
@@ -10,11 +10,12 @@ use itertools::Itertools as _;
use log::{log, Level};
use prometheus_metrics::Metrics;
use rand::{prelude::SliceRandom, seq::IteratorRandom as _, thread_rng};
use ssz::ContiguousList;
use typenum::Unsigned as _;
use types::{
config::Config,
deneb::containers::BlobIdentifier,
eip7594::DataColumnIdentifier,
eip7594::{ColumnIndex, DataColumnIdentifier, NumberOfColumns},
phase0::primitives::{Epoch, Slot, H256},
preset::Preset,
};
@@ -56,13 +57,14 @@ pub enum SyncTarget {
DataColumnSidecar,
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct SyncBatch {
pub target: SyncTarget,
pub direction: SyncDirection,
pub peer_id: PeerId,
pub start_slot: Slot,
pub count: u64,
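/// Column indices to request; `None` for block and blob batches, only used by `SyncTarget::DataColumnSidecar`.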
pub data_columns: Option<Arc<ContiguousList<ColumnIndex, NumberOfColumns>>>,
}

pub struct SyncManager {
@@ -134,11 +136,13 @@ impl SyncManager {
peer_id,
start_slot: batch.start_slot,
count: batch.count,
data_columns: batch.data_columns.clone(),
};

match batch.target {
// TODO(feature/eip7594)
SyncTarget::DataColumnSidecar => {}
SyncTarget::DataColumnSidecar => {
self.add_data_columns_request_by_range(request_id, batch)
}
SyncTarget::BlobSidecar => self.add_blob_request_by_range(request_id, batch),
SyncTarget::Block => self.add_block_request_by_range(request_id, batch),
}
@@ -193,6 +197,8 @@ impl SyncManager {
peer_id,
start_slot,
count,
// TODO(feature/eip7594)
data_columns: None,
};

self.log_with_feature(format_args!("back sync batch built: {batch:?}"));
@@ -310,7 +316,18 @@ impl SyncManager {
max_slot = start_slot + count - 1;

if config.is_eip7594_fork(misc::compute_epoch_at_slot::<P>(start_slot)) {
// TODO(feature/eip7594)
// TODO(feature/eip7594): figure out slot range and data columns
//
// if data_column_serve_range_slot < max_slot {
// sync_batches.push(SyncBatch {
// target: SyncTarget::DataColumnSidecar,
// direction: SyncDirection::Forward,
// peer_id,
// start_slot,
// count,
// data_columns: None,
// });
// }
} else {
if blob_serve_range_slot < max_slot {
sync_batches.push(SyncBatch {
@@ -319,16 +336,19 @@
peer_id,
start_slot,
count,
data_columns: None,
});
}
}

// TODO(feature/eip7594): refactor SyncBatch to Enum instead of struct with options
sync_batches.push(SyncBatch {
target: SyncTarget::Block,
direction: SyncDirection::Forward,
peer_id,
start_slot,
count,
data_columns: None,
});
}

@@ -355,6 +375,18 @@
.ready_to_request_by_root(&block_root, peer_id)
}

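/// Record an outgoing DataColumnsByRange batch so that the request can later be expired and retried.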
pub fn add_data_columns_request_by_range(&mut self, request_id: RequestId, batch: SyncBatch) {
self.log_with_feature(format_args!(
"add data column request by range (request_id: {}, peer_id: {}, range: {:?})",
request_id,
batch.peer_id,
(batch.start_slot..(batch.start_slot + batch.count)),
));

self.data_column_requests
.add_request_by_range(request_id, batch)
}

pub fn add_blob_request_by_range(&mut self, request_id: RequestId, batch: SyncBatch) {
self.log_with_feature(format_args!(
"add blob request by range (request_id: {}, peer_id: {}, range: {:?})",
@@ -464,6 +496,12 @@
));
}

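/// Currently only logs that a DataColumnsByRange request has finished.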
pub fn data_columns_by_range_request_finished(&mut self, request_id: RequestId) {
self.log_with_feature(format_args!(
"request data columns by range finished (request_id: {request_id:?})",
));
}

/// Log a message with peer count information.
fn log(&self, level: Level, message: impl Display) {
log!(
@@ -590,6 +628,12 @@ impl SyncManager {
self.block_requests.expired_range_batches()
}

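/// Data column range batches whose requests have expired; consumed by the retry logic in `BlockSyncService`.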
pub fn expired_data_column_range_batches(
&mut self,
) -> impl Iterator<Item = (SyncBatch, Instant)> + '_ {
self.data_column_requests.expired_range_batches()
}

pub fn outdated_peers(&mut self, status: StatusMessage) -> Vec<PeerId> {
if let Some(chain) = self.chain_with_max_peer_count() {
let status_chain = ChainId::from(&status);
