Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

validator: Add CLI args to control rocksdb threadpool sizes #4214

Merged
merged 6 commits into from
Dec 31, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ledger-tool/src/ledger_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,7 @@ pub fn open_blockstore(
recovery_mode: wal_recovery_mode.clone(),
enforce_ulimit_nofile,
column_options: LedgerColumnOptions::default(),
steviez marked this conversation as resolved.
Show resolved Hide resolved
..BlockstoreOptions::default()
},
) {
Ok(blockstore) => blockstore,
Expand Down
9 changes: 5 additions & 4 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use {
blockstore_meta::*,
blockstore_metrics::BlockstoreRpcApiMetrics,
blockstore_options::{
AccessType, BlockstoreOptions, LedgerColumnOptions, BLOCKSTORE_DIRECTORY_ROCKS_LEVEL,
BlockstoreOptions, LedgerColumnOptions, BLOCKSTORE_DIRECTORY_ROCKS_LEVEL,
},
blockstore_processor::BlockstoreProcessorError,
leader_schedule_cache::LeaderScheduleCache,
Expand Down Expand Up @@ -90,7 +90,9 @@ pub mod blockstore_purge;
use static_assertions::const_assert_eq;
pub use {
crate::{
blockstore_db::BlockstoreError,
blockstore_db::{
default_num_compaction_threads, default_num_flush_threads, BlockstoreError,
},
blockstore_meta::{OptimisticSlotMetaVersioned, SlotMeta},
blockstore_metrics::BlockstoreInsertionMetrics,
},
Expand Down Expand Up @@ -4961,10 +4963,9 @@ pub fn create_new_ledger(
let blockstore = Blockstore::open_with_options(
ledger_path,
BlockstoreOptions {
access_type: AccessType::Primary,
recovery_mode: None,
enforce_ulimit_nofile: false,
column_options: column_options.clone(),
..BlockstoreOptions::default()
},
)?;
let ticks_per_slot = genesis_config.ticks_per_slot;
Expand Down
47 changes: 33 additions & 14 deletions ledger/src/blockstore_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use {
fs,
marker::PhantomData,
mem,
num::NonZeroUsize,
path::{Path, PathBuf},
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Expand Down Expand Up @@ -415,13 +416,13 @@ pub(crate) struct Rocks {

impl Rocks {
pub(crate) fn open(path: PathBuf, options: BlockstoreOptions) -> Result<Rocks> {
let access_type = options.access_type.clone();
// let access_type = options.access_type.clone();
steviez marked this conversation as resolved.
Show resolved Hide resolved
let recovery_mode = options.recovery_mode.clone();

fs::create_dir_all(&path)?;

// Use default database options
let mut db_options = get_db_options(&access_type);
let mut db_options = get_db_options(&options);
if let Some(recovery_mode) = recovery_mode {
db_options.set_wal_recovery_mode(recovery_mode.into());
}
Expand All @@ -430,7 +431,7 @@ impl Rocks {
let column_options = Arc::from(options.column_options);

// Open the database
let db = match access_type {
let db = match options.access_type {
AccessType::Primary | AccessType::PrimaryForMaintenance => {
DB::open_cf_descriptors(&db_options, &path, cf_descriptors)?
}
Expand All @@ -452,7 +453,7 @@ impl Rocks {
let rocks = Rocks {
db,
path,
access_type,
access_type: options.access_type,
oldest_slot,
column_options,
write_batch_perf_status: PerfSamplingStatus::default(),
Expand Down Expand Up @@ -1991,28 +1992,34 @@ fn process_cf_options_advanced<C: 'static + Column + ColumnName>(
}
}

fn get_db_options(access_type: &AccessType) -> Options {
fn get_db_options(blockstore_options: &BlockstoreOptions) -> Options {
let mut options = Options::default();

// Create missing items to support a clean start
options.create_if_missing(true);
options.create_missing_column_families(true);

// Per the docs, a good value for this is the number of cores on the machine
options.increase_parallelism(num_cpus::get() as i32);

// rocksdb builds two threadpools: low and high priority. The low priority
// pool is used for compactions whereas the high priority pool is used for
// memtable flushes. Separate pools are created so that compactions are
// unable to stall memtable flushes (which could stall memtable writes).
let mut env = rocksdb::Env::new().unwrap();
// While a compaction is ongoing, all the background threads
// could be used by the compaction. This can stall writes which
// need to flush the memtable. Add some high-priority background threads
// which can service these writes.
env.set_high_priority_background_threads(4);
env.set_low_priority_background_threads(
blockstore_options.num_rocksdb_compaction_threads.get() as i32,
);
env.set_high_priority_background_threads(
blockstore_options.num_rocksdb_flush_threads.get() as i32
);
options.set_env(&env);
// The value set in max_background_jobs can grow but not shrink threadpool
// sizes. So, set this value to 2 (the default value: 1 low / 1 high) to
// prevent rocksdb from changing the sizes we previously configured.
options.set_max_background_jobs(2);
bw-solana marked this conversation as resolved.
Show resolved Hide resolved

// Set max total wal size to 4G.
options.set_max_total_wal_size(4 * 1024 * 1024 * 1024);

if should_disable_auto_compactions(access_type) {
if should_disable_auto_compactions(&blockstore_options.access_type) {
options.set_disable_auto_compactions(true);
}

Expand All @@ -2024,6 +2031,18 @@ fn get_db_options(access_type: &AccessType) -> Options {
options
}

/// The default number of threads to use for rocksdb compaction in the rocksdb
/// low priority threadpool.
///
/// Returns the value reported by `num_cpus::get()`, wrapped in a
/// `NonZeroUsize`.
///
/// # Panics
///
/// Panics if `num_cpus::get()` returns zero.
pub fn default_num_compaction_threads() -> NonZeroUsize {
NonZeroUsize::new(num_cpus::get()).expect("thread count is non-zero")
bw-solana marked this conversation as resolved.
Show resolved Hide resolved
}

/// Default size of the rocksdb high priority threadpool, which services
/// memtable flushes.
///
/// Resolves to one quarter of the value reported by `num_cpus::get()`
/// (integer division), and never less than one thread.
pub fn default_num_flush_threads() -> NonZeroUsize {
    let quarter_of_cpus = num_cpus::get() / 4;
    // Integer division yields zero when fewer than four CPUs are reported, so
    // clamp to a single thread before building the non-zero wrapper.
    let flush_threads = std::cmp::max(quarter_of_cpus, 1);
    NonZeroUsize::new(flush_threads).expect("thread count is non-zero")
}

// Returns whether automatic compactions should be disabled for the entire
// database based upon the given access type.
fn should_disable_auto_compactions(access_type: &AccessType) -> bool {
Expand Down
12 changes: 11 additions & 1 deletion ledger/src/blockstore_options.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use rocksdb::{DBCompressionType as RocksCompressionType, DBRecoveryMode};
use {
crate::blockstore_db::{default_num_compaction_threads, default_num_flush_threads},
rocksdb::{DBCompressionType as RocksCompressionType, DBRecoveryMode},
std::num::NonZeroUsize,
};

/// The subdirectory under ledger directory where the Blockstore lives
pub const BLOCKSTORE_DIRECTORY_ROCKS_LEVEL: &str = "rocksdb";
Expand All @@ -13,6 +17,8 @@ pub struct BlockstoreOptions {
// desired open file descriptor limit cannot be configured. Default: true.
pub enforce_ulimit_nofile: bool,
pub column_options: LedgerColumnOptions,
pub num_rocksdb_compaction_threads: NonZeroUsize,
pub num_rocksdb_flush_threads: NonZeroUsize,
}

impl Default for BlockstoreOptions {
Expand All @@ -25,6 +31,8 @@ impl Default for BlockstoreOptions {
recovery_mode: None,
enforce_ulimit_nofile: true,
column_options: LedgerColumnOptions::default(),
num_rocksdb_compaction_threads: default_num_compaction_threads(),
num_rocksdb_flush_threads: default_num_flush_threads(),
}
}
}
Expand All @@ -36,6 +44,8 @@ impl BlockstoreOptions {
recovery_mode: None,
enforce_ulimit_nofile: false,
column_options: LedgerColumnOptions::default(),
num_rocksdb_compaction_threads: default_num_compaction_threads(),
num_rocksdb_flush_threads: default_num_flush_threads(),
}
steviez marked this conversation as resolved.
Show resolved Hide resolved
}
}
Expand Down
40 changes: 40 additions & 0 deletions validator/src/cli/thread_args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ pub struct DefaultThreadArgs {
pub rayon_global_threads: String,
pub replay_forks_threads: String,
pub replay_transactions_threads: String,
pub rocksdb_compaction_threads: String,
pub rocksdb_flush_threads: String,
pub tvu_receive_threads: String,
pub tvu_sigverify_threads: String,
}
Expand All @@ -36,6 +38,8 @@ impl Default for DefaultThreadArgs {
replay_forks_threads: ReplayForksThreadsArg::bounded_default().to_string(),
replay_transactions_threads: ReplayTransactionsThreadsArg::bounded_default()
.to_string(),
rocksdb_compaction_threads: RocksdbCompactionThreadsArg::bounded_default().to_string(),
rocksdb_flush_threads: RocksdbFlushThreadsArg::bounded_default().to_string(),
tvu_receive_threads: TvuReceiveThreadsArg::bounded_default().to_string(),
tvu_sigverify_threads: TvuShredSigverifyThreadsArg::bounded_default().to_string(),
}
Expand All @@ -52,6 +56,8 @@ pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec<Arg<'_, 'a>> {
new_thread_arg::<RayonGlobalThreadsArg>(&defaults.rayon_global_threads),
new_thread_arg::<ReplayForksThreadsArg>(&defaults.replay_forks_threads),
new_thread_arg::<ReplayTransactionsThreadsArg>(&defaults.replay_transactions_threads),
new_thread_arg::<RocksdbCompactionThreadsArg>(&defaults.rocksdb_compaction_threads),
new_thread_arg::<RocksdbFlushThreadsArg>(&defaults.rocksdb_flush_threads),
new_thread_arg::<TvuReceiveThreadsArg>(&defaults.tvu_receive_threads),
new_thread_arg::<TvuShredSigverifyThreadsArg>(&defaults.tvu_sigverify_threads),
]
Expand All @@ -77,6 +83,8 @@ pub struct NumThreadConfig {
pub rayon_global_threads: NonZeroUsize,
pub replay_forks_threads: NonZeroUsize,
pub replay_transactions_threads: NonZeroUsize,
pub rocksdb_compaction_threads: NonZeroUsize,
pub rocksdb_flush_threads: NonZeroUsize,
pub tvu_receive_threads: NonZeroUsize,
pub tvu_sigverify_threads: NonZeroUsize,
}
Expand Down Expand Up @@ -119,6 +127,16 @@ pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig {
ReplayTransactionsThreadsArg::NAME,
NonZeroUsize
),
rocksdb_compaction_threads: value_t_or_exit!(
matches,
RocksdbCompactionThreadsArg::NAME,
NonZeroUsize
),
rocksdb_flush_threads: value_t_or_exit!(
matches,
RocksdbFlushThreadsArg::NAME,
NonZeroUsize
),
tvu_receive_threads: value_t_or_exit!(matches, TvuReceiveThreadsArg::NAME, NonZeroUsize),
tvu_sigverify_threads: value_t_or_exit!(
matches,
Expand Down Expand Up @@ -257,6 +275,28 @@ impl ThreadArg for ReplayTransactionsThreadsArg {
}
}

/// CLI thread argument selecting how many threads rocksdb (Blockstore) may use
/// for compactions.
struct RocksdbCompactionThreadsArg;

impl ThreadArg for RocksdbCompactionThreadsArg {
    const NAME: &'static str = "rocksdb_compaction_threads";
    const LONG_NAME: &'static str = "rocksdb-compaction-threads";
    const HELP: &'static str = "Number of threads to use for rocksdb (Blockstore) compactions";

    /// Delegates to the ledger crate's default compaction thread count.
    fn default() -> usize {
        let compaction_threads = solana_ledger::blockstore::default_num_compaction_threads();
        usize::from(compaction_threads)
    }
}

/// CLI thread argument selecting how many threads rocksdb (Blockstore) may use
/// for memtable flushes.
struct RocksdbFlushThreadsArg;

impl ThreadArg for RocksdbFlushThreadsArg {
    const NAME: &'static str = "rocksdb_flush_threads";
    const LONG_NAME: &'static str = "rocksdb-flush-threads";
    const HELP: &'static str = "Number of threads to use for rocksdb (Blockstore) memtable flushes";

    /// Delegates to the ledger crate's default flush thread count.
    fn default() -> usize {
        let flush_threads = solana_ledger::blockstore::default_num_flush_threads();
        usize::from(flush_threads)
    }
}

struct TvuReceiveThreadsArg;
impl ThreadArg for TvuReceiveThreadsArg {
const NAME: &'static str = "tvu_receive_threads";
Expand Down
4 changes: 4 additions & 0 deletions validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -907,6 +907,8 @@ pub fn main() {
rayon_global_threads,
replay_forks_threads,
replay_transactions_threads,
rocksdb_compaction_threads,
rocksdb_flush_threads,
tvu_receive_threads,
tvu_sigverify_threads,
} = cli::thread_args::parse_num_threads_args(&matches);
Expand Down Expand Up @@ -1055,6 +1057,8 @@ pub fn main() {
enforce_ulimit_nofile: true,
// The validator needs primary (read/write)
access_type: AccessType::Primary,
num_rocksdb_compaction_threads: rocksdb_compaction_threads,
num_rocksdb_flush_threads: rocksdb_flush_threads,
};

let accounts_hash_cache_path = matches
Expand Down
Loading