Skip to content

Commit

Permalink
Chore: merge gc with archive node.
Browse files Browse the repository at this point in the history
  • Loading branch information
vldm committed Jan 12, 2022
1 parent c7670b4 commit 5ab9021
Show file tree
Hide file tree
Showing 21 changed files with 271 additions and 180 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion client/src/rpc_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ pub enum TokenAccountsFilter {
mod tests {
use super::*;
use crate::rpc_config::RpcTokenAccountsFilter;
use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel};
use solana_sdk::commitment_config::CommitmentConfig;

#[test]
fn test_build_request_json() {
Expand Down
2 changes: 1 addition & 1 deletion core/src/evm_rpc_impl/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::str::FromStr;

use sha3::{Digest, Keccak256};
use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel};
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::keyed_account::KeyedAccount;

use crate::rpc::JsonRpcRequestProcessor;
Expand Down
2 changes: 1 addition & 1 deletion core/src/evm_services/block_recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
time::Duration,
};

use evm_state::{Block, ChangedState};
use evm_state::Block;

pub type EvmRecorderReceiver = Receiver<Block>;
pub type EvmRecorderSender = Sender<Block>;
Expand Down
7 changes: 3 additions & 4 deletions core/src/evm_services/state_recorder.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crossbeam_channel::{Receiver, RecvTimeoutError, Sender};
use solana_ledger::blockstore::Blockstore;
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Expand All @@ -9,7 +8,7 @@ use std::{
time::Duration,
};

use evm_state::{Block, ChangedState, Storage};
use evm_state::{ChangedState, Storage, H256};

