Data column cache
povi committed May 17, 2024
1 parent 02a186d commit 4745714
Showing 4 changed files with 92 additions and 11 deletions.
76 changes: 76 additions & 0 deletions fork_choice_store/src/data_column_cache.rs
@@ -0,0 +1,76 @@
use std::{collections::HashMap, sync::Arc};

use std_ext::ArcExt as _;
use types::{
    eip7594::{DataColumnIdentifier, DataColumnSidecar},
    nonstandard::DataColumnSidecarWithId,
    phase0::primitives::Slot,
    preset::Preset,
};

const DATA_COLUMN_RETAIN_DURATION_IN_SLOTS: Slot = 2;

#[derive(Clone, Default)]
pub struct DataColumnCache<P: Preset> {
    data_columns: HashMap<DataColumnIdentifier, (Arc<DataColumnSidecar<P>>, Slot, bool)>,
}

impl<P: Preset> DataColumnCache<P> {
    pub fn get(&self, data_column_id: DataColumnIdentifier) -> Option<Arc<DataColumnSidecar<P>>> {
        Some(self.data_columns.get(&data_column_id)?.0.clone_arc())
    }

    pub fn has_unpersisted_data_column_sidecars(&self) -> bool {
        self.data_columns
            .iter()
            .any(|(_, (_, _, persisted))| !persisted)
    }

    pub fn insert(&mut self, data_column_sidecar: Arc<DataColumnSidecar<P>>) {
        let slot = data_column_sidecar.signed_block_header.message.slot;
        let identifier = data_column_sidecar.as_ref().into();

        self.data_columns
            .insert(identifier, (data_column_sidecar, slot, false));
    }

    pub fn mark_persisted_data_columns(
        &mut self,
        persisted_data_column_ids: Vec<DataColumnIdentifier>,
    ) {
        for data_column_id in persisted_data_column_ids {
            self.data_columns
                .entry(data_column_id)
                .and_modify(|entry| entry.2 = true);
        }
    }

    pub fn on_slot(&mut self, slot: Slot) {
        self.data_columns.retain(|_, (_, data_column_slot, _)| {
            *data_column_slot + DATA_COLUMN_RETAIN_DURATION_IN_SLOTS >= slot
        });
    }

    pub fn prune_finalized(&mut self, finalized_slot: Slot) {
        self.data_columns
            .retain(|_, (_, slot, _)| finalized_slot <= *slot);
    }

    pub fn size(&self) -> usize {
        self.data_columns.values().len()
    }

    pub fn unpersisted_data_column_sidecars(
        &self,
    ) -> impl Iterator<Item = DataColumnSidecarWithId<P>> + '_ {
        self.data_columns
            .iter()
            .filter(|(_, (_, _, persisted))| !persisted)
            .map(
                |(data_column_id, (data_column_sidecar, _, _))| DataColumnSidecarWithId {
                    data_column_sidecar: data_column_sidecar.clone_arc(),
                    data_column_id: *data_column_id,
                },
            )
    }
}
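
Below is a minimal usage sketch, not part of the commit, of how a caller inside fork_choice_store might drive the cache through its lifecycle. The cache_lifecycle_sketch function and its arguments are hypothetical; only the DataColumnCache methods above come from this change.

use std::sync::Arc;

use types::{
    eip7594::{DataColumnIdentifier, DataColumnSidecar},
    phase0::primitives::Slot,
    preset::Preset,
};

use crate::data_column_cache::DataColumnCache;

// Hypothetical driver showing the intended call order; not part of the commit.
fn cache_lifecycle_sketch<P: Preset>(
    cache: &mut DataColumnCache<P>,
    data_column_sidecar: Arc<DataColumnSidecar<P>>,
    current_slot: Slot,
    finalized_slot: Slot,
) {
    // Newly accepted sidecars enter the cache marked as not yet persisted.
    cache.insert(data_column_sidecar);

    // A persistence task could drain the unpersisted entries, write them to
    // storage, and then mark them as persisted by identifier.
    let persisted_ids: Vec<DataColumnIdentifier> = cache
        .unpersisted_data_column_sidecars()
        .map(|with_id| with_id.data_column_id)
        .collect();
    cache.mark_persisted_data_columns(persisted_ids);
    assert!(!cache.has_unpersisted_data_column_sidecars());

    // Periodic maintenance: drop entries older than the retain window and
    // anything below the finalized slot.
    cache.on_slot(current_slot);
    cache.prune_finalized(finalized_slot);
}
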
1 change: 1 addition & 0 deletions fork_choice_store/src/lib.rs
@@ -88,6 +88,7 @@ pub use crate::{
};

mod blob_cache;
+mod data_column_cache;
mod error;
mod misc;
mod segment;
19 changes: 8 additions & 11 deletions fork_choice_store/src/store.rs
@@ -61,6 +61,7 @@ use unwrap_none::UnwrapNone as _;

use crate::{
    blob_cache::BlobCache,
+    data_column_cache::DataColumnCache,
    error::Error,
    misc::{
        AggregateAndProofAction, AggregateAndProofOrigin, ApplyBlockChanges, ApplyTickChanges,
@@ -216,7 +217,7 @@ pub struct Store<P: Preset> {
        HashMap<H256, ContiguousList<KzgCommitment, P::MaxBlobCommitmentsPerBlock>>,
    >,
    blob_cache: BlobCache<P>,
-    data_column_cache: HashMap<DataColumnIdentifier, (Arc<DataColumnSidecar<P>>, Slot)>,
+    data_column_cache: DataColumnCache<P>,
    rejected_block_roots: HashSet<H256>,
    finished_initial_forward_sync: bool,
}
@@ -286,7 +287,7 @@ impl<P: Preset> Store<P> {
            accepted_blob_sidecars: HashMap::default(),
            accepted_data_column_sidecars: HashMap::default(),
            blob_cache: BlobCache::default(),
-            data_column_cache: HashMap::default(),
+            data_column_cache: DataColumnCache::default(),
            rejected_block_roots: HashSet::default(),
            finished_initial_forward_sync,
        }
@@ -347,9 +348,7 @@ impl<P: Preset> Store<P> {
        &self,
        data_column_id: DataColumnIdentifier,
    ) -> Option<Arc<DataColumnSidecar<P>>> {
-        self.data_column_cache
-            .get(&data_column_id)
-            .map(|(sidecar, _)| (*sidecar).clone_arc())
+        self.data_column_cache.get(data_column_id)
    }

    #[must_use]
@@ -2018,6 +2017,8 @@ impl<P: Preset> Store<P> {
        self.update_head_segment_id();

        self.blob_cache.on_slot(new_tick.slot);
+        // TODO(feature/eip-7594): uncomment this after implementing persistence
+        // self.data_column_cache.on_slot(new_tick.slot);

        let changes = if self.reorganized(old_head_segment_id) {
            ApplyTickChanges::Reorganized {
@@ -2286,10 +2287,7 @@ impl<P: Preset> Store<P> {

        commitments.insert(block_root, data_sidecar.kzg_commitments.clone());

-        let identifier = data_sidecar.as_ref().into();
-
-        self.data_column_cache
-            .insert(identifier, (data_sidecar, block_header.slot));
+        self.data_column_cache.insert(data_sidecar);
    }

    fn insert_block(&mut self, chain_link: ChainLink<P>) -> Result<()> {
@@ -2603,8 +2601,7 @@ impl<P: Preset> Store<P> {
        //
        // Data columns must be stored for much longer period than finalization.
        // However, that should be done in persistence layer.
-        self.data_column_cache
-            .retain(|_, (_, slot)| finalized_slot <= *slot);
+        self.data_column_cache.prune_finalized(finalized_slot);
        self.prune_checkpoint_states();
        self.preprocessed_states.prune(finalized_slot);
        self.aggregate_and_proof_supersets
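
As a side note on the retention window, here is a standalone sketch, not from the commit, of the predicate DataColumnCache::on_slot applies (and which the TODO above would re-enable for data columns), using plain u64 slots in place of the crate's Slot type and the DATA_COLUMN_RETAIN_DURATION_IN_SLOTS = 2 constant from data_column_cache.rs.

fn main() {
    const DATA_COLUMN_RETAIN_DURATION_IN_SLOTS: u64 = 2;
    let current_slot: u64 = 10;

    for data_column_slot in 7..=10 {
        // Mirrors `*data_column_slot + DATA_COLUMN_RETAIN_DURATION_IN_SLOTS >= slot`.
        let kept = data_column_slot + DATA_COLUMN_RETAIN_DURATION_IN_SLOTS >= current_slot;
        println!("data column from slot {data_column_slot}: kept = {kept}");
    }
    // The sidecar from slot 7 is evicted, while those from slots 8 through 10
    // are kept, i.e. entries survive for two slots past their block's slot.
}
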
7 changes: 7 additions & 0 deletions types/src/nonstandard.rs
@@ -24,6 +24,7 @@ use crate::{
        containers::{BlobIdentifier, BlobSidecar},
        primitives::{Blob, KzgCommitment, KzgProof},
    },
+    eip7594::{DataColumnIdentifier, DataColumnSidecar},
    phase0::{
        containers::Attestation,
        primitives::{Uint256, UnixSeconds, ValidatorIndex, H256},
@@ -186,6 +187,12 @@ pub struct BlobSidecarWithId<P: Preset> {
    pub blob_id: BlobIdentifier,
}

+#[derive(Clone, Debug)]
+pub struct DataColumnSidecarWithId<P: Preset> {
+    pub data_column_sidecar: Arc<DataColumnSidecar<P>>,
+    pub data_column_id: DataColumnIdentifier,
+}
+
#[derive(Clone, Copy)]
pub struct Participation {
    pub previous: ParticipationFlags,
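
For context, a hypothetical helper, not part of the commit, showing how the new wrapper pairs a shared sidecar with its identifier, mirroring BlobSidecarWithId; it assumes the same From<&DataColumnSidecar<P>> conversion that DataColumnCache::insert relies on.

use std::sync::Arc;

use types::{
    eip7594::{DataColumnIdentifier, DataColumnSidecar},
    nonstandard::DataColumnSidecarWithId,
    preset::Preset,
};

// Hypothetical helper; the `.into()` call uses the identifier conversion seen
// in DataColumnCache::insert.
fn into_sidecar_with_id<P: Preset>(
    data_column_sidecar: Arc<DataColumnSidecar<P>>,
) -> DataColumnSidecarWithId<P> {
    let data_column_id: DataColumnIdentifier = data_column_sidecar.as_ref().into();

    DataColumnSidecarWithId {
        data_column_sidecar,
        data_column_id,
    }
}
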
