Handle DataColumnsByRange request
Tumas committed May 14, 2024
1 parent b749590 commit ccb3a06
Showing 2 changed files with 127 additions and 5 deletions.
25 changes: 24 additions & 1 deletion fork_choice_control/src/queries.rs
@@ -9,12 +9,13 @@ use fork_choice_store::{AggregateAndProofOrigin, AttestationOrigin, ChainLink, S
use helper_functions::misc;
use itertools::Itertools as _;
use serde::Serialize;
use ssz::ContiguousList;
use std_ext::ArcExt;
use thiserror::Error;
use types::{
combined::{BeaconState, SignedBeaconBlock},
deneb::containers::{BlobIdentifier, BlobSidecar},
eip7594::{DataColumnIdentifier, DataColumnSidecar},
eip7594::{ColumnIndex, DataColumnIdentifier, DataColumnSidecar, NumberOfColumns},
nonstandard::{PayloadStatus, Phase, WithStatus},
phase0::{
containers::{Attestation, Checkpoint, SignedAggregateAndProof},
@@ -475,6 +476,28 @@ where
Ok(data_columns)
}

pub fn data_column_sidecars_by_range(
&self,
range: Range<Slot>,
columns: &ContiguousList<ColumnIndex, NumberOfColumns>,
) -> Result<Vec<Arc<DataColumnSidecar<P>>>> {
let canonical_chain_blocks = self.blocks_by_range(range)?;

let data_column_ids = canonical_chain_blocks
.iter()
.filter_map(|BlockWithRoot { block, root }| {
block.message().body().post_deneb().map(|_| {
columns.iter().map(|index| DataColumnIdentifier {
index: *index,
block_root: *root,
})
})
})
.flatten();

self.data_column_sidecars_by_ids(data_column_ids)
}

pub fn preprocessed_state_at_current_slot(&self) -> Result<Arc<BeaconState<P>>> {
let store = self.store_snapshot();
let head = store.head();
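
As a rough illustration of what the new data_column_sidecars_by_range query does before delegating to data_column_sidecars_by_ids, the standalone sketch below reproduces the identifier fan-out with simplified stand-in types (BlockWithRoot, DataColumnIdentifier, and the is_post_deneb flag are placeholders, not the real grandine types): every post-Deneb block in the canonical range is paired with each requested column index.

// Minimal standalone sketch of the identifier fan-out performed by
// `data_column_sidecars_by_range`. The types below are simplified stand-ins
// for the real grandine types, and `is_post_deneb` replaces the
// `post_deneb()` check on the block body.

type ColumnIndex = u64;
type H256 = [u8; 32];

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct DataColumnIdentifier {
    block_root: H256,
    index: ColumnIndex,
}

struct BlockWithRoot {
    root: H256,
    is_post_deneb: bool,
}

/// For every post-Deneb block in the canonical range, pair its root with each
/// requested column index, preserving block order.
fn data_column_ids(
    blocks: &[BlockWithRoot],
    columns: &[ColumnIndex],
) -> Vec<DataColumnIdentifier> {
    blocks
        .iter()
        .filter(|block| block.is_post_deneb)
        .flat_map(|block| {
            columns.iter().map(move |index| DataColumnIdentifier {
                block_root: block.root,
                index: *index,
            })
        })
        .collect()
}

fn main() {
    let blocks = vec![
        BlockWithRoot { root: [1; 32], is_post_deneb: true },
        BlockWithRoot { root: [2; 32], is_post_deneb: false },
    ];

    // Only the post-Deneb block contributes identifiers: one per column.
    let ids = data_column_ids(&blocks, &[0, 7, 63]);
    assert_eq!(ids.len(), 3);
    println!("{ids:?}");
}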
107 changes: 103 additions & 4 deletions p2p/src/network.rs
@@ -61,7 +61,6 @@ use types::{
};

use crate::{
back_sync::Data,
messages::{
ApiToP2p, P2pToAttestationVerifier, P2pToSlasher, P2pToSync, P2pToValidator,
ServiceInboundMessage, ServiceOutboundMessage, SubnetServiceToP2p, SyncToP2p,
@@ -1252,14 +1251,88 @@ impl<P: Preset> Network<P> {
peer_request_id: PeerRequestId,
request: DataColumnsByRangeRequest,
) -> Result<()> {
// TODO(feature/eip7549): implement this
self.log(
Level::Debug,
format_args!(
"received DataColumnsByRange request (peer_id: {peer_id}, request: {request:?})"
),
);

let DataColumnsByRangeRequest {
start_slot,
count,
columns,
} = request;

let controller = self.controller.clone_arc();

// TODO(feature/eip7594): MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS
// Let data_column_serve_range be
// [max(current_epoch - MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS, EIP7594_FORK_EPOCH), current_epoch].
let start_slot = start_slot.max(misc::compute_start_slot_at_epoch::<P>(
self.controller.chain_config().eip7594_fork_epoch,
));

// > Clients MAY limit the number of blocks and sidecars in the response.
let difference = count.min(MaxRequestDataColumnSidecars::U64);

let current_slot = self.controller.head_slot();
let end_slot = start_slot
.checked_add(difference)
.ok_or(Error::EndSlotOverflow {
start_slot,
difference,
})?
.min(current_slot);

let network_to_service_tx = self.network_to_service_tx.clone();
let connected_peers = self.network_globals.connected_peers();
let target_peers = self.target_peers;

self.dedicated_executor
.spawn(async move {
let mut data_column_sidecars = controller.data_column_sidecars_by_range(start_slot..end_slot, &columns)?;

// The following data column sidecars, where they exist, MUST be sent in (slot, column_index) order.
data_column_sidecars.sort_by_key(|sidecar| (sidecar.slot(), sidecar.index));

for data_column_sidecar in data_column_sidecars {
log(
Level::Debug,
connected_peers,
target_peers,
format_args!(
"sending DataColumnsSidecarsByRange response chunk \
(peer_request_id: {peer_request_id:?}, peer_id: {peer_id}, data_column_sidecar: {data_column_sidecar:?})",
),
);

ServiceInboundMessage::SendResponse(
peer_id,
peer_request_id,
Box::new(Response::DataColumnsByRange(Some(data_column_sidecar))),
)
.send(&network_to_service_tx);
}

log(
Level::Debug,
connected_peers,
target_peers,
"terminating BlobSidecarsByRange response stream",
);

ServiceInboundMessage::SendResponse(
peer_id,
peer_request_id,
Box::new(Response::DataColumnsByRange(None)),
)
.send(&network_to_service_tx);

Ok::<_, anyhow::Error>(())
})
.detach();

Ok(())
}
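
The handler above clamps the requested range on three sides: the start slot is pulled up to the EIP-7594 fork, the count is capped by the maximum number of sidecars per response, and the end slot is bounded by the head. A minimal sketch of that arithmetic with plain u64 values, assuming illustrative constants rather than the preset values the real code reads from chain_config:

// Illustrative sketch of the slot-range clamping done in the handler above.
// `MAX_REQUEST_DATA_COLUMN_SIDECARS` and `SLOTS_PER_EPOCH` are placeholder
// values here, not the preset constants used by the actual code.

type Slot = u64;
type Epoch = u64;

const SLOTS_PER_EPOCH: u64 = 32;
const MAX_REQUEST_DATA_COLUMN_SIDECARS: u64 = 16_384;

fn start_slot_at_epoch(epoch: Epoch) -> Slot {
    epoch * SLOTS_PER_EPOCH
}

/// Returns the half-open slot range actually served for a
/// DataColumnsByRange request, or `None` if the end slot overflows.
fn serve_range(
    requested_start: Slot,
    requested_count: u64,
    eip7594_fork_epoch: Epoch,
    head_slot: Slot,
) -> Option<(Slot, Slot)> {
    // Never serve slots from before the fork that introduced data columns.
    let start_slot = requested_start.max(start_slot_at_epoch(eip7594_fork_epoch));

    // > Clients MAY limit the number of blocks and sidecars in the response.
    let count = requested_count.min(MAX_REQUEST_DATA_COLUMN_SIDECARS);

    // Clamp the end of the range to the current head.
    let end_slot = start_slot.checked_add(count)?.min(head_slot);

    Some((start_slot, end_slot))
}

fn main() {
    // Fork at epoch 2 (slot 64); a request starting before it gets clamped.
    assert_eq!(serve_range(10, 100, 2, 120), Some((64, 120)));
    // A request far beyond the head is clamped to the head slot.
    assert_eq!(serve_range(64, 1_000_000, 2, 120), Some((64, 120)));
}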

@@ -1569,8 +1642,34 @@ impl<P: Preset> Network<P> {
),
);
}
// TODO(feature/eip7594)
Response::DataColumnsByRange(Some(data_column_sidecar)) => {}
// TODO(feature/eip7594): This appears to be unfinished.
// > Before consuming the next response chunk, the response reader SHOULD verify the
// > data column sidecar is well-formatted, has valid inclusion proof, and is correct w.r.t. the expected KZG commitments
Response::DataColumnsByRange(Some(data_column_sidecar)) => {
self.log(
Level::Debug,
format_args!(
"received DataColumnsByRange response chunk \
(request_id: {request_id}, peer_id: {peer_id}, data_column_sidecar.slot: {:?})",
data_column_sidecar.signed_block_header.message.slot,
),
);

let data_column_identifier = data_column_sidecar.as_ref().into();
let data_column_sidecar_slot = data_column_sidecar.signed_block_header.message.slot;

if self.register_new_received_data_column_sidecar(
data_column_identifier,
data_column_sidecar_slot,
) {
let block_seen = self
.received_block_roots
.contains_key(&data_column_identifier.block_root);

P2pToSync::RequestedDataColumnSidecar(data_column_sidecar, block_seen, peer_id)
.send(&self.channels.p2p_to_sync_tx);
}
}
Response::DataColumnsByRange(None) => {
self.log_with_feature(format_args!(
"peer {peer_id} terminated DataColumnsByRange response stream for request_id: {request_id}",
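
On the reading side, each DataColumnsByRange chunk is registered once per (block_root, index) pair and forwarded to the sync component together with a flag saying whether the block itself has already been seen. The sketch below models that bookkeeping with plain standard-library containers; ReceivedDataColumns and its methods are hypothetical stand-ins for register_new_received_data_column_sidecar and received_block_roots, not the actual Network fields.

// Rough sketch of the response-reader bookkeeping: deduplicate incoming
// sidecars by (block_root, column index) and note whether the corresponding
// block has already been seen.

use std::collections::{HashMap, HashSet};

type Slot = u64;
type ColumnIndex = u64;
type H256 = [u8; 32];

#[derive(Default)]
struct ReceivedDataColumns {
    sidecars: HashSet<(H256, ColumnIndex)>,
    block_roots: HashMap<H256, Slot>,
}

impl ReceivedDataColumns {
    /// Returns `true` the first time a given (block_root, index) pair is seen,
    /// so duplicate chunks from a peer are ignored.
    fn register_sidecar(&mut self, block_root: H256, index: ColumnIndex) -> bool {
        self.sidecars.insert((block_root, index))
    }

    fn block_seen(&self, block_root: &H256) -> bool {
        self.block_roots.contains_key(block_root)
    }
}

fn main() {
    let mut received = ReceivedDataColumns::default();
    let root = [1; 32];

    assert!(received.register_sidecar(root, 0));
    // A repeated chunk for the same column is reported as already known.
    assert!(!received.register_sidecar(root, 0));
    assert!(!received.block_seen(&root));
}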
