Skip to content

Commit

Permalink
Handle DataColumnsByRoot RPC request (no storage)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tumas committed May 13, 2024
1 parent 8207171 commit a24ede3
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 2 deletions.
25 changes: 25 additions & 0 deletions fork_choice_control/src/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use thiserror::Error;
use types::{
combined::{BeaconState, SignedBeaconBlock},
deneb::containers::{BlobIdentifier, BlobSidecar},
eip7594::{DataColumnIdentifier, DataColumnSidecar},
nonstandard::{PayloadStatus, Phase, WithStatus},
phase0::{
containers::{Attestation, Checkpoint, SignedAggregateAndProof},
Expand Down Expand Up @@ -459,6 +460,21 @@ where
.collect()
}

/// Returns the data column sidecars matching `data_column_ids` that are
/// currently available in the in-memory fork choice snapshot.
///
/// Identifiers with no cached sidecar are silently skipped, so the returned
/// `Vec` may be shorter than the input (possibly empty).
// TODO(feature/eip7594): data columns from storage
pub fn data_column_sidecars_by_ids(
    &self,
    data_column_ids: impl IntoIterator<Item = DataColumnIdentifier> + Send,
) -> Result<Vec<Arc<DataColumnSidecar<P>>>> {
    let snapshot = self.snapshot();

    Ok(data_column_ids
        .into_iter()
        .filter_map(|id| snapshot.cached_data_column_sidecar_by_id(id))
        .collect())
}

pub fn preprocessed_state_at_current_slot(&self) -> Result<Arc<BeaconState<P>>> {
let store = self.store_snapshot();
let head = store.head();
Expand Down Expand Up @@ -885,6 +901,15 @@ impl<P: Preset, W> Snapshot<'_, P, W> {
) -> Option<Arc<BlobSidecar<P>>> {
self.store_snapshot.cached_blob_sidecar_by_id(blob_id)
}

/// Looks up a data column sidecar in the snapshot's in-memory cache.
///
/// Thin delegation to the underlying store snapshot; returns `None` when the
/// identifier is not cached.
#[must_use]
pub(crate) fn cached_data_column_sidecar_by_id(
    &self,
    data_column_id: DataColumnIdentifier,
) -> Option<Arc<DataColumnSidecar<P>>> {
    self.store_snapshot.cached_data_column_sidecar_by_id(data_column_id)
}
}

#[derive(Debug, Error)]
Expand Down
10 changes: 10 additions & 0 deletions fork_choice_store/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,16 @@ impl<P: Preset> Store<P> {
self.blob_cache.get(blob_id)
}

/// Looks up a data column sidecar in the store's in-memory cache.
///
/// Returns a cheap `Arc` clone of the cached sidecar, discarding the
/// auxiliary value stored alongside it in the cache entry; `None` when the
/// identifier is not cached.
#[must_use]
pub fn cached_data_column_sidecar_by_id(
    &self,
    data_column_id: DataColumnIdentifier,
) -> Option<Arc<DataColumnSidecar<P>>> {
    let (sidecar, _) = self.data_column_cache.get(&data_column_id)?;
    Some(sidecar.clone_arc())
}

#[must_use]
pub const fn justified_checkpoint(&self) -> Checkpoint {
self.justified_checkpoint
Expand Down
59 changes: 57 additions & 2 deletions p2p/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use eth2_libp2p::{
rpc::{
methods::{
BlobsByRangeRequest, BlobsByRootRequest, BlocksByRangeRequest, BlocksByRootRequest,
DataColumnsByRangeRequest, DataColumnsByRootRequest,
DataColumnsByRangeRequest, DataColumnsByRootRequest, MaxRequestDataColumnSidecars,
},
GoodbyeReason, StatusMessage,
},
Expand All @@ -40,6 +40,7 @@ use slog_stdlog::StdLog;
use ssz::SszHash as _;
use std_ext::ArcExt as _;
use thiserror::Error;
use typenum::Unsigned as _;
use types::{
altair::containers::{SignedContributionAndProof, SyncCommitteeMessage},
capella::containers::SignedBlsToExecutionChange,
Expand Down Expand Up @@ -1159,13 +1160,67 @@ impl<P: Preset> Network<P> {
peer_request_id: PeerRequestId,
request: DataColumnsByRootRequest,
) {
// TODO(feature/eip7594): implement this
self.log(
Level::Debug,
format_args!(
"received DataColumnsByRoot request (peer_id: {peer_id}, request: {request:?})"
),
);

let DataColumnsByRootRequest { data_column_ids } = request;

let controller = self.controller.clone_arc();
let network_to_service_tx = self.network_to_service_tx.clone();

// TODO(feature/eip7594): MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS
let connected_peers = self.network_globals.connected_peers();
let target_peers = self.target_peers;

self.dedicated_executor
.spawn(async move {
// > Clients MAY limit the number of blocks and sidecars in the response.
let data_column_ids = data_column_ids.into_iter().take(
MaxRequestDataColumnSidecars::USIZE
);

let data_column_sidecars = controller.data_column_sidecars_by_ids(data_column_ids)?;

for data_column_sidecar in data_column_sidecars {
log(
Level::Debug,
connected_peers,
target_peers,
format_args!(
"sending DataColumnsSidecarsByRoot response chunk \
(peer_request_id: {peer_request_id:?}, peer_id: {peer_id}, data_column_sidecar: {data_column_sidecar:?})",
),
);

ServiceInboundMessage::SendResponse(
peer_id,
peer_request_id,
Box::new(Response::DataColumnsByRoot(Some(data_column_sidecar))),
)
.send(&network_to_service_tx);
}

log(
Level::Debug,
connected_peers,
target_peers,
"terminating DataColumnsByRoot response stream",
);

ServiceInboundMessage::SendResponse(
peer_id,
peer_request_id,
Box::new(Response::DataColumnsByRoot(None)),
)
.send(&network_to_service_tx);

Ok::<_, anyhow::Error>(())
})
.detach();
}

fn handle_data_columns_by_range_request(
Expand Down

0 comments on commit a24ede3

Please sign in to comment.