pub type EvmStateRecorderReceiver = Receiver<(H256, ChangedState)>;
pub type EvmStateRecorderSender = Sender<(H256, ChangedState)>;
Expand All @@ -22,8 +21,8 @@ impl EvmStateRecorderService {
#[allow(clippy::new_ret_no_self)]
pub fn new(
evm_recorder_receiver: EvmStateRecorderReceiver,
exit: &Arc<AtomicBool>,
archive: Storage,
exit: &Arc<AtomicBool>,
) -> Self {
let exit = exit.clone();
let thread_hdl = Builder::new()
Expand All @@ -33,7 +32,7 @@ impl EvmStateRecorderService {
break;
}
if let Err(RecvTimeoutError::Disconnected) =
write_evm_record(&archive, &evm_recorder_receiver)
Self::write_evm_record(&archive, &evm_recorder_receiver)
{
break;
}
Expand Down
42 changes: 25 additions & 17 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ pub struct ValidatorConfig {
pub validator_exit: Arc<RwLock<ValidatorExit>>,
pub no_wait_for_vote_to_start_leader: bool,
pub verify_evm_state: bool,
pub enable_evm_archive: bool,
}

impl Default for ValidatorConfig {
Expand Down Expand Up @@ -195,7 +194,6 @@ impl Default for ValidatorConfig {
validator_exit: Arc::new(RwLock::new(ValidatorExit::default())),
no_wait_for_vote_to_start_leader: true,
verify_evm_state: false,
enable_evm_archive: false,
}
}
}
Expand Down Expand Up @@ -424,6 +422,7 @@ impl Validator {
config.enforce_ulimit_nofile,
&start_progress,
config.no_poh_speed_test,
evm_state_archive.clone(),
);

*start_progress.write().unwrap() = ValidatorStartProgress::StartingServices;
Expand Down Expand Up @@ -900,6 +899,12 @@ impl Validator {
.expect("evm_block_recorder_service");
}

if let Some(evm_state_recorder_service) = self.evm_state_recorder_service {
evm_state_recorder_service
.join()
.expect("evm_state_recorder_service");
}

if let Some(s) = self.snapshot_packager_service {
s.join().expect("snapshot_packager_service");
}
Expand Down Expand Up @@ -1059,6 +1064,7 @@ fn new_banks_from_ledger(
enforce_ulimit_nofile: bool,
start_progress: &Arc<RwLock<ValidatorStartProgress>>,
no_poh_speed_test: bool,
evm_archive: Option<evm_state::Storage>,
) -> (
GenesisConfig,
BankForks,
Expand Down Expand Up @@ -1154,7 +1160,7 @@ fn new_banks_from_ledger(
blockstore.clone(),
exit,
config.rpc_config.enable_cpi_and_log_storage,
config.enable_evm_archive,
evm_archive,
)
} else {
TransactionHistoryServices::default()
Expand Down Expand Up @@ -1342,7 +1348,7 @@ fn initialize_rpc_transaction_history_services(
blockstore: Arc<Blockstore>,
exit: &Arc<AtomicBool>,
enable_cpi_and_log_storage: bool,
archive_evm_state: bool,
archive_evm_state: Option<evm_state::Storage>,
) -> TransactionHistoryServices {
let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(blockstore.max_root()));
let (transaction_status_sender, transaction_status_receiver) = unbounded();
Expand Down Expand Up @@ -1381,19 +1387,21 @@ fn initialize_rpc_transaction_history_services(
exit,
));

let (evm_state_recorder_service, evm_state_recorder_sender) = if archive_evm_state {
let (evm_state_recorder_sender, evm_state_recorder_receiver) = unbounded();
let evm_state_recorder_sender = Some(evm_state_recorder_sender);
(
Some(EvmStateRecorderService::new(
evm_state_recorder_receiver,
exit,
)),
evm_state_recorder_sender,
)
} else {
(None, None)
};
let (evm_state_recorder_service, evm_state_recorder_sender) =
if let Some(archive) = archive_evm_state {
let (evm_state_recorder_sender, evm_state_recorder_receiver) = unbounded();
let evm_state_recorder_sender = Some(evm_state_recorder_sender);
(
Some(EvmStateRecorderService::new(
evm_state_recorder_receiver,
archive,
exit,
)),
evm_state_recorder_sender,
)
} else {
(None, None)
};

TransactionHistoryServices {
transaction_status_sender,
Expand Down
5 changes: 5 additions & 0 deletions core/tests/snapshots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ mod tests {
DEFINE_SNAPSHOT_VERSION_PARAMETERIZED_TEST_FUNCTIONS!(V1_4_0, Testnet, V1_4_0_Testnet);
DEFINE_SNAPSHOT_VERSION_PARAMETERIZED_TEST_FUNCTIONS!(V1_4_0, MainnetBeta, V1_4_0_MainnetBeta);

DEFINE_SNAPSHOT_VERSION_PARAMETERIZED_TEST_FUNCTIONS!(V1_5_0, Development, V1_5_0_Development);
DEFINE_SNAPSHOT_VERSION_PARAMETERIZED_TEST_FUNCTIONS!(V1_5_0, Devnet, V1_5_0_Devnet);
DEFINE_SNAPSHOT_VERSION_PARAMETERIZED_TEST_FUNCTIONS!(V1_5_0, Testnet, V1_5_0_Testnet);
DEFINE_SNAPSHOT_VERSION_PARAMETERIZED_TEST_FUNCTIONS!(V1_5_0, MainnetBeta, V1_5_0_MainnetBeta);

struct SnapshotTestConfig {
accounts_dir: TempDir,
snapshot_dir: TempDir,
Expand Down
1 change: 0 additions & 1 deletion evm-utils/evm-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ pub struct RPCLogFilter {
pub topics: Option<Vec<Option<RPCTopicFilter>>>,
}


#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct RPCLog {
Expand Down
4 changes: 2 additions & 2 deletions evm-utils/evm-state/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ secp256k1 = { version = "0.19.0", features = ["recovery", "global-context"] }
rand2 = { version = "=0.6.1", package = "rand" }
rocksdb = { git = "https://github.com/velas/rust-rocksdb", branch = "transaction", version = "0.17.0", default-features = false }
# rocksdb = { version = "0.16.0", default-features = false }
# triedb = { git = "https://github.com/velas/triedb", branch = "fix/remove-before-add", features = ["rocksdb"] }
triedb = { path = "../../../triedb", features = ["rocksdb"] }
triedb = { git = "https://github.com/velas/triedb", branch = "feat/gc-simple", features = ["rocksdb"] }
# triedb = { path = "../../../triedb", features = ["rocksdb"] }

primitive-types = "0.8.0"
keccak-hash = "0.6"
Expand Down
4 changes: 2 additions & 2 deletions evm-utils/evm-state/benches/bench_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ fn fill_new_db_then_backup(c: &mut Criterion) {
|b, _params| {
b.iter_batched(
|| {
let evm_state = EvmState::load_from(&dir, persist_state.clone())
let evm_state = EvmState::load_from(&dir, persist_state.clone(), false)
.expect("Unable to create new EVM state in temporary directory");

let empty_dir = tempdir().unwrap();
Expand Down Expand Up @@ -229,7 +229,7 @@ fn fill_new_db_then_backup_and_then_backup_again(c: &mut Criterion) {
|b, _params| {
b.iter_batched(
|| {
let evm_state = EvmState::load_from(&dir, persist_state.clone())
let evm_state = EvmState::load_from(&dir, persist_state.clone(), false)
.expect("Unable to create new EVM state in temporary directory");
let _ = evm_state.make_backup().unwrap();
let mut state = match evm_state {
Expand Down
17 changes: 13 additions & 4 deletions evm-utils/evm-state/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,15 @@ pub enum EvmPersistState {
Committed(Committed),
Incomming(Incomming), // Usually bank will never try to freeze banks with persist state.
}
impl EvmPersistState {
pub fn last_root(&self) -> H256 {
match self {
EvmPersistState::Committed(c) => c.block.state_root,
EvmPersistState::Incomming(i) => i.state_root,
}
}
}

impl Default for EvmPersistState {
fn default() -> Self {
Self::Incomming(Incomming::default())
Expand All @@ -529,7 +538,7 @@ impl EvmState {
fs::create_dir(&evm_state)?;
}

Self::load_from(evm_state, Incomming::default())
Self::load_from(evm_state, Incomming::default(), false)
}

pub fn new_from_genesis(
Expand All @@ -556,6 +565,7 @@ impl EvmState {
Self::load_from(
evm_state,
Incomming::new(1, root_hash, H256::zero(), timestamp, version),
false,
)
}

Expand Down Expand Up @@ -584,12 +594,11 @@ impl EvmState {
pub fn load_from<P: AsRef<Path>>(
path: P,
evm_persist_feilds: impl Into<EvmPersistState>,
gc_enabled: bool,
) -> Result<Self, anyhow::Error> {
info!("Open EVM storage {}", path.as_ref().display());

let kvs = KVS::open_persistent(
path, true, // enable gc
)?;
let kvs = KVS::open_persistent(path, gc_enabled)?;

Ok(match evm_persist_feilds.into() {
EvmPersistState::Incomming(i) => EvmBackend::new(i, kvs).into(),
Expand Down
50 changes: 25 additions & 25 deletions evm-utils/evm-state/src/storage/inspectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,61 +179,61 @@ pub mod memorizer {
pub mod streamer {
use super::*;

pub struct AccountsStreamer {
pub struct AccountsStreamer<'a> {
pub source: Storage,
pub destination: Storage,
pub destinations: &'a [Storage],
}

impl TrieInspector for AccountsStreamer {
impl<'a> TrieInspector for AccountsStreamer<'a> {
fn inspect_node<Data: AsRef<[u8]>>(&self, trie_key: H256, node: Data) -> Result<bool> {
let destination = self.destination.db();
destination.put(trie_key, node)?;
for destination in self.destinations {
destination.db().put(trie_key, node.as_ref())?;
}
Ok(false)
}
}

impl DataInspector<H256, Account> for AccountsStreamer {
impl<'a> DataInspector<H256, Account> for AccountsStreamer<'a> {
fn inspect_data(&self, _key: H256, account: Account) -> Result<()> {
let source = self.source.borrow();
let destination = self.destination.borrow();

// - Account Storage
let walker = Walker::new_raw(
source,
StoragesKeysStreamer::new(destination),
StoragesKeysStreamer::new(self.destinations),
NoopInspector,
);
walker.traverse(account.storage_root)?;

// - Account Code
let code_hash = account.code_hash;
if let Some(code_data) = self.source.get::<Codes>(code_hash) {
self.destination.set::<Codes>(code_hash, code_data);
} else {
assert_eq!(code_hash, Code::empty().hash());
for destination in self.destinations {
// - Account Code
let code_hash = account.code_hash;
if let Some(code_data) = self.source.get::<Codes>(code_hash) {
destination.set::<Codes>(code_hash, code_data);
} else {
assert_eq!(code_hash, Code::empty().hash());
}
}

Ok(())
}
}

pub struct StoragesKeysStreamer<Destination> {
destination: Destination,
pub struct StoragesKeysStreamer<'a> {
destinations: &'a [Storage],
}

impl<Destination> StoragesKeysStreamer<Destination> {
fn new(destination: Destination) -> Self {
Self { destination }
impl<'a> StoragesKeysStreamer<'a> {
fn new(destinations: &'a [Storage]) -> Self {
Self { destinations }
}
}

impl<Destination> TrieInspector for StoragesKeysStreamer<Destination>
where
Destination: Borrow<rocksdb::DB>,
{
impl<'a> TrieInspector for StoragesKeysStreamer<'a> {
fn inspect_node<Data: AsRef<[u8]>>(&self, trie_key: H256, node: Data) -> Result<bool> {
let destination = self.destination.borrow();
destination.put(trie_key, node)?;
for destination in self.destinations {
destination.db().put(trie_key, node.as_ref())?;
}
Ok(true)
}
}
Expand Down
Loading

0 comments on commit 5ab9021

Please sign in to comment.