From c70acaa0488d23bb9d1cd07dd9fcef16bb7ddf9c Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Thu, 19 Dec 2024 01:24:20 -0800 Subject: [PATCH 1/7] make QUIC tpu QOS parameters configurable --- bench-tps/src/cli.rs | 2 +- core/src/tpu.rs | 39 ++-------- core/src/validator.rs | 63 ++++++++++------ local-cluster/src/local_cluster.rs | 32 +++------ streamer/src/nonblocking/quic.rs | 8 --- streamer/src/nonblocking/stream_throttle.rs | 16 ++--- streamer/src/nonblocking/testing_utilities.rs | 10 +-- streamer/src/quic.rs | 25 ++++--- test-validator/src/lib.rs | 17 ++--- validator/src/admin_rpc_service.rs | 15 +--- validator/src/cli.rs | 72 +++++++++++++++++-- validator/src/main.rs | 48 +++++++++++-- 12 files changed, 204 insertions(+), 143 deletions(-) diff --git a/bench-tps/src/cli.rs b/bench-tps/src/cli.rs index 4525ee5ca12c99..3c5d8e642ac1bd 100644 --- a/bench-tps/src/cli.rs +++ b/bench-tps/src/cli.rs @@ -11,7 +11,7 @@ use { pubkey::Pubkey, signature::{read_keypair_file, Keypair}, }, - solana_streamer::nonblocking::quic::DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE, + solana_streamer::quic::DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE, solana_tpu_client::tpu_client::{DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC}, std::{ net::{IpAddr, Ipv4Addr}, diff --git a/core/src/tpu.rs b/core/src/tpu.rs index d715bb5c7b0534..2a03333ffb03a6 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -37,10 +37,7 @@ use { }, solana_sdk::{clock::Slot, pubkey::Pubkey, quic::NotifyKeyUpdate, signature::Keypair}, solana_streamer::{ - quic::{ - spawn_server_multi, QuicServerParams, SpawnServerResult, MAX_STAKED_CONNECTIONS, - MAX_UNSTAKED_CONNECTIONS, - }, + quic::{spawn_server_multi, QuicServerParams, SpawnServerResult}, streamer::StakedNodes, }, solana_turbine::broadcast_stage::{BroadcastStage, BroadcastStageType}, @@ -54,9 +51,6 @@ use { tokio::sync::mpsc::Sender as AsyncSender, }; -// allow multiple connections for NAT and any open/close overlap -pub const MAX_QUIC_CONNECTIONS_PER_PEER: usize = 8; - pub struct TpuSockets { pub transactions: Vec, pub transaction_forwards: Vec, @@ -115,7 +109,9 @@ impl Tpu { banking_tracer: Arc, tracer_thread_hdl: TracerThread, tpu_enable_udp: bool, - tpu_max_connections_per_ipaddr_per_minute: u64, + tpu_quic_server_config: QuicServerParams, + tpu_fwd_quic_server_config: QuicServerParams, + voe_quic_server_config: QuicServerParams, prioritization_fee_cache: &Arc, block_production_method: BlockProductionMethod, enable_block_production_forwarding: bool, @@ -178,15 +174,7 @@ impl Tpu { vote_packet_sender.clone(), exit.clone(), staked_nodes.clone(), - QuicServerParams { - max_connections_per_peer: 1, - max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute, - coalesce: tpu_coalesce, - max_staked_connections: MAX_STAKED_CONNECTIONS - .saturating_add(MAX_UNSTAKED_CONNECTIONS), - max_unstaked_connections: 0, - ..QuicServerParams::default() - }, + voe_quic_server_config, ) .unwrap(); @@ -203,12 +191,7 @@ impl Tpu { packet_sender, exit.clone(), staked_nodes.clone(), - QuicServerParams { - max_connections_per_peer: MAX_QUIC_CONNECTIONS_PER_PEER, - max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute, - coalesce: tpu_coalesce, - ..QuicServerParams::default() - }, + tpu_quic_server_config, ) .unwrap(); @@ -225,15 +208,7 @@ impl Tpu { forwarded_packet_sender, exit.clone(), staked_nodes.clone(), - QuicServerParams { - max_connections_per_peer: MAX_QUIC_CONNECTIONS_PER_PEER, - max_staked_connections: MAX_STAKED_CONNECTIONS - .saturating_add(MAX_UNSTAKED_CONNECTIONS), - max_unstaked_connections: 0, // Prevent unstaked nodes from forwarding transactions - max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute, - coalesce: tpu_coalesce, - ..QuicServerParams::default() - }, + tpu_fwd_quic_server_config, ) .unwrap(); diff --git a/core/src/validator.rs b/core/src/validator.rs index c3318ee070f2bc..aeb9c2418a482d 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -125,7 +125,10 @@ use { timing::timestamp, }, solana_send_transaction_service::send_transaction_service, - solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes}, + solana_streamer::{quic::QuicServerParams, socket::SocketAddrSpace, streamer::StakedNodes}, + solana_tpu_client::tpu_client::{ + DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC, DEFAULT_VOTE_USE_QUIC, + }, solana_turbine::{self, broadcast_stage::BroadcastStageType}, solana_unified_scheduler_pool::DefaultSchedulerPool, solana_vote_program::vote_state, @@ -486,8 +489,33 @@ pub struct ValidatorTpuConfig { pub tpu_connection_pool_size: usize, /// Controls if to enable UDP for TPU tansactions. pub tpu_enable_udp: bool, - /// Controls the new maximum connections per IpAddr per minute - pub tpu_max_connections_per_ipaddr_per_minute: u64, + /// QUIC server config for regular TPU + pub tpu_quic_server_config: QuicServerParams, + /// QUIC server config for TPU forward + pub tpu_fwd_quic_server_config: QuicServerParams, + /// QUIC server config for Vote + pub vote_quic_server_config: QuicServerParams, +} + +pub fn build_validator_tpu_config_for_test(tpu_enable_udp: bool) -> ValidatorTpuConfig { + let mut tpu_quic_server_config = QuicServerParams::default(); + tpu_quic_server_config.max_connections_per_peer = 32; // max connections per IpAddr per minute + + let mut tpu_fwd_quic_server_config = QuicServerParams::default(); + tpu_fwd_quic_server_config.max_connections_per_peer = 32; // max connections per IpAddr per minute + + let mut vote_quic_server_config = QuicServerParams::default(); + vote_quic_server_config.max_connections_per_peer = 32; // max connections per IpAddr per minute + + ValidatorTpuConfig { + use_quic: DEFAULT_TPU_USE_QUIC, + vote_use_quic: DEFAULT_VOTE_USE_QUIC, + tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE, + tpu_enable_udp, + tpu_quic_server_config, + tpu_fwd_quic_server_config, + vote_quic_server_config, + } } pub struct Validator { @@ -550,7 +578,9 @@ impl Validator { vote_use_quic, tpu_connection_pool_size, tpu_enable_udp, - tpu_max_connections_per_ipaddr_per_minute, + tpu_quic_server_config, + tpu_fwd_quic_server_config, + vote_quic_server_config, } = tpu_config; let start_time = Instant::now(); @@ -1533,7 +1563,9 @@ impl Validator { banking_tracer, tracer_thread, tpu_enable_udp, - tpu_max_connections_per_ipaddr_per_minute, + tpu_quic_server_config, + tpu_fwd_quic_server_config, + vote_quic_server_config, &prioritization_fee_cache, config.block_production_method.clone(), config.enable_block_production_forwarding, @@ -2756,10 +2788,7 @@ mod tests { get_tmp_ledger_path_auto_delete, }, solana_sdk::{genesis_config::create_genesis_config, poh_config::PohConfig}, - solana_tpu_client::tpu_client::{ - DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_ENABLE_UDP, DEFAULT_TPU_USE_QUIC, - DEFAULT_VOTE_USE_QUIC, - }, + solana_tpu_client::tpu_client::DEFAULT_TPU_ENABLE_UDP, std::{fs::remove_dir_all, thread, time::Duration}, }; @@ -2797,13 +2826,7 @@ mod tests { None, // rpc_to_plugin_manager_receiver start_progress.clone(), SocketAddrSpace::Unspecified, - ValidatorTpuConfig { - use_quic: DEFAULT_TPU_USE_QUIC, - vote_use_quic: DEFAULT_VOTE_USE_QUIC, - tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE, - tpu_enable_udp: DEFAULT_TPU_ENABLE_UDP, - tpu_max_connections_per_ipaddr_per_minute: 32, // max connections per IpAddr per minute for test - }, + build_validator_tpu_config_for_test(DEFAULT_TPU_ENABLE_UDP), Arc::new(RwLock::new(None)), ) .expect("assume successful validator start"); @@ -3019,13 +3042,7 @@ mod tests { None, // rpc_to_plugin_manager_receiver Arc::new(RwLock::new(ValidatorStartProgress::default())), SocketAddrSpace::Unspecified, - ValidatorTpuConfig { - use_quic: DEFAULT_TPU_USE_QUIC, - vote_use_quic: DEFAULT_VOTE_USE_QUIC, - tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE, - tpu_enable_udp: DEFAULT_TPU_ENABLE_UDP, - tpu_max_connections_per_ipaddr_per_minute: 32, // max connections per IpAddr per minute for test - }, + build_validator_tpu_config_for_test(DEFAULT_TPU_ENABLE_UDP), Arc::new(RwLock::new(None)), ) .expect("assume successful validator start") diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index ce3d82dcddfe84..759b3a2664c96e 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -11,7 +11,9 @@ use { solana_client::connection_cache::ConnectionCache, solana_core::{ consensus::tower_storage::FileTowerStorage, - validator::{Validator, ValidatorConfig, ValidatorStartProgress, ValidatorTpuConfig}, + validator::{ + build_validator_tpu_config_for_test, Validator, ValidatorConfig, ValidatorStartProgress, + }, }, solana_gossip::{ cluster_info::Node, @@ -341,15 +343,9 @@ impl LocalCluster { None, // rpc_to_plugin_manager_receiver Arc::new(RwLock::new(ValidatorStartProgress::default())), socket_addr_space, - ValidatorTpuConfig { - use_quic: DEFAULT_TPU_USE_QUIC, - vote_use_quic: DEFAULT_VOTE_USE_QUIC, - tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE, - // We are turning tpu_enable_udp to true in order to prevent concurrent local cluster tests - // to use the same QUIC ports due to SO_REUSEPORT. - tpu_enable_udp: true, - tpu_max_connections_per_ipaddr_per_minute: 32, // max connections per IpAddr per minute - }, + // We are turning tpu_enable_udp to true in order to prevent concurrent local cluster tests + // to use the same QUIC ports due to SO_REUSEPORT. + build_validator_tpu_config_for_test(true), Arc::new(RwLock::new(None)), ) .expect("assume successful validator start"); @@ -553,13 +549,7 @@ impl LocalCluster { None, // rpc_to_plugin_manager_receiver Arc::new(RwLock::new(ValidatorStartProgress::default())), socket_addr_space, - ValidatorTpuConfig { - use_quic: DEFAULT_TPU_USE_QUIC, - vote_use_quic: DEFAULT_VOTE_USE_QUIC, - tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE, - tpu_enable_udp: DEFAULT_TPU_ENABLE_UDP, - tpu_max_connections_per_ipaddr_per_minute: 32, // max connections per IpAddr per mintute - }, + build_validator_tpu_config_for_test(DEFAULT_TPU_ENABLE_UDP), Arc::new(RwLock::new(None)), ) .expect("assume successful validator start"); @@ -1089,13 +1079,7 @@ impl Cluster for LocalCluster { None, // rpc_to_plugin_manager_receiver Arc::new(RwLock::new(ValidatorStartProgress::default())), socket_addr_space, - ValidatorTpuConfig { - use_quic: DEFAULT_TPU_USE_QUIC, - vote_use_quic: DEFAULT_VOTE_USE_QUIC, - tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE, - tpu_enable_udp: DEFAULT_TPU_ENABLE_UDP, - tpu_max_connections_per_ipaddr_per_minute: 32, // max connections per IpAddr per minute, use higher value because of tests - }, + build_validator_tpu_config_for_test(DEFAULT_TPU_ENABLE_UDP), Arc::new(RwLock::new(None)), ) .expect("assume successful validator start"); diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 002a00bb12674a..1ed62a56226bf5 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -87,14 +87,6 @@ const CONNECTION_CLOSE_REASON_TOO_MANY: &[u8] = b"too_many"; const CONNECTION_CLOSE_CODE_INVALID_STREAM: u32 = 5; const CONNECTION_CLOSE_REASON_INVALID_STREAM: &[u8] = b"invalid_stream"; -/// Limit to 250K PPS -pub const DEFAULT_MAX_STREAMS_PER_MS: u64 = 250; - -/// The new connections per minute from a particular IP address. -/// Heuristically set to the default maximum concurrent connections -/// per IP address. Might be adjusted later. -pub const DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE: u64 = 8; - /// Total new connection counts per second. Heuristically taken from /// the default staked and unstaked connection limits. Might be adjusted /// later. diff --git a/streamer/src/nonblocking/stream_throttle.rs b/streamer/src/nonblocking/stream_throttle.rs index a9db82874b9c21..7baeab7d5b1646 100644 --- a/streamer/src/nonblocking/stream_throttle.rs +++ b/streamer/src/nonblocking/stream_throttle.rs @@ -236,10 +236,8 @@ pub mod test { use { super::*, crate::{ - nonblocking::{ - quic::DEFAULT_MAX_STREAMS_PER_MS, stream_throttle::STREAM_LOAD_EMA_INTERVAL_MS, - }, - quic::{StreamerStats, MAX_UNSTAKED_CONNECTIONS}, + nonblocking::stream_throttle::STREAM_LOAD_EMA_INTERVAL_MS, + quic::{StreamerStats, DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_MAX_UNSTAKED_CONNECTIONS}, }, std::{ sync::{atomic::Ordering, Arc}, @@ -251,7 +249,7 @@ pub mod test { fn test_max_streams_for_unstaked_connection() { let load_ema = Arc::new(StakedStreamLoadEMA::new( Arc::new(StreamerStats::default()), - MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_UNSTAKED_CONNECTIONS, DEFAULT_MAX_STREAMS_PER_MS, )); // 25K packets per ms * 20% / 500 max unstaked connections @@ -268,7 +266,7 @@ pub mod test { fn test_max_streams_for_staked_connection() { let load_ema = Arc::new(StakedStreamLoadEMA::new( Arc::new(StreamerStats::default()), - MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_UNSTAKED_CONNECTIONS, DEFAULT_MAX_STREAMS_PER_MS, )); @@ -448,7 +446,7 @@ pub mod test { fn test_update_ema() { let stream_load_ema = Arc::new(StakedStreamLoadEMA::new( Arc::new(StreamerStats::default()), - MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_UNSTAKED_CONNECTIONS, DEFAULT_MAX_STREAMS_PER_MS, )); stream_load_ema @@ -477,7 +475,7 @@ pub mod test { fn test_update_ema_missing_interval() { let stream_load_ema = Arc::new(StakedStreamLoadEMA::new( Arc::new(StreamerStats::default()), - MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_UNSTAKED_CONNECTIONS, DEFAULT_MAX_STREAMS_PER_MS, )); stream_load_ema @@ -497,7 +495,7 @@ pub mod test { fn test_update_ema_if_needed() { let stream_load_ema = Arc::new(StakedStreamLoadEMA::new( Arc::new(StreamerStats::default()), - MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_UNSTAKED_CONNECTIONS, DEFAULT_MAX_STREAMS_PER_MS, )); stream_load_ema diff --git a/streamer/src/nonblocking/testing_utilities.rs b/streamer/src/nonblocking/testing_utilities.rs index fc0c3a801784e0..81cb8e9c094d76 100644 --- a/streamer/src/nonblocking/testing_utilities.rs +++ b/streamer/src/nonblocking/testing_utilities.rs @@ -2,13 +2,13 @@ use { super::quic::{ spawn_server_multi, SpawnNonBlockingServerResult, ALPN_TPU_PROTOCOL_ID, - DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE, DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, }, crate::{ quic::{ - QuicServerParams, StreamerStats, DEFAULT_TPU_COALESCE, MAX_STAKED_CONNECTIONS, - MAX_UNSTAKED_CONNECTIONS, + QuicServerParams, StreamerStats, DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE, + DEFAULT_MAX_STAKED_CONNECTIONS, DEFAULT_MAX_STREAMS_PER_MS, + DEFAULT_MAX_UNSTAKED_CONNECTIONS, DEFAULT_TPU_COALESCE, }, streamer::StakedNodes, }, @@ -63,8 +63,8 @@ impl Default for TestServerConfig { fn default() -> Self { Self { max_connections_per_peer: 1, - max_staked_connections: MAX_STAKED_CONNECTIONS, - max_unstaked_connections: MAX_UNSTAKED_CONNECTIONS, + max_staked_connections: DEFAULT_MAX_STAKED_CONNECTIONS, + max_unstaked_connections: DEFAULT_MAX_UNSTAKED_CONNECTIONS, max_streams_per_ms: DEFAULT_MAX_STREAMS_PER_MS, max_connections_per_ipaddr_per_min: DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE, } diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index 85760f096f1932..a08ebcab507d41 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -1,9 +1,6 @@ use { crate::{ - nonblocking::quic::{ - ALPN_TPU_PROTOCOL_ID, DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE, - DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, - }, + nonblocking::quic::{ALPN_TPU_PROTOCOL_ID, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT}, streamer::StakedNodes, }, crossbeam_channel::Sender, @@ -32,8 +29,19 @@ use { tokio::runtime::Runtime, }; -pub const MAX_STAKED_CONNECTIONS: usize = 2000; -pub const MAX_UNSTAKED_CONNECTIONS: usize = 500; +// allow multiple connections for NAT and any open/close overlap +pub const DEFAULT_MAX_QUIC_CONNECTIONS_PER_PEER: usize = 8; + +pub const DEFAULT_MAX_STAKED_CONNECTIONS: usize = 2000; +pub const DEFAULT_MAX_UNSTAKED_CONNECTIONS: usize = 500; + +/// Limit to 250K PPS +pub const DEFAULT_MAX_STREAMS_PER_MS: u64 = 250; + +/// The new connections per minute from a particular IP address. +/// Heuristically set to the default maximum concurrent connections +/// per IP address. Might be adjusted later. +pub const DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE: u64 = 8; // This will be adjusted and parameterized in follow-on PRs. pub const DEFAULT_QUIC_ENDPOINTS: usize = 1; @@ -564,6 +572,7 @@ pub fn spawn_server( ) } +#[derive(Clone)] pub struct QuicServerParams { pub max_connections_per_peer: usize, pub max_staked_connections: usize, @@ -578,8 +587,8 @@ impl Default for QuicServerParams { fn default() -> Self { QuicServerParams { max_connections_per_peer: 1, - max_staked_connections: MAX_STAKED_CONNECTIONS, - max_unstaked_connections: MAX_UNSTAKED_CONNECTIONS, + max_staked_connections: DEFAULT_MAX_STAKED_CONNECTIONS, + max_unstaked_connections: DEFAULT_MAX_UNSTAKED_CONNECTIONS, max_streams_per_ms: DEFAULT_MAX_STREAMS_PER_MS, max_connections_per_ipaddr_per_min: DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE, wait_for_chunk_timeout: DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, diff --git a/test-validator/src/lib.rs b/test-validator/src/lib.rs index 66ea5a914647cc..274d4c5e69bf8b 100644 --- a/test-validator/src/lib.rs +++ b/test-validator/src/lib.rs @@ -13,7 +13,9 @@ use { solana_core::{ admin_rpc_post_init::AdminRpcRequestMetadataPostInit, consensus::tower_storage::TowerStorage, - validator::{Validator, ValidatorConfig, ValidatorStartProgress, ValidatorTpuConfig}, + validator::{ + build_validator_tpu_config_for_test, Validator, ValidatorConfig, ValidatorStartProgress, + }, }, solana_feature_set::FEATURE_NAMES, solana_geyser_plugin_manager::{ @@ -56,10 +58,7 @@ use { signature::{read_keypair_file, write_keypair_file, Keypair, Signer}, }, solana_streamer::socket::SocketAddrSpace, - solana_tpu_client::tpu_client::{ - DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_ENABLE_UDP, DEFAULT_TPU_USE_QUIC, - DEFAULT_VOTE_USE_QUIC, - }, + solana_tpu_client::tpu_client::DEFAULT_TPU_ENABLE_UDP, std::{ collections::{HashMap, HashSet}, ffi::OsStr, @@ -1046,13 +1045,7 @@ impl TestValidator { rpc_to_plugin_manager_receiver, config.start_progress.clone(), socket_addr_space, - ValidatorTpuConfig { - use_quic: DEFAULT_TPU_USE_QUIC, - vote_use_quic: DEFAULT_VOTE_USE_QUIC, - tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE, - tpu_enable_udp: config.tpu_enable_udp, - tpu_max_connections_per_ipaddr_per_minute: 32, // max connections per IpAddr per minute for test - }, + build_validator_tpu_config_for_test(config.tpu_enable_udp), config.admin_rpc_service_post_init.clone(), )?); diff --git a/validator/src/admin_rpc_service.rs b/validator/src/admin_rpc_service.rs index 851c2959e97c0b..6c86aea768a085 100644 --- a/validator/src/admin_rpc_service.rs +++ b/validator/src/admin_rpc_service.rs @@ -871,7 +871,7 @@ mod tests { }, solana_core::{ consensus::tower_storage::NullTowerStorage, - validator::{Validator, ValidatorConfig, ValidatorTpuConfig}, + validator::{build_validator_tpu_config_for_test, Validator, ValidatorConfig}, }, solana_gossip::cluster_info::{ClusterInfo, Node}, solana_inline_spl::token, @@ -893,10 +893,7 @@ mod tests { system_program, }, solana_streamer::socket::SocketAddrSpace, - solana_tpu_client::tpu_client::{ - DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_ENABLE_UDP, DEFAULT_TPU_USE_QUIC, - DEFAULT_VOTE_USE_QUIC, - }, + solana_tpu_client::tpu_client::DEFAULT_TPU_ENABLE_UDP, spl_token_2022::{ solana_program::{program_option::COption, program_pack::Pack}, state::{Account as TokenAccount, AccountState as TokenAccountState, Mint}, @@ -1398,13 +1395,7 @@ mod tests { None, // rpc_to_plugin_manager_receiver start_progress.clone(), SocketAddrSpace::Unspecified, - ValidatorTpuConfig { - use_quic: DEFAULT_TPU_USE_QUIC, - vote_use_quic: DEFAULT_VOTE_USE_QUIC, - tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE, - tpu_enable_udp: DEFAULT_TPU_ENABLE_UDP, - tpu_max_connections_per_ipaddr_per_minute: 32, // max connections per IpAddr per minute for test - }, + build_validator_tpu_config_for_test(DEFAULT_TPU_ENABLE_UDP), post_init, ) .expect("assume successful validator start"); diff --git a/validator/src/cli.rs b/validator/src/cli.rs index aed7a3bffc1d9f..8f14c058526001 100644 --- a/validator/src/cli.rs +++ b/validator/src/cli.rs @@ -47,17 +47,18 @@ use { solana_send_transaction_service::send_transaction_service::{ self, MAX_BATCH_SEND_RATE_MS, MAX_TRANSACTION_BATCH_SIZE, }, - solana_streamer::quic::DEFAULT_QUIC_ENDPOINTS, + solana_streamer::quic::{ + DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE, DEFAULT_MAX_QUIC_CONNECTIONS_PER_PEER, + DEFAULT_MAX_STAKED_CONNECTIONS, DEFAULT_MAX_STREAMS_PER_MS, + DEFAULT_MAX_UNSTAKED_CONNECTIONS, DEFAULT_QUIC_ENDPOINTS, + }, solana_tpu_client::tpu_client::{DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_VOTE_USE_QUIC}, solana_unified_scheduler_pool::DefaultSchedulerPool, std::{path::PathBuf, str::FromStr}, }; pub mod thread_args; -use { - solana_streamer::nonblocking::quic::DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE, - thread_args::{thread_args, DefaultThreadArgs}, -}; +use thread_args::{thread_args, DefaultThreadArgs}; const EXCLUDE_KEY: &str = "account-index-exclude-key"; const INCLUDE_KEY: &str = "account-index-include-key"; @@ -906,6 +907,51 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> { .hidden(hidden_unless_forced()) .help("Controls if to use QUIC to send votes."), ) + .arg( + Arg::with_name("tpu_max_connections_per_peer") + .long("tpu-max-connections-per-peer") + .takes_value(true) + .default_value(&default_args.tpu_max_connections_per_peer) + .validator(is_parsable::) + .hidden(hidden_unless_forced()) + .help("Controls the max concurrent connections per IpAddr."), + ) + .arg( + Arg::with_name("max_tpu_staked_connections") + .long("max-tpu-staked-connections") + .takes_value(true) + .default_value(&default_args.max_tpu_staked_connections) + .validator(is_parsable::) + .hidden(hidden_unless_forced()) + .help("Controls the max concurrent connections for TPU from staked nodes."), + ) + .arg( + Arg::with_name("max_tpu_unstaked_connections") + .long("max-tpu-unstaked-connections") + .takes_value(true) + .default_value(&default_args.max_tpu_unstaked_connections) + .validator(is_parsable::) + .hidden(hidden_unless_forced()) + .help("Controls the max concurrent connections fort TPU from unstaked nodes."), + ) + .arg( + Arg::with_name("max_fwd_staked_connections") + .long("max-fwd-staked-connections") + .takes_value(true) + .default_value(&default_args.max_fwd_staked_connections) + .validator(is_parsable::) + .hidden(hidden_unless_forced()) + .help("Controls the max concurrent connections for TPU-forward from staked nodes."), + ) + .arg( + Arg::with_name("max_fwd_unstaked_connections") + .long("max-fwd-unstaked-connections") + .takes_value(true) + .default_value(&default_args.max_fwd_unstaked_connections) + .validator(is_parsable::) + .hidden(hidden_unless_forced()) + .help("Controls the max concurrent connections for TPU-forward from unstaked nodes."), + ) .arg( Arg::with_name("num_quic_endpoints") .long("num-quic-endpoints") @@ -2300,7 +2346,15 @@ pub struct DefaultArgs { pub accounts_shrink_optimize_total_space: String, pub accounts_shrink_ratio: String, pub tpu_connection_pool_size: String, + + pub tpu_max_connections_per_peer: String, pub tpu_max_connections_per_ipaddr_per_minute: String, + pub max_tpu_staked_connections: String, + pub max_tpu_unstaked_connections: String, + pub max_fwd_staked_connections: String, + pub max_fwd_unstaked_connections: String, + pub max_streams_per_ms: String, + pub num_quic_endpoints: String, pub vote_use_quic: String, @@ -2395,6 +2449,14 @@ impl DefaultArgs { tpu_max_connections_per_ipaddr_per_minute: DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE.to_string(), vote_use_quic: DEFAULT_VOTE_USE_QUIC.to_string(), + tpu_max_connections_per_peer: DEFAULT_MAX_QUIC_CONNECTIONS_PER_PEER.to_string(), + max_tpu_staked_connections: DEFAULT_MAX_STAKED_CONNECTIONS.to_string(), + max_tpu_unstaked_connections: DEFAULT_MAX_UNSTAKED_CONNECTIONS.to_string(), + max_fwd_staked_connections: DEFAULT_MAX_STAKED_CONNECTIONS + .saturating_add(DEFAULT_MAX_UNSTAKED_CONNECTIONS) + .to_string(), + max_fwd_unstaked_connections: 0.to_string(), + max_streams_per_ms: DEFAULT_MAX_STREAMS_PER_MS.to_string(), num_quic_endpoints: DEFAULT_QUIC_ENDPOINTS.to_string(), rpc_max_request_body_size: MAX_REQUEST_BODY_SIZE.to_string(), exit_min_idle_time: "10".to_string(), diff --git a/validator/src/main.rs b/validator/src/main.rs index a7de615b3be9ac..2f82764af5d79b 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -74,7 +74,7 @@ use { signature::{read_keypair, Keypair, Signer}, }, solana_send_transaction_service::send_transaction_service, - solana_streamer::socket::SocketAddrSpace, + solana_streamer::{quic::QuicServerParams, socket::SocketAddrSpace}, solana_tpu_client::tpu_client::DEFAULT_TPU_ENABLE_UDP, std::{ collections::{HashSet, VecDeque}, @@ -1136,8 +1136,6 @@ pub fn main() { }; let tpu_connection_pool_size = value_t_or_exit!(matches, "tpu_connection_pool_size", usize); - let tpu_max_connections_per_ipaddr_per_minute = - value_t_or_exit!(matches, "tpu_max_connections_per_ipaddr_per_minute", u64); let shrink_ratio = value_t_or_exit!(matches, "accounts_shrink_ratio", f64); if !(0.0..=1.0).contains(&shrink_ratio) { @@ -1968,6 +1966,21 @@ pub fn main() { }); let num_quic_endpoints = value_t_or_exit!(matches, "num_quic_endpoints", NonZeroUsize); + + let tpu_max_connections_per_peer = + value_t_or_exit!(matches, "tpu_max_connections_per_peer", u64); + let max_tpu_staked_connections = value_t_or_exit!(matches, "max_tpu_staked_connections", u64); + let max_tpu_unstaked_connections = + value_t_or_exit!(matches, "max_tpu_unstaked_connections", u64); + + let max_fwd_staked_connections = value_t_or_exit!(matches, "max_fwd_staked_connections", u64); + let max_fwd_unstaked_connections = + value_t_or_exit!(matches, "max_fwd_unstaked_connections", u64); + + let tpu_max_connections_per_ipaddr_per_minute: u64 = + value_t_or_exit!(matches, "tpu_max_connections_per_ipaddr_per_minute", u64); + let max_streams_per_ms = value_t_or_exit!(matches, "max_streams_per_ms", u64); + let node_config = NodeConfig { gossip_addr, port_range: dynamic_port_range, @@ -2071,6 +2084,31 @@ pub fn main() { // the one pushed by bootstrap. node.info.hot_swap_pubkey(identity_keypair.pubkey()); + let tpu_quic_server_config = QuicServerParams { + max_connections_per_peer: tpu_max_connections_per_peer.try_into().unwrap(), + max_staked_connections: max_tpu_staked_connections.try_into().unwrap(), + max_unstaked_connections: max_tpu_unstaked_connections.try_into().unwrap(), + max_streams_per_ms, + max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute, + coalesce: tpu_coalesce, + ..Default::default() + }; + + let tpu_fwd_quic_server_config = QuicServerParams { + max_connections_per_peer: tpu_max_connections_per_peer.try_into().unwrap(), + max_staked_connections: max_fwd_staked_connections.try_into().unwrap(), + max_unstaked_connections: max_fwd_unstaked_connections.try_into().unwrap(), + max_streams_per_ms, + max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute, + coalesce: tpu_coalesce, + ..Default::default() + }; + + // Vote shares TPU forward's characteristics, except that we accept 1 connection + // per peer. + let mut vote_quic_server_config = tpu_fwd_quic_server_config.clone(); + vote_quic_server_config.max_connections_per_peer = 1; + let validator = match Validator::new( node, identity_keypair, @@ -2088,7 +2126,9 @@ pub fn main() { vote_use_quic, tpu_connection_pool_size, tpu_enable_udp, - tpu_max_connections_per_ipaddr_per_minute, + tpu_quic_server_config, + tpu_fwd_quic_server_config, + vote_quic_server_config, }, admin_service_post_init, ) { From 656af6e68adfd13fcefe64b0cad2f90c5879e80e Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Thu, 19 Dec 2024 01:38:45 -0800 Subject: [PATCH 2/7] Use max_connections_per_ipaddr_per_min --- core/src/validator.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/validator.rs b/core/src/validator.rs index aeb9c2418a482d..aa81384f32f108 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -499,13 +499,13 @@ pub struct ValidatorTpuConfig { pub fn build_validator_tpu_config_for_test(tpu_enable_udp: bool) -> ValidatorTpuConfig { let mut tpu_quic_server_config = QuicServerParams::default(); - tpu_quic_server_config.max_connections_per_peer = 32; // max connections per IpAddr per minute + tpu_quic_server_config.max_connections_per_ipaddr_per_min = 32; let mut tpu_fwd_quic_server_config = QuicServerParams::default(); - tpu_fwd_quic_server_config.max_connections_per_peer = 32; // max connections per IpAddr per minute + tpu_fwd_quic_server_config.max_connections_per_ipaddr_per_min = 32; let mut vote_quic_server_config = QuicServerParams::default(); - vote_quic_server_config.max_connections_per_peer = 32; // max connections per IpAddr per minute + vote_quic_server_config.max_connections_per_ipaddr_per_min = 32; ValidatorTpuConfig { use_quic: DEFAULT_TPU_USE_QUIC, From e8f55acb3ae9521855d3c540e3ab7a4cdd290953 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Thu, 19 Dec 2024 01:44:17 -0800 Subject: [PATCH 3/7] set max_unstaked_connections to 0 for tpu-fwd and vote in testing --- core/src/validator.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/src/validator.rs b/core/src/validator.rs index aa81384f32f108..25d311df3f484d 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -497,15 +497,19 @@ pub struct ValidatorTpuConfig { pub vote_quic_server_config: QuicServerParams, } +/// A convenient function to build a ValidatorTpuConfig for testing with good +/// default. pub fn build_validator_tpu_config_for_test(tpu_enable_udp: bool) -> ValidatorTpuConfig { let mut tpu_quic_server_config = QuicServerParams::default(); tpu_quic_server_config.max_connections_per_ipaddr_per_min = 32; let mut tpu_fwd_quic_server_config = QuicServerParams::default(); tpu_fwd_quic_server_config.max_connections_per_ipaddr_per_min = 32; + tpu_fwd_quic_server_config.max_unstaked_connections = 0; let mut vote_quic_server_config = QuicServerParams::default(); vote_quic_server_config.max_connections_per_ipaddr_per_min = 32; + vote_quic_server_config.max_unstaked_connections = 0; ValidatorTpuConfig { use_quic: DEFAULT_TPU_USE_QUIC, From 865bcce4c21d39991a901ab3a13eb696ab55f550 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Thu, 19 Dec 2024 09:55:52 -0800 Subject: [PATCH 4/7] fixed some clippy complaint --- core/src/validator.rs | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/core/src/validator.rs b/core/src/validator.rs index 25d311df3f484d..204c798da98fbb 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -500,16 +500,19 @@ pub struct ValidatorTpuConfig { /// A convenient function to build a ValidatorTpuConfig for testing with good /// default. pub fn build_validator_tpu_config_for_test(tpu_enable_udp: bool) -> ValidatorTpuConfig { - let mut tpu_quic_server_config = QuicServerParams::default(); - tpu_quic_server_config.max_connections_per_ipaddr_per_min = 32; + let tpu_quic_server_config = QuicServerParams { + max_connections_per_ipaddr_per_min: 32, + ..Default::default() + }; - let mut tpu_fwd_quic_server_config = QuicServerParams::default(); - tpu_fwd_quic_server_config.max_connections_per_ipaddr_per_min = 32; - tpu_fwd_quic_server_config.max_unstaked_connections = 0; + let tpu_fwd_quic_server_config = QuicServerParams { + max_connections_per_ipaddr_per_min: 32, + max_unstaked_connections: 0, + ..Default::default() + }; - let mut vote_quic_server_config = QuicServerParams::default(); - vote_quic_server_config.max_connections_per_ipaddr_per_min = 32; - vote_quic_server_config.max_unstaked_connections = 0; + // vote and tpu_fwd share the same characteristics -- disallow non-staked connections: + let vote_quic_server_config = tpu_fwd_quic_server_config.clone(); ValidatorTpuConfig { use_quic: DEFAULT_TPU_USE_QUIC, From 6507ebd5c2b204e447916a3ba73fbe250daacd74 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Thu, 19 Dec 2024 18:41:35 -0800 Subject: [PATCH 5/7] missing max-streams-per-ms --- validator/src/cli.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/validator/src/cli.rs b/validator/src/cli.rs index 8f14c058526001..31322d815375c2 100644 --- a/validator/src/cli.rs +++ b/validator/src/cli.rs @@ -952,6 +952,15 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> { .hidden(hidden_unless_forced()) .help("Controls the max concurrent connections for TPU-forward from unstaked nodes."), ) + .arg( + Arg::with_name("max_streams_per_ms") + .long("max-streams-per-ms") + .takes_value(true) + .default_value(&default_args.max_streams_per_ms) + .validator(is_parsable::) + .hidden(hidden_unless_forced()) + .help("Controls the max number of streams per steamer service."), + ) .arg( Arg::with_name("num_quic_endpoints") .long("num-quic-endpoints") From c2af87fd4f05c8845229dd77e5224a6cbf803554 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Thu, 19 Dec 2024 18:50:05 -0800 Subject: [PATCH 6/7] missing tpu_max_streams_per_ms --- validator/src/cli.rs | 14 +++++++------- validator/src/main.rs | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/validator/src/cli.rs b/validator/src/cli.rs index 31322d815375c2..c4c8d1b9fc82a2 100644 --- a/validator/src/cli.rs +++ b/validator/src/cli.rs @@ -953,14 +953,14 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> { .help("Controls the max concurrent connections for TPU-forward from unstaked nodes."), ) .arg( - Arg::with_name("max_streams_per_ms") - .long("max-streams-per-ms") + Arg::with_name("tpu_max_streams_per_ms") + .long("tpu-max-streams-per-ms") .takes_value(true) - .default_value(&default_args.max_streams_per_ms) + .default_value(&default_args.tpu_max_streams_per_ms) .validator(is_parsable::) .hidden(hidden_unless_forced()) - .help("Controls the max number of streams per steamer service."), - ) + .help("Controls the max number of streams for a TPU service."), + ) .arg( Arg::with_name("num_quic_endpoints") .long("num-quic-endpoints") @@ -2362,7 +2362,7 @@ pub struct DefaultArgs { pub max_tpu_unstaked_connections: String, pub max_fwd_staked_connections: String, pub max_fwd_unstaked_connections: String, - pub max_streams_per_ms: String, + pub tpu_max_streams_per_ms: String, pub num_quic_endpoints: String, pub vote_use_quic: String, @@ -2465,7 +2465,7 @@ impl DefaultArgs { .saturating_add(DEFAULT_MAX_UNSTAKED_CONNECTIONS) .to_string(), max_fwd_unstaked_connections: 0.to_string(), - max_streams_per_ms: DEFAULT_MAX_STREAMS_PER_MS.to_string(), + tpu_max_streams_per_ms: DEFAULT_MAX_STREAMS_PER_MS.to_string(), num_quic_endpoints: DEFAULT_QUIC_ENDPOINTS.to_string(), rpc_max_request_body_size: MAX_REQUEST_BODY_SIZE.to_string(), exit_min_idle_time: "10".to_string(), diff --git a/validator/src/main.rs b/validator/src/main.rs index 2f82764af5d79b..ed5f7a992341dc 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1979,7 +1979,7 @@ pub fn main() { let tpu_max_connections_per_ipaddr_per_minute: u64 = value_t_or_exit!(matches, "tpu_max_connections_per_ipaddr_per_minute", u64); - let max_streams_per_ms = value_t_or_exit!(matches, "max_streams_per_ms", u64); + let max_streams_per_ms = value_t_or_exit!(matches, "tpu_max_streams_per_ms", u64); let node_config = NodeConfig { gossip_addr, From 88323a366b8f52c2d961aff78b038486db859930 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Fri, 20 Dec 2024 01:37:46 -0800 Subject: [PATCH 7/7] vote does not accept unstaked connections --- validator/src/main.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/validator/src/main.rs b/validator/src/main.rs index ed5f7a992341dc..14da7d3d357902 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -2105,9 +2105,10 @@ pub fn main() { }; // Vote shares TPU forward's characteristics, except that we accept 1 connection - // per peer. + // per peer and no unstaked connections are accepted. let mut vote_quic_server_config = tpu_fwd_quic_server_config.clone(); vote_quic_server_config.max_connections_per_peer = 1; + vote_quic_server_config.max_unstaked_connections = 0; let validator = match Validator::new( node,