diff --git a/chain/chain/src/garbage_collection.rs b/chain/chain/src/garbage_collection.rs
index ea574264f55..d05bbe9b451 100644
--- a/chain/chain/src/garbage_collection.rs
+++ b/chain/chain/src/garbage_collection.rs
@@ -4,16 +4,21 @@ use std::{fmt, io};
 use near_chain_configs::GCConfig;
 use near_chain_primitives::Error;
+use near_epoch_manager::shard_tracker::ShardTracker;
 use near_epoch_manager::EpochManagerAdapter;
 use near_primitives::block::Block;
 use near_primitives::hash::CryptoHash;
 use near_primitives::shard_layout::get_block_shard_uid;
 use near_primitives::state_sync::{StateHeaderKey, StatePartKey};
-use near_primitives::types::{BlockHeight, BlockHeightDelta, EpochId, NumBlocks, ShardId};
+use near_primitives::types::{
+    AccountId, BlockHeight, BlockHeightDelta, EpochId, NumBlocks, ShardId,
+};
 use near_primitives::utils::{get_block_shard_id, get_outcome_id_block_hash, index_to_bytes};
+use near_store::adapter::trie_store::get_shard_uid_mapping;
 use near_store::adapter::{StoreAdapter, StoreUpdateAdapter};
-use near_store::{DBCol, KeyForStateChanges, ShardTries, ShardUId};
+use near_store::{DBCol, KeyForStateChanges, ShardTries, ShardUId, StoreUpdate};
 
+use crate::sharding::cares_about_shard_this_or_next_epoch;
 use crate::types::RuntimeAdapter;
 use crate::{metrics, Chain, ChainStore, ChainStoreAccess, ChainStoreUpdate};
@@ -41,10 +46,21 @@ impl fmt::Debug for GCMode {
 /// TODO - the reset_data_pre_state_sync function seems to also be used in
 /// production code. It's used in update_sync_status <- handle_sync_needed <- run_sync_step
 impl Chain {
-    pub fn clear_data(&mut self, gc_config: &GCConfig) -> Result<(), Error> {
+    pub fn clear_data(
+        &mut self,
+        gc_config: &GCConfig,
+        me: Option<&AccountId>,
+    ) -> Result<(), Error> {
         let runtime_adapter = self.runtime_adapter.clone();
         let epoch_manager = self.epoch_manager.clone();
-        self.mut_chain_store().clear_data(gc_config, runtime_adapter, epoch_manager)
+        let shard_tracker = self.shard_tracker.clone();
+        self.mut_chain_store().clear_data(
+            gc_config,
+            runtime_adapter,
+            epoch_manager,
+            &shard_tracker,
+            me,
+        )
     }
 
     pub fn reset_data_pre_state_sync(&mut self, sync_hash: CryptoHash) -> Result<(), Error> {
@@ -137,6 +153,8 @@ impl ChainStore {
         gc_config: &GCConfig,
         runtime_adapter: Arc<dyn RuntimeAdapter>,
         epoch_manager: Arc<dyn EpochManagerAdapter>,
+        shard_tracker: &ShardTracker,
+        me: Option<&AccountId>,
     ) -> Result<(), Error> {
         let _span = tracing::debug_span!(target: "garbage_collection", "clear_data").entered();
         let tries = runtime_adapter.get_tries();
@@ -206,19 +224,46 @@ impl ChainStore {
                 if prev_block_refcount > 1 {
                     // Block of `prev_hash` starts a Fork, stopping
                     break;
-                } else if prev_block_refcount == 1 {
-                    debug_assert_eq!(blocks_current_height.len(), 1);
-                    chain_store_update.clear_block_data(
-                        epoch_manager.as_ref(),
-                        *block_hash,
-                        GCMode::Canonical(tries.clone()),
-                    )?;
-                    gc_blocks_remaining -= 1;
-                } else {
+                }
+                if prev_block_refcount < 1 {
                     return Err(Error::GCError(
                         "block on canonical chain shouldn't have refcount 0".into(),
                     ));
                 }
+                debug_assert_eq!(blocks_current_height.len(), 1);
+
+                let tracked_shards_in_gced_epoch_to_check_for_cleanup = if !shard_tracker
+                    .tracks_all_shards()
+                    && epoch_manager.is_last_block_in_finished_epoch(block_hash)?
+                {
+                    Some(get_tracked_shards_in_past_epoch(
+                        &chain_store_update,
+                        &epoch_manager,
+                        block_hash,
+                    )?)
+                } else {
+                    None
+                };
+
+                chain_store_update.clear_block_data(
+                    epoch_manager.as_ref(),
+                    *block_hash,
+                    GCMode::Canonical(tries.clone()),
+                )?;
+                gc_blocks_remaining -= 1;
+
+                if let Some(potential_shards_for_cleanup) =
+                    tracked_shards_in_gced_epoch_to_check_for_cleanup
+                {
+                    gc_state(
+                        &mut chain_store_update,
+                        block_hash,
+                        potential_shards_for_cleanup,
+                        &epoch_manager,
+                        shard_tracker,
+                        me,
+                    )?;
+                }
             }
             chain_store_update.update_tail(height)?;
             chain_store_update.commit()?;
@@ -1031,3 +1076,111 @@ impl<'a> ChainStoreUpdate<'a> {
         self.merge(store_update);
     }
 }
+
+fn get_tracked_shards_in_past_epoch(
+    chain_store_update: &ChainStoreUpdate,
+    epoch_manager: &Arc<dyn EpochManagerAdapter>,
+    past_epoch_block_hash: &CryptoHash,
+) -> Result<Vec<ShardUId>, Error> {
+    let block = chain_store_update
+        .get_block(past_epoch_block_hash)
+        .expect("block data is not expected to be already cleaned");
+    let epoch_id = block.header().epoch_id();
+    let shard_layout = epoch_manager.get_shard_layout(epoch_id).expect("epoch id must exist");
+    let mut tracked_shards = vec![];
+    for shard_uid in shard_layout.shard_uids() {
+        if chain_store_update
+            .store()
+            .exists(DBCol::TrieChanges, &get_block_shard_uid(&past_epoch_block_hash, &shard_uid))?
+        {
+            tracked_shards.push(shard_uid);
+        }
+    }
+    Ok(tracked_shards)
+}
+
+/// State cleanup for single shard tracking. Removes State of shards that are no longer in use.
+///
+/// Potential shards for cleanup are within the set of shards that were tracked between `previous_gc_tail` and `new_gc_tail`.
+/// We filter out shards that were tracked between `new_gc_tail` and `head`, as these need to wait for gc to process them.
+/// We do not clean up shards that we care about this or next epoch (relative to `head`).
+///
+/// With ReshardingV3 we use State mapping strategy (see `DBCol::StateShardUIdMapping`).
+/// Therefore, we look at what ShardUIds were and are no longer in use as a DB key prefix for the State column.
+fn gc_state(
+    chain_store_update: &mut ChainStoreUpdate,
+    last_block_hash_in_gced_epoch: &CryptoHash,
+    tracked_shards_in_gced_epoch: Vec<ShardUId>,
+    epoch_manager: &Arc<dyn EpochManagerAdapter>,
+    shard_tracker: &ShardTracker,
+    me: Option<&AccountId>,
+) -> Result<(), Error> {
+    if tracked_shards_in_gced_epoch.is_empty() || shard_tracker.tracks_all_shards() {
+        return Ok(());
+    }
+    let store = chain_store_update.store();
+    let mut potential_shards_to_cleanup: HashSet<ShardUId> = tracked_shards_in_gced_epoch
+        .iter()
+        .map(|shard_uid| get_shard_uid_mapping(store, *shard_uid))
+        .collect();
+
+    let last_block_hash = chain_store_update.head()?.last_block_hash;
+    let last_block_info = epoch_manager.get_block_info(&last_block_hash)?;
+    let current_shard_layout = epoch_manager.get_shard_layout(last_block_info.epoch_id())?;
+    for shard_uid in current_shard_layout.shard_uids() {
+        if !cares_about_shard_this_or_next_epoch(
+            me,
+            last_block_info.prev_hash(),
+            shard_uid.shard_id(),
+            true,
+            shard_tracker,
+        ) {
+            continue;
+        }
+        let mapped_shard_uid = get_shard_uid_mapping(store, shard_uid);
+        potential_shards_to_cleanup.remove(&mapped_shard_uid);
+    }
+
+    let mut block_info = last_block_info;
+    loop {
+        if potential_shards_to_cleanup.is_empty() {
+            return Ok(());
+        }
+        let epoch_first_block_info =
+            epoch_manager.get_block_info(block_info.epoch_first_block())?;
+        let prev_epoch_last_block_hash = epoch_first_block_info.prev_hash();
+        if prev_epoch_last_block_hash == last_block_hash_in_gced_epoch {
+            break;
+        }
+        block_info = epoch_manager.get_block_info(prev_epoch_last_block_hash)?;
+        let shard_layout = epoch_manager.get_shard_layout(block_info.epoch_id())?;
+        for shard_uid in shard_layout.shard_uids() {
+            if !store
+                .exists(DBCol::TrieChanges, &get_block_shard_uid(&block_info.hash(), &shard_uid))?
+            {
+                continue;
+            }
+            let mapped_shard_uid = get_shard_uid_mapping(store, shard_uid);
+            potential_shards_to_cleanup.remove(&mapped_shard_uid);
+        }
+    }
+    let shards_to_cleanup = potential_shards_to_cleanup;
+
+    let mut shard_uid_mappings_to_remove = vec![];
+    for kv in store.iter_ser::<ShardUId>(DBCol::StateShardUIdMapping) {
+        let (child_shard_uid_bytes, parent_shard_uid) = kv?;
+        if shards_to_cleanup.contains(&parent_shard_uid) {
+            shard_uid_mappings_to_remove.push(child_shard_uid_bytes);
+        }
+    }
+    let mut trie_store_update = store.trie_store().store_update();
+    for shard_uid_prefix in shards_to_cleanup {
+        trie_store_update.delete_shard_uid_prefixed_state(shard_uid_prefix);
+    }
+    let mut store_update: StoreUpdate = trie_store_update.into();
+    for child_shard_uid_bytes in shard_uid_mappings_to_remove {
+        store_update.delete(DBCol::StateShardUIdMapping, &child_shard_uid_bytes);
+    }
+    chain_store_update.merge(store_update);
+    Ok(())
+}
diff --git a/chain/chain/src/sharding.rs b/chain/chain/src/sharding.rs
index 7ca053391ed..cf87b2c8c75 100644
--- a/chain/chain/src/sharding.rs
+++ b/chain/chain/src/sharding.rs
@@ -1,4 +1,6 @@
+use near_epoch_manager::shard_tracker::ShardTracker;
 use near_primitives::hash::CryptoHash;
+use near_primitives::types::{AccountId, ShardId};
 use rand::seq::SliceRandom;
 use rand::SeedableRng;
 use rand_chacha::ChaCha20Rng;
@@ -13,6 +15,18 @@ pub fn shuffle_receipt_proofs(
     receipt_proofs.shuffle(&mut rng);
 }
 
+pub fn cares_about_shard_this_or_next_epoch(
+    account_id: Option<&AccountId>,
+    parent_hash: &CryptoHash,
+    shard_id: ShardId,
+    is_me: bool,
+    shard_tracker: &ShardTracker,
+) -> bool {
+    // TODO(robin-near): I think we only need the shard_tracker if is_me is false.
+    shard_tracker.care_about_shard(account_id, parent_hash, shard_id, is_me)
+        || shard_tracker.will_care_about_shard(account_id, parent_hash, shard_id, is_me)
+}
+
 #[cfg(test)]
 mod tests {
     use crate::sharding::shuffle_receipt_proofs;
diff --git a/chain/chunks/src/logic.rs b/chain/chunks/src/logic.rs
index 935579b1b6b..d1e6d4e8363 100644
--- a/chain/chunks/src/logic.rs
+++ b/chain/chunks/src/logic.rs
@@ -1,3 +1,4 @@
+use near_chain::sharding::cares_about_shard_this_or_next_epoch;
 use near_chain::ChainStoreAccess;
 use near_chain::{
     types::EpochManagerAdapter, validate::validate_chunk_proofs, BlockHeader, Chain, ChainStore,
@@ -37,18 +38,6 @@ pub fn need_part(
     Ok(Some(&epoch_manager.get_part_owner(&epoch_id, part_ord)?) == me)
 }
 
-pub fn cares_about_shard_this_or_next_epoch(
-    account_id: Option<&AccountId>,
-    parent_hash: &CryptoHash,
-    shard_id: ShardId,
-    is_me: bool,
-    shard_tracker: &ShardTracker,
-) -> bool {
-    // TODO(robin-near): I think we only need the shard_tracker if is_me is false.
-    shard_tracker.care_about_shard(account_id, parent_hash, shard_id, is_me)
-        || shard_tracker.will_care_about_shard(account_id, parent_hash, shard_id, is_me)
-}
-
 pub fn get_shards_cares_about_this_or_next_epoch(
     account_id: Option<&AccountId>,
     is_me: bool,
diff --git a/chain/chunks/src/shards_manager_actor.rs b/chain/chunks/src/shards_manager_actor.rs
index cad5e2e7f1b..b01c3779f4d 100644
--- a/chain/chunks/src/shards_manager_actor.rs
+++ b/chain/chunks/src/shards_manager_actor.rs
@@ -82,8 +82,7 @@ use crate::adapter::ShardsManagerRequestFromClient;
 use crate::chunk_cache::{EncodedChunksCache, EncodedChunksCacheEntry};
 use crate::client::ShardsManagerResponse;
 use crate::logic::{
-    cares_about_shard_this_or_next_epoch, chunk_needs_to_be_fetched_from_archival,
-    decode_encoded_chunk, make_outgoing_receipts_proofs,
+    chunk_needs_to_be_fetched_from_archival, decode_encoded_chunk, make_outgoing_receipts_proofs,
     make_partial_encoded_chunk_from_owned_parts_and_needed_receipts, need_part, need_receipt,
 };
 use crate::metrics;
@@ -96,6 +95,7 @@ use near_async::time::Duration;
 use near_async::time::{self, Clock};
 use near_chain::byzantine_assert;
 use near_chain::near_chain_primitives::error::Error::DBNotFoundErr;
+use near_chain::sharding::cares_about_shard_this_or_next_epoch;
 use near_chain::signature_verification::{
     verify_chunk_header_signature_with_epoch_manager,
     verify_chunk_header_signature_with_epoch_manager_and_parts,
diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs
index acb3591272f..29b127c24dd 100644
--- a/chain/client/src/client.rs
+++ b/chain/client/src/client.rs
@@ -22,6 +22,7 @@ use near_chain::chain::{
     VerifyBlockHashAndSignatureResult,
 };
 use near_chain::orphan::OrphanMissingChunks;
+use near_chain::sharding::cares_about_shard_this_or_next_epoch;
 use near_chain::state_snapshot_actor::SnapshotCallbacks;
 use near_chain::test_utils::format_hash;
 use near_chain::types::PrepareTransactionsChunkContext;
@@ -36,9 +37,7 @@ use near_chain::{
 use near_chain_configs::{ClientConfig, MutableValidatorSigner, UpdateableClientConfig};
 use near_chunks::adapter::ShardsManagerRequestFromClient;
 use near_chunks::client::ShardedTransactionPool;
-use near_chunks::logic::{
-    cares_about_shard_this_or_next_epoch, decode_encoded_chunk, persist_chunk,
-};
+use near_chunks::logic::{decode_encoded_chunk, persist_chunk};
 use near_chunks::shards_manager_actor::ShardsManagerActor;
 use near_client_primitives::debug::ChunkProduction;
 use near_client_primitives::types::{Error, StateSyncStatus};
diff --git a/chain/client/src/gc_actor.rs b/chain/client/src/gc_actor.rs
index a4490e6ad48..6ba06c644d3 100644
--- a/chain/client/src/gc_actor.rs
+++ b/chain/client/src/gc_actor.rs
@@ -4,7 +4,8 @@ use near_async::messaging::Actor;
 #[cfg(feature = "test_features")]
 use near_async::messaging::Handler;
 use near_chain::{types::RuntimeAdapter, ChainStore, ChainStoreAccess};
-use near_chain_configs::GCConfig;
+use near_chain_configs::{GCConfig, MutableValidatorSigner};
+use near_epoch_manager::shard_tracker::ShardTracker;
 use near_epoch_manager::EpochManagerAdapter;
 use near_primitives::types::BlockHeight;
 use near_store::{metadata::DbKind, Store};
@@ -18,6 +19,8 @@ pub struct GCActor {
     store: ChainStore,
     runtime_adapter: Arc<dyn RuntimeAdapter>,
     epoch_manager: Arc<dyn EpochManagerAdapter>,
+    shard_tracker: ShardTracker,
+    validator_signer: MutableValidatorSigner,
     gc_config: GCConfig,
     is_archive: bool,
     /// In some tests we may want to temporarily disable GC
@@ -30,6 +33,8 @@ impl GCActor {
         genesis_height: BlockHeight,
         runtime_adapter: Arc<dyn RuntimeAdapter>,
         epoch_manager: Arc<dyn EpochManagerAdapter>,
+        shard_tracker: ShardTracker,
+        validator_signer: MutableValidatorSigner,
         gc_config: GCConfig,
         is_archive: bool,
     ) -> Self {
@@ -38,20 +43,27 @@ impl GCActor {
             runtime_adapter,
             gc_config,
             epoch_manager,
+            shard_tracker,
+            validator_signer,
             is_archive,
             no_gc: false,
         }
     }
 
     fn clear_data(&mut self) -> Result<(), near_chain::Error> {
+        let signer = self.validator_signer.get();
+        let me = signer.as_ref().map(|signer| signer.validator_id());
         // A RPC node should do regular garbage collection.
         if !self.is_archive {
            return self.store.clear_data(
                &self.gc_config,
                self.runtime_adapter.clone(),
                self.epoch_manager.clone(),
+               &self.shard_tracker,
+               me,
            );
         }
+        debug_assert!(self.shard_tracker.tracks_all_shards());
 
         // An archival node with split storage should perform garbage collection
         // on the hot storage. In order to determine if split storage is enabled
@@ -64,6 +76,8 @@ impl GCActor {
             &self.gc_config,
             self.runtime_adapter.clone(),
             self.epoch_manager.clone(),
+            &self.shard_tracker,
+            me,
         );
     }
diff --git a/chain/client/src/test_utils/test_env.rs b/chain/client/src/test_utils/test_env.rs
index 5dcf2d55449..c857ac2e267 100644
--- a/chain/client/src/test_utils/test_env.rs
+++ b/chain/client/src/test_utils/test_env.rs
@@ -97,14 +97,17 @@ impl TestEnv {
         // runs gc
         let runtime_adapter = self.clients[id].chain.runtime_adapter.clone();
         let epoch_manager = self.clients[id].chain.epoch_manager.clone();
+        let shard_tracker = self.clients[id].chain.shard_tracker.clone();
         let gc_config = self.clients[id].config.gc.clone();
+        let signer = self.clients[id].validator_signer.get();
+        let me = signer.as_ref().map(|signer| signer.validator_id());
         // A RPC node should do regular garbage collection.
         if !self.clients[id].config.archive {
             self.clients[id]
                 .chain
                 .mut_chain_store()
-                .clear_data(&gc_config, runtime_adapter, epoch_manager)
+                .clear_data(&gc_config, runtime_adapter, epoch_manager, &shard_tracker, me)
                 .unwrap();
         } else {
             // An archival node with split storage should perform garbage collection
@@ -117,7 +120,7 @@ impl TestEnv {
                 self.clients[id]
                     .chain
                     .mut_chain_store()
-                    .clear_data(&gc_config, runtime_adapter, epoch_manager)
+                    .clear_data(&gc_config, runtime_adapter, epoch_manager, &shard_tracker, me)
                     .unwrap();
             } else {
                 // An archival node with legacy storage or in the midst of migration to split
diff --git a/chain/epoch-manager/src/shard_tracker.rs b/chain/epoch-manager/src/shard_tracker.rs
index 55c9331f5dd..0551b2e43dd 100644
--- a/chain/epoch-manager/src/shard_tracker.rs
+++ b/chain/epoch-manager/src/shard_tracker.rs
@@ -206,6 +206,10 @@ impl ShardTracker {
             }
         }
     }
+
+    pub fn tracks_all_shards(&self) -> bool {
+        matches!(self.tracked_config, TrackedConfig::AllShards)
+    }
 }
 
 #[cfg(test)]
diff --git a/core/store/src/adapter/flat_store.rs b/core/store/src/adapter/flat_store.rs
index d5200e038f2..a82de857838 100644
--- a/core/store/src/adapter/flat_store.rs
+++ b/core/store/src/adapter/flat_store.rs
@@ -279,6 +279,7 @@ impl<'a> FlatStoreUpdateAdapter<'a> {
     // helper
     fn remove_range_by_shard_uid(&mut self, shard_uid: ShardUId, col: DBCol) {
+        assert!(col != DBCol::State, "can't range delete State column");
         let key_from = shard_uid.to_bytes();
         let key_to = ShardUId::get_upper_bound_db_key(&key_from);
         self.store_update.delete_range(col, &key_from, &key_to);
diff --git a/core/store/src/adapter/trie_store.rs b/core/store/src/adapter/trie_store.rs
index 1671acdeef1..85e060b6513 100644
--- a/core/store/src/adapter/trie_store.rs
+++ b/core/store/src/adapter/trie_store.rs
@@ -168,6 +168,15 @@ impl<'a> TrieStoreUpdateAdapter<'a> {
         )
     }
 
+    /// Remove State of any shard that uses `shard_uid_db_key_prefix` as database key prefix.
+    /// That is potentially State of any descendant of the shard with the given `ShardUId`.
+    /// Use with caution, as it might potentially remove the State of a descendant shard that is still in use!
+    pub fn delete_shard_uid_prefixed_state(&mut self, shard_uid_db_key_prefix: ShardUId) {
+        let key_from = shard_uid_db_key_prefix.to_bytes();
+        let key_to = ShardUId::get_upper_bound_db_key(&key_from);
+        self.store_update.delete_range(DBCol::State, &key_from, &key_to);
+    }
+
     pub fn delete_all_state(&mut self) {
         self.store_update.delete_all(DBCol::State)
     }
diff --git a/core/store/src/lib.rs b/core/store/src/lib.rs
index 10579acda8a..751382270c9 100644
--- a/core/store/src/lib.rs
+++ b/core/store/src/lib.rs
@@ -617,10 +617,11 @@ impl StoreUpdate {
         self.transaction.delete_all(column);
     }
 
-    /// Deletes the given key range from the database including `from`
-    /// and excluding `to` keys.
+    /// Deletes the given key range from the database including `from` and excluding `to` keys.
+    ///
+    /// Be aware when using with `DBCol::State`! Keys prefixed with a `ShardUId` might be used
+    /// by a descendant shard. See `DBCol::StateShardUIdMapping` for more context.
     pub fn delete_range(&mut self, column: DBCol, from: &[u8], to: &[u8]) {
-        assert!(column != DBCol::State, "can't range delete State column");
         self.transaction.delete_range(column, from.to_vec(), to.to_vec());
     }
diff --git a/integration-tests/src/test_loop/builder.rs b/integration-tests/src/test_loop/builder.rs
index 5ae516c598d..8a05df3d69d 100644
--- a/integration-tests/src/test_loop/builder.rs
+++ b/integration-tests/src/test_loop/builder.rs
@@ -748,6 +748,8 @@ impl TestLoopBuilder {
             chain_genesis.height,
             runtime_adapter.clone(),
             epoch_manager.clone(),
+            shard_tracker.clone(),
+            validator_signer.clone(),
             client_config.gc.clone(),
             client_config.archive,
         );
diff --git a/integration-tests/src/test_loop/tests/congestion_control_genesis_bootstrap.rs b/integration-tests/src/test_loop/tests/congestion_control_genesis_bootstrap.rs
index 7a9f84c3e75..58ca46d120e 100644
--- a/integration-tests/src/test_loop/tests/congestion_control_genesis_bootstrap.rs
+++ b/integration-tests/src/test_loop/tests/congestion_control_genesis_bootstrap.rs
@@ -66,7 +66,9 @@ fn test_congestion_control_genesis_bootstrap() {
 
 fn check_genesis_congestion_info_in_store(client: &mut Client) {
     let gc_config = client.config.gc.clone();
-    client.chain.clear_data(&gc_config).unwrap();
+    let signer = client.validator_signer.get();
+    let me = signer.as_ref().map(|signer| signer.validator_id());
+    client.chain.clear_data(&gc_config, me).unwrap();
 
     let infos = near_store::get_genesis_congestion_infos(client.chain.chain_store().store())
         .unwrap()
diff --git a/integration-tests/src/test_loop/tests/resharding_v3.rs b/integration-tests/src/test_loop/tests/resharding_v3.rs
index c9413ba5841..1f78ec266a9 100644
--- a/integration-tests/src/test_loop/tests/resharding_v3.rs
+++ b/integration-tests/src/test_loop/tests/resharding_v3.rs
@@ -566,8 +566,6 @@ fn shard_sequence_to_schedule(mut shard_sequence: Vec<ShardId>) -> Vec
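
The `delete_shard_uid_prefixed_state` helper added above (and the relaxed `delete_range` restriction on `DBCol::State`) relies on every State key carrying a fixed-size `ShardUId` prefix, so a single key-range delete removes an entire shard's state. The standalone sketch below illustrates that range computation only; it does not use the real `near_store` API (the actual code calls `ShardUId::to_bytes()` and `ShardUId::get_upper_bound_db_key()`), and the `prefix_range` helper, the example byte values, and the 8-byte layout shown are assumptions for illustration.

/// Illustrative sketch only (not the real near_store API): given a fixed-size ShardUId
/// key prefix, compute the half-open range [from, to) covering every key that starts
/// with that prefix. The real code uses ShardUId::to_bytes() and
/// ShardUId::get_upper_bound_db_key(); this helper and the layout below are assumptions.
fn prefix_range(prefix: [u8; 8]) -> ([u8; 8], [u8; 8]) {
    let mut upper = prefix;
    // Increment the prefix as a big-endian integer to get the exclusive upper bound.
    // (An all-0xFF prefix has no finite upper bound; that edge case is ignored here.)
    for byte in upper.iter_mut().rev() {
        if *byte < u8::MAX {
            *byte += 1;
            break;
        }
        *byte = 0;
    }
    (prefix, upper)
}

fn main() {
    // Hypothetical ShardUId bytes; only the "fixed-length prefix" property matters here.
    let shard_uid_bytes = [0, 0, 0, 3, 0, 0, 0, 1];
    let (from, to) = prefix_range(shard_uid_bytes);
    // delete_range(DBCol::State, &from, &to) would then drop exactly the keys prefixed
    // by `shard_uid_bytes`, i.e. the State of that shard and of any descendant shard
    // still mapped onto it via DBCol::StateShardUIdMapping.
    assert!(from < to);
    println!("range: {:?} .. {:?}", from, to);
}

This prefix-range property is what `gc_state` leans on when it hands each retired `ShardUId` prefix to `delete_shard_uid_prefixed_state`.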