Single shard tracking State cleanup
staffik committed Jan 14, 2025
1 parent 1885afa commit 73c6ce4
Showing 18 changed files with 245 additions and 45 deletions.
179 changes: 166 additions & 13 deletions chain/chain/src/garbage_collection.rs
@@ -4,16 +4,21 @@ use std::{fmt, io};

use near_chain_configs::GCConfig;
use near_chain_primitives::Error;
+use near_epoch_manager::shard_tracker::ShardTracker;
use near_epoch_manager::EpochManagerAdapter;
use near_primitives::block::Block;
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::get_block_shard_uid;
use near_primitives::state_sync::{StateHeaderKey, StatePartKey};
-use near_primitives::types::{BlockHeight, BlockHeightDelta, EpochId, NumBlocks, ShardId};
+use near_primitives::types::{
+AccountId, BlockHeight, BlockHeightDelta, EpochId, NumBlocks, ShardId,
+};
use near_primitives::utils::{get_block_shard_id, get_outcome_id_block_hash, index_to_bytes};
+use near_store::adapter::trie_store::get_shard_uid_mapping;
use near_store::adapter::{StoreAdapter, StoreUpdateAdapter};
-use near_store::{DBCol, KeyForStateChanges, ShardTries, ShardUId};
+use near_store::{DBCol, KeyForStateChanges, ShardTries, ShardUId, StoreUpdate};

+use crate::sharding::cares_about_shard_this_or_next_epoch;
use crate::types::RuntimeAdapter;
use crate::{metrics, Chain, ChainStore, ChainStoreAccess, ChainStoreUpdate};

@@ -41,10 +46,21 @@ impl fmt::Debug for GCMode {
/// TODO - the reset_data_pre_state_sync function seems to also be used in
/// production code. It's used in update_sync_status <- handle_sync_needed <- run_sync_step
impl Chain {
-pub fn clear_data(&mut self, gc_config: &GCConfig) -> Result<(), Error> {
+pub fn clear_data(
+&mut self,
+gc_config: &GCConfig,
+me: Option<&AccountId>,
+) -> Result<(), Error> {
let runtime_adapter = self.runtime_adapter.clone();
let epoch_manager = self.epoch_manager.clone();
-self.mut_chain_store().clear_data(gc_config, runtime_adapter, epoch_manager)
+let shard_tracker = self.shard_tracker.clone();
+self.mut_chain_store().clear_data(
+gc_config,
+runtime_adapter,
+epoch_manager,
+&shard_tracker,
+me,
+)
}

pub fn reset_data_pre_state_sync(&mut self, sync_hash: CryptoHash) -> Result<(), Error> {
@@ -137,6 +153,8 @@ impl ChainStore {
gc_config: &GCConfig,
runtime_adapter: Arc<dyn RuntimeAdapter>,
epoch_manager: Arc<dyn EpochManagerAdapter>,
+shard_tracker: &ShardTracker,
+me: Option<&AccountId>,
) -> Result<(), Error> {
let _span = tracing::debug_span!(target: "garbage_collection", "clear_data").entered();
let tries = runtime_adapter.get_tries();
@@ -206,19 +224,46 @@ impl ChainStore {
if prev_block_refcount > 1 {
// Block of `prev_hash` starts a Fork, stopping
break;
-} else if prev_block_refcount == 1 {
-debug_assert_eq!(blocks_current_height.len(), 1);
-chain_store_update.clear_block_data(
-epoch_manager.as_ref(),
-*block_hash,
-GCMode::Canonical(tries.clone()),
-)?;
-gc_blocks_remaining -= 1;
-} else {
+}
+if prev_block_refcount < 1 {
return Err(Error::GCError(
"block on canonical chain shouldn't have refcount 0".into(),
));
}
+debug_assert_eq!(blocks_current_height.len(), 1);
+
+let tracked_shards_in_gced_epoch_to_check_for_cleanup = if !shard_tracker
+.tracks_all_shards()
+&& epoch_manager.is_last_block_in_finished_epoch(block_hash)?
+{
+Some(get_tracked_shards_in_past_epoch(
+&chain_store_update,
+&epoch_manager,
+block_hash,
+)?)
+} else {
+None
+};
+
+chain_store_update.clear_block_data(
+epoch_manager.as_ref(),
+*block_hash,
+GCMode::Canonical(tries.clone()),
+)?;
+gc_blocks_remaining -= 1;
+
+if let Some(potential_shards_for_cleanup) =
+tracked_shards_in_gced_epoch_to_check_for_cleanup
+{
+gc_state(
+&mut chain_store_update,
+block_hash,
+potential_shards_for_cleanup,
+&epoch_manager,
+shard_tracker,
+me,
+)?;
+}
}
chain_store_update.update_tail(height)?;
chain_store_update.commit()?;
@@ -1031,3 +1076,111 @@ impl<'a> ChainStoreUpdate<'a> {
self.merge(store_update);
}
}

+fn get_tracked_shards_in_past_epoch(
+chain_store_update: &ChainStoreUpdate,
+epoch_manager: &Arc<dyn EpochManagerAdapter>,
+past_epoch_block_hash: &CryptoHash,
+) -> Result<Vec<ShardUId>, Error> {
+let block = chain_store_update
+.get_block(past_epoch_block_hash)
+.expect("block data is not expected to be already cleaned");
+let epoch_id = block.header().epoch_id();
+let shard_layout = epoch_manager.get_shard_layout(epoch_id).expect("epoch id must exist");
+let mut tracked_shards = vec![];
+for shard_uid in shard_layout.shard_uids() {
+if chain_store_update
+.store()
+.exists(DBCol::TrieChanges, &get_block_shard_uid(&past_epoch_block_hash, &shard_uid))?
+{
+tracked_shards.push(shard_uid);
+}
+}
+Ok(tracked_shards)
+}
+
+/// State cleanup for single shard tracking. Removes State of shards that are no longer in use.
+///
+/// Potential shards for cleanup are within the set of shards that were tracked between `previous_gc_tail` and `new_gc_tail`.
+/// We filter out shards that were tracked between `new_gc_tail` and `head`, as these need to wait for gc to process them.
+/// We do not clean up shards that we care about this or next epoch (relative to `head`).
+///
+/// With ReshardingV3 we use State mapping strategy (see `DBCol::StateShardUIdMapping`).
+/// Therefore, we look at what ShardUIds were and are no longer in use as a DB key prefix for the State column.
+fn gc_state(
+chain_store_update: &mut ChainStoreUpdate,
+last_block_hash_in_gced_epoch: &CryptoHash,
+tracked_shards_in_gced_epoch: Vec<ShardUId>,
+epoch_manager: &Arc<dyn EpochManagerAdapter>,
+shard_tracker: &ShardTracker,
+me: Option<&AccountId>,
+) -> Result<(), Error> {
+if tracked_shards_in_gced_epoch.is_empty() || shard_tracker.tracks_all_shards() {
+return Ok(());
+}
+let store = chain_store_update.store();
+let mut potential_shards_to_cleanup: HashSet<ShardUId> = tracked_shards_in_gced_epoch
+.iter()
+.map(|shard_uid| get_shard_uid_mapping(store, *shard_uid))
+.collect();
+
+let last_block_hash = chain_store_update.head()?.last_block_hash;
+let last_block_info = epoch_manager.get_block_info(&last_block_hash)?;
+let current_shard_layout = epoch_manager.get_shard_layout(last_block_info.epoch_id())?;
+for shard_uid in current_shard_layout.shard_uids() {
+if !cares_about_shard_this_or_next_epoch(
+me,
+last_block_info.prev_hash(),
+shard_uid.shard_id(),
+true,
+shard_tracker,
+) {
+continue;
+}
+let mapped_shard_uid = get_shard_uid_mapping(store, shard_uid);
+potential_shards_to_cleanup.remove(&mapped_shard_uid);
+}
+
+let mut block_info = last_block_info;
+loop {
+if potential_shards_to_cleanup.is_empty() {
+return Ok(());
+}
+let epoch_first_block_info =
+epoch_manager.get_block_info(block_info.epoch_first_block())?;
+let prev_epoch_last_block_hash = epoch_first_block_info.prev_hash();
+if prev_epoch_last_block_hash == last_block_hash_in_gced_epoch {
+break;
+}
+block_info = epoch_manager.get_block_info(prev_epoch_last_block_hash)?;
+let shard_layout = epoch_manager.get_shard_layout(block_info.epoch_id())?;
+for shard_uid in shard_layout.shard_uids() {
+if !store
+.exists(DBCol::TrieChanges, &get_block_shard_uid(&block_info.hash(), &shard_uid))?
+{
+continue;
+}
+let mapped_shard_uid = get_shard_uid_mapping(store, shard_uid);
+potential_shards_to_cleanup.remove(&mapped_shard_uid);
+}
+}
+let shards_to_cleanup = potential_shards_to_cleanup;
+
+let mut shard_uid_mappings_to_remove = vec![];
+for kv in store.iter_ser::<ShardUId>(DBCol::StateShardUIdMapping) {
+let (child_shard_uid_bytes, parent_shard_uid) = kv?;
+if shards_to_cleanup.contains(&parent_shard_uid) {
+shard_uid_mappings_to_remove.push(child_shard_uid_bytes);
+}
+}
+let mut trie_store_update = store.trie_store().store_update();
+for shard_uid_prefix in shards_to_cleanup {
+trie_store_update.delete_shard_uid_prefixed_state(shard_uid_prefix);
+}
+let mut store_update: StoreUpdate = trie_store_update.into();
+for child_shard_uid_bytes in shard_uid_mappings_to_remove {
+store_update.delete(DBCol::StateShardUIdMapping, &child_shard_uid_bytes);
+}
+chain_store_update.merge(store_update);
+Ok(())
+}
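
The elimination logic in the new gc_state above reads as three passes over a candidate set. Below is a minimal, self-contained Rust model of those passes using plain std collections; the gc_state_model name, the u32 alias for ShardUId, and the toy inputs are illustrative assumptions rather than nearcore APIs. Only the filtering order mirrors the diff: resolve tracked shards through the ShardUId mapping, drop shards the node cares about this or next epoch, drop shards still referenced by TrieChanges newer than the GC tail, and delete whatever survives together with its mapping entries.

    use std::collections::{HashMap, HashSet};

    // Illustrative stand-in for nearcore's ShardUId (an assumption, not the real type).
    type ShardUId = u32;

    // Models DBCol::StateShardUIdMapping: child shard -> ancestor shard whose
    // ShardUId prefixes the child's State rows; unmapped shards map to themselves.
    fn resolve(mapping: &HashMap<ShardUId, ShardUId>, shard: ShardUId) -> ShardUId {
        *mapping.get(&shard).unwrap_or(&shard)
    }

    // Minimal model of the gc_state elimination passes.
    fn gc_state_model(
        tracked_in_gced_epoch: &[ShardUId],
        mapping: &HashMap<ShardUId, ShardUId>,
        cared_about_this_or_next_epoch: &[ShardUId],
        referenced_after_gc_tail: &[ShardUId],
    ) -> HashSet<ShardUId> {
        // Pass 1: candidates are the mapped ShardUIds tracked in the GC-ed epoch.
        let mut candidates: HashSet<ShardUId> =
            tracked_in_gced_epoch.iter().map(|&s| resolve(mapping, s)).collect();
        // Pass 2: keep State for shards cared about this or next epoch.
        for &s in cared_about_this_or_next_epoch {
            candidates.remove(&resolve(mapping, s));
        }
        // Pass 3: keep State still referenced by TrieChanges newer than the GC tail.
        for &s in referenced_after_gc_tail {
            candidates.remove(&resolve(mapping, s));
        }
        // Survivors are safe to delete: every State row under these prefixes,
        // plus the StateShardUIdMapping entries pointing at them.
        candidates
    }

    fn main() {
        // Children 3 and 4 were resharded out of parent 1 and share its State prefix.
        let mapping = HashMap::from([(3, 1), (4, 1)]);
        let cleanup = gc_state_model(
            &[3, 2], // the GC-ed epoch tracked shard 3 (prefix 1) and shard 2
            &mapping,
            &[2], // the node still cares about shard 2 this or next epoch
            &[],  // no newer TrieChanges reference the candidates
        );
        assert_eq!(cleanup, HashSet::from([1])); // only prefix 1 is deletable
    }

In the diff itself these passes run against the store, via get_shard_uid_mapping, the cares_about_shard_this_or_next_epoch check, and per-epoch DBCol::TrieChanges lookups walking back from head; deletion then goes through delete_shard_uid_prefixed_state plus removal of the matching DBCol::StateShardUIdMapping rows.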
14 changes: 14 additions & 0 deletions chain/chain/src/sharding.rs
@@ -1,4 +1,6 @@
+use near_epoch_manager::shard_tracker::ShardTracker;
use near_primitives::hash::CryptoHash;
+use near_primitives::types::{AccountId, ShardId};
use rand::seq::SliceRandom;
use rand::SeedableRng;
use rand_chacha::ChaCha20Rng;
@@ -13,6 +15,18 @@ pub fn shuffle_receipt_proofs<ReceiptProofType>(
receipt_proofs.shuffle(&mut rng);
}

+pub fn cares_about_shard_this_or_next_epoch(
+account_id: Option<&AccountId>,
+parent_hash: &CryptoHash,
+shard_id: ShardId,
+is_me: bool,
+shard_tracker: &ShardTracker,
+) -> bool {
+// TODO(robin-near): I think we only need the shard_tracker if is_me is false.
+shard_tracker.care_about_shard(account_id, parent_hash, shard_id, is_me)
+|| shard_tracker.will_care_about_shard(account_id, parent_hash, shard_id, is_me)
+}
+
#[cfg(test)]
mod tests {
use crate::sharding::shuffle_receipt_proofs;
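The helper moved into sharding.rs is simply the disjunction of the tracker's current-epoch and next-epoch queries. Below is a small sketch of the intended call pattern; the TracksShards trait and the type aliases are invented for illustration, whereas in nearcore the two methods belong to near_epoch_manager::shard_tracker::ShardTracker.

    // Illustrative stand-ins, not nearcore types.
    type AccountId = String;
    type CryptoHash = [u8; 32];
    type ShardId = u64;

    // Stand-in for the two ShardTracker queries the helper combines.
    trait TracksShards {
        fn care_about_shard(&self, me: Option<&AccountId>, parent: &CryptoHash, shard: ShardId, is_me: bool) -> bool;
        fn will_care_about_shard(&self, me: Option<&AccountId>, parent: &CryptoHash, shard: ShardId, is_me: bool) -> bool;
    }

    // Mirrors the relocated helper: a shard matters if it is tracked in the
    // current epoch or will be tracked in the next one.
    fn cares_this_or_next_epoch<T: TracksShards>(
        tracker: &T,
        me: Option<&AccountId>,
        parent: &CryptoHash,
        shard: ShardId,
        is_me: bool,
    ) -> bool {
        tracker.care_about_shard(me, parent, shard, is_me)
            || tracker.will_care_about_shard(me, parent, shard, is_me)
    }

    // The call pattern gc_state uses: filter a layout's shards down to the cared-about set.
    fn cared_about_shards<T: TracksShards>(
        tracker: &T,
        me: Option<&AccountId>,
        parent: &CryptoHash,
        all_shards: impl Iterator<Item = ShardId>,
    ) -> Vec<ShardId> {
        all_shards.filter(|&s| cares_this_or_next_epoch(tracker, me, parent, s, true)).collect()
    }
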
13 changes: 1 addition & 12 deletions chain/chunks/src/logic.rs
@@ -1,3 +1,4 @@
+use near_chain::sharding::cares_about_shard_this_or_next_epoch;
use near_chain::ChainStoreAccess;
use near_chain::{
types::EpochManagerAdapter, validate::validate_chunk_proofs, BlockHeader, Chain, ChainStore,
@@ -37,18 +38,6 @@ pub fn need_part(
Ok(Some(&epoch_manager.get_part_owner(&epoch_id, part_ord)?) == me)
}

-pub fn cares_about_shard_this_or_next_epoch(
-account_id: Option<&AccountId>,
-parent_hash: &CryptoHash,
-shard_id: ShardId,
-is_me: bool,
-shard_tracker: &ShardTracker,
-) -> bool {
-// TODO(robin-near): I think we only need the shard_tracker if is_me is false.
-shard_tracker.care_about_shard(account_id, parent_hash, shard_id, is_me)
-|| shard_tracker.will_care_about_shard(account_id, parent_hash, shard_id, is_me)
-}
-
pub fn get_shards_cares_about_this_or_next_epoch(
account_id: Option<&AccountId>,
is_me: bool,
4 changes: 2 additions & 2 deletions chain/chunks/src/shards_manager_actor.rs
@@ -82,8 +82,7 @@ use crate::adapter::ShardsManagerRequestFromClient;
use crate::chunk_cache::{EncodedChunksCache, EncodedChunksCacheEntry};
use crate::client::ShardsManagerResponse;
use crate::logic::{
-cares_about_shard_this_or_next_epoch, chunk_needs_to_be_fetched_from_archival,
-decode_encoded_chunk, make_outgoing_receipts_proofs,
+chunk_needs_to_be_fetched_from_archival, decode_encoded_chunk, make_outgoing_receipts_proofs,
make_partial_encoded_chunk_from_owned_parts_and_needed_receipts, need_part, need_receipt,
};
use crate::metrics;
@@ -96,6 +95,7 @@ use near_async::time::Duration;
use near_async::time::{self, Clock};
use near_chain::byzantine_assert;
use near_chain::near_chain_primitives::error::Error::DBNotFoundErr;
+use near_chain::sharding::cares_about_shard_this_or_next_epoch;
use near_chain::signature_verification::{
verify_chunk_header_signature_with_epoch_manager,
verify_chunk_header_signature_with_epoch_manager_and_parts,
5 changes: 2 additions & 3 deletions chain/client/src/client.rs
@@ -22,6 +22,7 @@ use near_chain::chain::{
VerifyBlockHashAndSignatureResult,
};
use near_chain::orphan::OrphanMissingChunks;
+use near_chain::sharding::cares_about_shard_this_or_next_epoch;
use near_chain::state_snapshot_actor::SnapshotCallbacks;
use near_chain::test_utils::format_hash;
use near_chain::types::PrepareTransactionsChunkContext;
@@ -36,9 +37,7 @@ use near_chain::{
use near_chain_configs::{ClientConfig, MutableValidatorSigner, UpdateableClientConfig};
use near_chunks::adapter::ShardsManagerRequestFromClient;
use near_chunks::client::ShardedTransactionPool;
-use near_chunks::logic::{
-cares_about_shard_this_or_next_epoch, decode_encoded_chunk, persist_chunk,
-};
+use near_chunks::logic::{decode_encoded_chunk, persist_chunk};
use near_chunks::shards_manager_actor::ShardsManagerActor;
use near_client_primitives::debug::ChunkProduction;
use near_client_primitives::types::{Error, StateSyncStatus};
16 changes: 15 additions & 1 deletion chain/client/src/gc_actor.rs
@@ -4,7 +4,8 @@ use near_async::messaging::Actor;
#[cfg(feature = "test_features")]
use near_async::messaging::Handler;
use near_chain::{types::RuntimeAdapter, ChainStore, ChainStoreAccess};
-use near_chain_configs::GCConfig;
+use near_chain_configs::{GCConfig, MutableValidatorSigner};
+use near_epoch_manager::shard_tracker::ShardTracker;
use near_epoch_manager::EpochManagerAdapter;
use near_primitives::types::BlockHeight;
use near_store::{metadata::DbKind, Store};
@@ -18,6 +19,8 @@ pub struct GCActor {
store: ChainStore,
runtime_adapter: Arc<dyn RuntimeAdapter>,
epoch_manager: Arc<dyn EpochManagerAdapter>,
+shard_tracker: ShardTracker,
+validator_signer: MutableValidatorSigner,
gc_config: GCConfig,
is_archive: bool,
/// In some tests we may want to temporarily disable GC
@@ -30,6 +33,8 @@ impl GCActor {
genesis_height: BlockHeight,
runtime_adapter: Arc<dyn RuntimeAdapter>,
epoch_manager: Arc<dyn EpochManagerAdapter>,
+shard_tracker: ShardTracker,
+validator_signer: MutableValidatorSigner,
gc_config: GCConfig,
is_archive: bool,
) -> Self {
@@ -38,20 +43,27 @@ impl GCActor {
runtime_adapter,
gc_config,
epoch_manager,
+shard_tracker,
+validator_signer,
is_archive,
no_gc: false,
}
}

fn clear_data(&mut self) -> Result<(), near_chain::Error> {
+let signer = self.validator_signer.get();
+let me = signer.as_ref().map(|signer| signer.validator_id());
// A RPC node should do regular garbage collection.
if !self.is_archive {
return self.store.clear_data(
&self.gc_config,
self.runtime_adapter.clone(),
self.epoch_manager.clone(),
+&self.shard_tracker,
+me,
);
}
+debug_assert!(self.shard_tracker.tracks_all_shards());

// An archival node with split storage should perform garbage collection
// on the hot storage. In order to determine if split storage is enabled
@@ -64,6 +76,8 @@ impl GCActor {
&self.gc_config,
self.runtime_adapter.clone(),
self.epoch_manager.clone(),
+&self.shard_tracker,
+me,
);
}

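A design note on the GCActor changes: the actor stores the MutableValidatorSigner and re-derives me on every clear_data run rather than resolving it once in the constructor, so a validator key swapped at runtime is picked up by the next GC cycle. The sketch below models that pattern with stand-in types; MutableSigner here is illustrative, not nearcore's MutableValidatorSigner.

    use std::sync::{Arc, RwLock};

    // Illustrative stand-in: the validator identity can change at runtime,
    // so each GC run re-reads it instead of capturing it once.
    #[derive(Clone)]
    struct MutableSigner(Arc<RwLock<Option<String>>>);

    impl MutableSigner {
        fn get(&self) -> Option<String> {
            self.0.read().unwrap().clone()
        }
    }

    // Mirrors the shape of GCActor::clear_data: derive `me` freshly per run.
    fn clear_data_cycle(signer: &MutableSigner) {
        let me = signer.get();
        println!("running GC as {:?}", me);
    }

    fn main() {
        let signer = MutableSigner(Arc::new(RwLock::new(None)));
        clear_data_cycle(&signer); // runs as None (not a validator)
        *signer.0.write().unwrap() = Some("validator.near".to_string());
        clear_data_cycle(&signer); // a later run picks up the new identity
    }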
(Diffs for the remaining 12 changed files are not shown.)
