Skip to content

Commit

Permalink
refactor: Stream genesis record processing
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley committed Mar 24, 2023
1 parent 8864299 commit e3d9630
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 45 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions database/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
actix = "0.13.0"
actix-rt = "2.2.0"
anyhow = "1.0.51"
base64 = "0.11"
bigdecimal = "=0.1.0"
Expand Down
4 changes: 2 additions & 2 deletions database/src/adapters/access_keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ pub async fn handle_access_keys(
}

pub(crate) async fn store_access_keys_from_genesis(
pool: &actix_diesel::Database<PgConnection>,
pool: actix_diesel::Database<PgConnection>,
access_keys_models: Vec<models::access_keys::AccessKey>,
) -> anyhow::Result<()> {
info!(
Expand All @@ -161,7 +161,7 @@ pub(crate) async fn store_access_keys_from_genesis(
diesel::insert_into(schema::access_keys::table)
.values(access_keys_models.clone())
.on_conflict_do_nothing()
.execute_async(pool),
.execute_async(&pool),
10,
"Failed to store AccessKeys from genesis".to_string(),
&access_keys_models
Expand Down
4 changes: 2 additions & 2 deletions database/src/adapters/accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ pub async fn get_lockup_account_ids_at_block_height(
}

pub(crate) async fn store_accounts_from_genesis(
pool: &actix_diesel::Database<PgConnection>,
pool: actix_diesel::Database<PgConnection>,
accounts_models: Vec<models::accounts::Account>,
) -> anyhow::Result<()> {
info!(
Expand All @@ -265,7 +265,7 @@ pub(crate) async fn store_accounts_from_genesis(
diesel::insert_into(schema::accounts::table)
.values(accounts_models.clone())
.on_conflict_do_nothing()
.execute_async(pool),
.execute_async(&pool),
10,
"Failed to store Accounts from genesis".to_string(),
&accounts_models
Expand Down
148 changes: 109 additions & 39 deletions database/src/adapters/genesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,55 +5,125 @@ use near_chain_configs::{Genesis, GenesisValidationMode};
use crate::adapters::access_keys::store_access_keys_from_genesis;
use crate::adapters::accounts::store_accounts_from_genesis;

/// Runs the future `f` on the given actix arbiter and blocks the calling thread until the
/// future has produced its output.
///
/// This is a workaround for driving async code from a synchronous context. The call waits
/// on a channel receive, so it should only be made from a thread other than the one driving
/// `actix_arbiter` (for example from inside `tokio::task::spawn_blocking`); blocking the
/// arbiter's own thread would presumably keep the spawned future from ever running.
///
/// Returns the future's output, or `RecvError` when the sending half of the channel is
/// dropped before a result was sent (i.e. the spawned task never ran to completion).
///
/// ```ignore
/// async fn some_async_function() {
///     let actix_system = actix::System::current();
///     tokio::task::spawn_blocking(move || {
///         let actix_arbiter = actix_system.arbiter();
///         for _ in [0, 1, 2] {
///             let _body = block_on(actix_arbiter, async move {
///                 reqwest::get(...).await.text().await
///             });
///         }
///     })
///     .await;
/// }
/// ```
fn block_on<Fut, T>(
    actix_arbiter: &actix_rt::ArbiterHandle,
    f: Fut,
) -> Result<T, std::sync::mpsc::RecvError>
where
    Fut: std::future::Future<Output = T> + Send + 'static,
    T: Send + 'static,
{
    let (result_sender, result_receiver) = std::sync::mpsc::channel();

    let task = async move {
        // A failed send only means the receiver is already gone; nothing left to do then.
        let _ = result_sender.send(f.await);
    };
    actix_arbiter.spawn(task);

    // Park the calling thread until the task reports back or its sender is dropped.
    result_receiver.recv()
}

/// Iterates over GenesisRecords and stores selected ones (Accounts, AccessKeys)
/// to database.
/// Records are stored in batches of 5000 to bound memory usage
/// and keep the number of database queries low.
pub async fn store_genesis_records(
pool: &Database<PgConnection>,
pool: Database<PgConnection>,
genesis_file_path: String,
) -> anyhow::Result<()> {
let mut accounts_to_store: Vec<crate::models::accounts::Account> = vec![];
let mut access_keys_to_store: Vec<crate::models::access_keys::AccessKey> = vec![];
tracing::info!(
target: crate::EXPLORER_DATABASE,
"Storing genesis records to database...",
);

let genesis = Genesis::from_file(genesis_file_path, GenesisValidationMode::Full);

let genesis_height = genesis.config.genesis_height;

for record in genesis.records.0 {
if accounts_to_store.len() == 5_000 {
let mut accounts_to_store_chunk = vec![];
std::mem::swap(&mut accounts_to_store, &mut accounts_to_store_chunk);
store_accounts_from_genesis(pool, accounts_to_store_chunk).await?;
}

if access_keys_to_store.len() == 5_000 {
let mut access_keys_to_store_chunk = vec![];
std::mem::swap(&mut access_keys_to_store, &mut access_keys_to_store_chunk);
store_access_keys_from_genesis(pool, access_keys_to_store_chunk).await?;
}

match record {
near_primitives::state_record::StateRecord::Account { account_id, .. } => {
accounts_to_store.push(crate::models::accounts::Account::new_from_genesis(
&account_id,
genesis_height,
));
// Remember the current actix system so that the thread which processes genesis
// in a blocking way can schedule async functions on its arbiter.
let actix_system = actix::System::current();
// Spawn the blocking genesis processing on a separate thread
tokio::task::spawn_blocking(move || {
let actix_arbiter = actix_system.arbiter();

let mut accounts_to_store: Vec<crate::models::accounts::Account> = vec![];
let mut access_keys_to_store: Vec<crate::models::access_keys::AccessKey> = vec![];

genesis.for_each_record(|record| {
if accounts_to_store.len() == 5_000 {
let mut accounts_to_store_chunk = vec![];
std::mem::swap(&mut accounts_to_store, &mut accounts_to_store_chunk);
let pool = pool.clone();
block_on(
actix_arbiter,
store_accounts_from_genesis(pool, accounts_to_store_chunk),
)
.expect("storing accounts from genesis failed")
.expect("storing accounts from genesis failed");
}
near_primitives::state_record::StateRecord::AccessKey {
account_id,
public_key,
access_key,
} => {
access_keys_to_store.push(crate::models::access_keys::AccessKey::from_genesis(
&public_key,
&account_id,
&access_key,
genesis_height,
));
if access_keys_to_store.len() == 5_000 {
let mut access_keys_to_store_chunk = vec![];
std::mem::swap(&mut access_keys_to_store, &mut access_keys_to_store_chunk);
let pool = pool.clone();
block_on(
actix_arbiter,
store_access_keys_from_genesis(pool, access_keys_to_store_chunk),
)
.expect("storing access keys from genesis failed")
.expect("storing access keys from genesis failed");
}
_ => {}
};
}

store_accounts_from_genesis(pool, accounts_to_store).await?;
store_access_keys_from_genesis(pool, access_keys_to_store).await?;
match record {
near_primitives::state_record::StateRecord::Account { account_id, .. } => {
accounts_to_store.push(crate::models::accounts::Account::new_from_genesis(
account_id,
genesis_height,
));
}
near_primitives::state_record::StateRecord::AccessKey {
account_id,
public_key,
access_key,
} => {
access_keys_to_store.push(crate::models::access_keys::AccessKey::from_genesis(
public_key,
account_id,
access_key,
genesis_height,
));
}
_ => {}
};
});

let fut = || async move {
store_accounts_from_genesis(pool.clone(), accounts_to_store).await?;
store_access_keys_from_genesis(pool, access_keys_to_store).await?;
anyhow::Result::<()>::Ok(())
};
block_on(actix_arbiter, fut())
.expect("storing leftover accounts and access keys from genesis failed")
.expect("storing leftover accounts and access keys from genesis failed");
})
.await?;

tracing::info!(
target: crate::EXPLORER_DATABASE,
"Genesis records has been stored.",
);
Ok(())
}
2 changes: 1 addition & 1 deletion indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ async fn main() -> anyhow::Result<()> {

if opts.start_options() == &StartOptions::FromGenesis {
let genesis_file_path = download_genesis_file(&opts).await?;
adapters::genesis::store_genesis_records(&pool, genesis_file_path).await?;
adapters::genesis::store_genesis_records(pool.clone(), genesis_file_path).await?;
}

let config: near_lake_framework::LakeConfig = opts.to_lake_config().await;
Expand Down

0 comments on commit e3d9630

Please sign in to comment.