Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make QUIC tpu QOS parameters configurable #4170

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bench-tps/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use {
pubkey::Pubkey,
signature::{read_keypair_file, Keypair},
},
solana_streamer::nonblocking::quic::DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
solana_streamer::quic::DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
solana_tpu_client::tpu_client::{DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC},
std::{
net::{IpAddr, Ipv4Addr},
Expand Down
39 changes: 7 additions & 32 deletions core/src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,7 @@ use {
},
solana_sdk::{clock::Slot, pubkey::Pubkey, quic::NotifyKeyUpdate, signature::Keypair},
solana_streamer::{
quic::{
spawn_server_multi, QuicServerParams, SpawnServerResult, MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
},
quic::{spawn_server_multi, QuicServerParams, SpawnServerResult},
streamer::StakedNodes,
},
solana_turbine::broadcast_stage::{BroadcastStage, BroadcastStageType},
Expand All @@ -54,9 +51,6 @@ use {
tokio::sync::mpsc::Sender as AsyncSender,
};

// allow multiple connections for NAT and any open/close overlap
pub const MAX_QUIC_CONNECTIONS_PER_PEER: usize = 8;
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved to quic.rs in streamer


pub struct TpuSockets {
pub transactions: Vec<UdpSocket>,
pub transaction_forwards: Vec<UdpSocket>,
Expand Down Expand Up @@ -115,7 +109,9 @@ impl Tpu {
banking_tracer: Arc<BankingTracer>,
tracer_thread_hdl: TracerThread,
tpu_enable_udp: bool,
tpu_max_connections_per_ipaddr_per_minute: u64,
tpu_quic_server_config: QuicServerParams,
tpu_fwd_quic_server_config: QuicServerParams,
voe_quic_server_config: QuicServerParams,
prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
block_production_method: BlockProductionMethod,
enable_block_production_forwarding: bool,
Expand Down Expand Up @@ -178,15 +174,7 @@ impl Tpu {
vote_packet_sender.clone(),
exit.clone(),
staked_nodes.clone(),
QuicServerParams {
max_connections_per_peer: 1,
max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute,
coalesce: tpu_coalesce,
max_staked_connections: MAX_STAKED_CONNECTIONS
.saturating_add(MAX_UNSTAKED_CONNECTIONS),
max_unstaked_connections: 0,
..QuicServerParams::default()
},
voe_quic_server_config,
)
.unwrap();

Expand All @@ -203,12 +191,7 @@ impl Tpu {
packet_sender,
exit.clone(),
staked_nodes.clone(),
QuicServerParams {
max_connections_per_peer: MAX_QUIC_CONNECTIONS_PER_PEER,
max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute,
coalesce: tpu_coalesce,
..QuicServerParams::default()
},
tpu_quic_server_config,
)
.unwrap();

Expand All @@ -225,15 +208,7 @@ impl Tpu {
forwarded_packet_sender,
exit.clone(),
staked_nodes.clone(),
QuicServerParams {
max_connections_per_peer: MAX_QUIC_CONNECTIONS_PER_PEER,
max_staked_connections: MAX_STAKED_CONNECTIONS
.saturating_add(MAX_UNSTAKED_CONNECTIONS),
max_unstaked_connections: 0, // Prevent unstaked nodes from forwarding transactions
max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute,
coalesce: tpu_coalesce,
..QuicServerParams::default()
},
tpu_fwd_quic_server_config,
)
.unwrap();

Expand Down
63 changes: 40 additions & 23 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,10 @@ use {
timing::timestamp,
},
solana_send_transaction_service::send_transaction_service,
solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes},
solana_streamer::{quic::QuicServerParams, socket::SocketAddrSpace, streamer::StakedNodes},
solana_tpu_client::tpu_client::{
DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC, DEFAULT_VOTE_USE_QUIC,
},
solana_turbine::{self, broadcast_stage::BroadcastStageType},
solana_unified_scheduler_pool::DefaultSchedulerPool,
solana_vote_program::vote_state,
Expand Down Expand Up @@ -486,8 +489,33 @@ pub struct ValidatorTpuConfig {
pub tpu_connection_pool_size: usize,
/// Controls if to enable UDP for TPU tansactions.
pub tpu_enable_udp: bool,
/// Controls the new maximum connections per IpAddr per minute
pub tpu_max_connections_per_ipaddr_per_minute: u64,
/// QUIC server config for regular TPU
pub tpu_quic_server_config: QuicServerParams,
/// QUIC server config for TPU forward
pub tpu_fwd_quic_server_config: QuicServerParams,
/// QUIC server config for Vote
pub vote_quic_server_config: QuicServerParams,
}

pub fn build_validator_tpu_config_for_test(tpu_enable_udp: bool) -> ValidatorTpuConfig {
let mut tpu_quic_server_config = QuicServerParams::default();
tpu_quic_server_config.max_connections_per_peer = 32; // max connections per IpAddr per minute

let mut tpu_fwd_quic_server_config = QuicServerParams::default();
tpu_fwd_quic_server_config.max_connections_per_peer = 32; // max connections per IpAddr per minute

let mut vote_quic_server_config = QuicServerParams::default();
vote_quic_server_config.max_connections_per_peer = 32; // max connections per IpAddr per minute

ValidatorTpuConfig {
use_quic: DEFAULT_TPU_USE_QUIC,
vote_use_quic: DEFAULT_VOTE_USE_QUIC,
tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
tpu_enable_udp,
tpu_quic_server_config,
tpu_fwd_quic_server_config,
vote_quic_server_config,
}
}

pub struct Validator {
Expand Down Expand Up @@ -550,7 +578,9 @@ impl Validator {
vote_use_quic,
tpu_connection_pool_size,
tpu_enable_udp,
tpu_max_connections_per_ipaddr_per_minute,
tpu_quic_server_config,
tpu_fwd_quic_server_config,
vote_quic_server_config,
} = tpu_config;

let start_time = Instant::now();
Expand Down Expand Up @@ -1533,7 +1563,9 @@ impl Validator {
banking_tracer,
tracer_thread,
tpu_enable_udp,
tpu_max_connections_per_ipaddr_per_minute,
tpu_quic_server_config,
tpu_fwd_quic_server_config,
vote_quic_server_config,
&prioritization_fee_cache,
config.block_production_method.clone(),
config.enable_block_production_forwarding,
Expand Down Expand Up @@ -2756,10 +2788,7 @@ mod tests {
get_tmp_ledger_path_auto_delete,
},
solana_sdk::{genesis_config::create_genesis_config, poh_config::PohConfig},
solana_tpu_client::tpu_client::{
DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_ENABLE_UDP, DEFAULT_TPU_USE_QUIC,
DEFAULT_VOTE_USE_QUIC,
},
solana_tpu_client::tpu_client::DEFAULT_TPU_ENABLE_UDP,
std::{fs::remove_dir_all, thread, time::Duration},
};

Expand Down Expand Up @@ -2797,13 +2826,7 @@ mod tests {
None, // rpc_to_plugin_manager_receiver
start_progress.clone(),
SocketAddrSpace::Unspecified,
ValidatorTpuConfig {
use_quic: DEFAULT_TPU_USE_QUIC,
vote_use_quic: DEFAULT_VOTE_USE_QUIC,
tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
tpu_enable_udp: DEFAULT_TPU_ENABLE_UDP,
tpu_max_connections_per_ipaddr_per_minute: 32, // max connections per IpAddr per minute for test
},
build_validator_tpu_config_for_test(DEFAULT_TPU_ENABLE_UDP),
Arc::new(RwLock::new(None)),
)
.expect("assume successful validator start");
Expand Down Expand Up @@ -3019,13 +3042,7 @@ mod tests {
None, // rpc_to_plugin_manager_receiver
Arc::new(RwLock::new(ValidatorStartProgress::default())),
SocketAddrSpace::Unspecified,
ValidatorTpuConfig {
use_quic: DEFAULT_TPU_USE_QUIC,
vote_use_quic: DEFAULT_VOTE_USE_QUIC,
tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
tpu_enable_udp: DEFAULT_TPU_ENABLE_UDP,
tpu_max_connections_per_ipaddr_per_minute: 32, // max connections per IpAddr per minute for test
},
build_validator_tpu_config_for_test(DEFAULT_TPU_ENABLE_UDP),
Arc::new(RwLock::new(None)),
)
.expect("assume successful validator start")
Expand Down
32 changes: 8 additions & 24 deletions local-cluster/src/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ use {
solana_client::connection_cache::ConnectionCache,
solana_core::{
consensus::tower_storage::FileTowerStorage,
validator::{Validator, ValidatorConfig, ValidatorStartProgress, ValidatorTpuConfig},
validator::{
build_validator_tpu_config_for_test, Validator, ValidatorConfig, ValidatorStartProgress,
},
},
solana_gossip::{
cluster_info::Node,
Expand Down Expand Up @@ -341,15 +343,9 @@ impl LocalCluster {
None, // rpc_to_plugin_manager_receiver
Arc::new(RwLock::new(ValidatorStartProgress::default())),
socket_addr_space,
ValidatorTpuConfig {
use_quic: DEFAULT_TPU_USE_QUIC,
vote_use_quic: DEFAULT_VOTE_USE_QUIC,
tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
// We are turning tpu_enable_udp to true in order to prevent concurrent local cluster tests
// to use the same QUIC ports due to SO_REUSEPORT.
tpu_enable_udp: true,
tpu_max_connections_per_ipaddr_per_minute: 32, // max connections per IpAddr per minute
},
// We are turning tpu_enable_udp to true in order to prevent concurrent local cluster tests
// to use the same QUIC ports due to SO_REUSEPORT.
build_validator_tpu_config_for_test(true),
Arc::new(RwLock::new(None)),
)
.expect("assume successful validator start");
Expand Down Expand Up @@ -553,13 +549,7 @@ impl LocalCluster {
None, // rpc_to_plugin_manager_receiver
Arc::new(RwLock::new(ValidatorStartProgress::default())),
socket_addr_space,
ValidatorTpuConfig {
use_quic: DEFAULT_TPU_USE_QUIC,
vote_use_quic: DEFAULT_VOTE_USE_QUIC,
tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
tpu_enable_udp: DEFAULT_TPU_ENABLE_UDP,
tpu_max_connections_per_ipaddr_per_minute: 32, // max connections per IpAddr per mintute
},
build_validator_tpu_config_for_test(DEFAULT_TPU_ENABLE_UDP),
Arc::new(RwLock::new(None)),
)
.expect("assume successful validator start");
Expand Down Expand Up @@ -1089,13 +1079,7 @@ impl Cluster for LocalCluster {
None, // rpc_to_plugin_manager_receiver
Arc::new(RwLock::new(ValidatorStartProgress::default())),
socket_addr_space,
ValidatorTpuConfig {
use_quic: DEFAULT_TPU_USE_QUIC,
vote_use_quic: DEFAULT_VOTE_USE_QUIC,
tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
tpu_enable_udp: DEFAULT_TPU_ENABLE_UDP,
tpu_max_connections_per_ipaddr_per_minute: 32, // max connections per IpAddr per minute, use higher value because of tests
},
build_validator_tpu_config_for_test(DEFAULT_TPU_ENABLE_UDP),
Arc::new(RwLock::new(None)),
)
.expect("assume successful validator start");
Expand Down
8 changes: 0 additions & 8 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,6 @@ const CONNECTION_CLOSE_REASON_TOO_MANY: &[u8] = b"too_many";
const CONNECTION_CLOSE_CODE_INVALID_STREAM: u32 = 5;
const CONNECTION_CLOSE_REASON_INVALID_STREAM: &[u8] = b"invalid_stream";

/// Limit to 250K PPS
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move to ../quic.rs

pub const DEFAULT_MAX_STREAMS_PER_MS: u64 = 250;

/// The new connections per minute from a particular IP address.
/// Heuristically set to the default maximum concurrent connections
/// per IP address. Might be adjusted later.
pub const DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE: u64 = 8;

/// Total new connection counts per second. Heuristically taken from
/// the default staked and unstaked connection limits. Might be adjusted
/// later.
Expand Down
16 changes: 7 additions & 9 deletions streamer/src/nonblocking/stream_throttle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,8 @@ pub mod test {
use {
super::*,
crate::{
nonblocking::{
quic::DEFAULT_MAX_STREAMS_PER_MS, stream_throttle::STREAM_LOAD_EMA_INTERVAL_MS,
},
quic::{StreamerStats, MAX_UNSTAKED_CONNECTIONS},
nonblocking::stream_throttle::STREAM_LOAD_EMA_INTERVAL_MS,
quic::{StreamerStats, DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_MAX_UNSTAKED_CONNECTIONS},
},
std::{
sync::{atomic::Ordering, Arc},
Expand All @@ -251,7 +249,7 @@ pub mod test {
fn test_max_streams_for_unstaked_connection() {
let load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamerStats::default()),
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
));
// 25K packets per ms * 20% / 500 max unstaked connections
Expand All @@ -268,7 +266,7 @@ pub mod test {
fn test_max_streams_for_staked_connection() {
let load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamerStats::default()),
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
));

Expand Down Expand Up @@ -448,7 +446,7 @@ pub mod test {
fn test_update_ema() {
let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamerStats::default()),
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
));
stream_load_ema
Expand Down Expand Up @@ -477,7 +475,7 @@ pub mod test {
fn test_update_ema_missing_interval() {
let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamerStats::default()),
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
));
stream_load_ema
Expand All @@ -497,7 +495,7 @@ pub mod test {
fn test_update_ema_if_needed() {
let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamerStats::default()),
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
));
stream_load_ema
Expand Down
10 changes: 5 additions & 5 deletions streamer/src/nonblocking/testing_utilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
use {
super::quic::{
spawn_server_multi, SpawnNonBlockingServerResult, ALPN_TPU_PROTOCOL_ID,
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE, DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
},
crate::{
quic::{
QuicServerParams, StreamerStats, DEFAULT_TPU_COALESCE, MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
QuicServerParams, StreamerStats, DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
DEFAULT_MAX_STAKED_CONNECTIONS, DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_MAX_UNSTAKED_CONNECTIONS, DEFAULT_TPU_COALESCE,
},
streamer::StakedNodes,
},
Expand Down Expand Up @@ -63,8 +63,8 @@ impl Default for TestServerConfig {
fn default() -> Self {
Self {
max_connections_per_peer: 1,
max_staked_connections: MAX_STAKED_CONNECTIONS,
max_unstaked_connections: MAX_UNSTAKED_CONNECTIONS,
max_staked_connections: DEFAULT_MAX_STAKED_CONNECTIONS,
max_unstaked_connections: DEFAULT_MAX_UNSTAKED_CONNECTIONS,
max_streams_per_ms: DEFAULT_MAX_STREAMS_PER_MS,
max_connections_per_ipaddr_per_min: DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
}
Expand Down
Loading
Loading