diff --git a/client/src/connection_cache.rs b/client/src/connection_cache.rs index 216687aecf9..1b00f13122d 100644 --- a/client/src/connection_cache.rs +++ b/client/src/connection_cache.rs @@ -229,7 +229,8 @@ mod tests { crossbeam_channel::unbounded, solana_sdk::{net::DEFAULT_TPU_COALESCE, signature::Keypair}, solana_streamer::{ - nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, quic::SpawnServerResult, + nonblocking::quic::{DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT}, + quic::SpawnServerResult, streamer::StakedNodes, }, std::{ @@ -273,6 +274,7 @@ mod tests { staked_nodes, 10, 10, + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, ) diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 0456a33a8d9..a37e28cb571 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -33,7 +33,7 @@ use { solana_runtime::{bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache}, solana_sdk::{clock::Slot, pubkey::Pubkey, quic::NotifyKeyUpdate, signature::Keypair}, solana_streamer::{ - nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, + nonblocking::quic::{DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT}, quic::{spawn_server, SpawnServerResult, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS}, streamer::StakedNodes, }, @@ -167,6 +167,7 @@ impl Tpu { staked_nodes.clone(), MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, tpu_coalesce, ) @@ -191,6 +192,7 @@ impl Tpu { staked_nodes.clone(), MAX_STAKED_CONNECTIONS.saturating_add(MAX_UNSTAKED_CONNECTIONS), 0, // Prevent unstaked nodes from forwarding transactions + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, tpu_coalesce, ) diff --git a/quic-client/tests/quic_client.rs b/quic-client/tests/quic_client.rs index 9f18acd5c75..3e3608da288 100644 --- a/quic-client/tests/quic_client.rs +++ b/quic-client/tests/quic_client.rs @@ -10,8 +10,10 @@ mod tests { }, solana_sdk::{net::DEFAULT_TPU_COALESCE, packet::PACKET_DATA_SIZE, signature::Keypair}, solana_streamer::{ - nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, quic::SpawnServerResult, - streamer::StakedNodes, tls_certificates::new_self_signed_tls_certificate, + nonblocking::quic::{DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT}, + quic::SpawnServerResult, + streamer::StakedNodes, + tls_certificates::new_self_signed_tls_certificate, }, std::{ net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}, @@ -83,6 +85,7 @@ mod tests { staked_nodes, 10, 10, + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, ) @@ -163,6 +166,7 @@ mod tests { staked_nodes, 10, 10, + DEFAULT_MAX_STREAMS_PER_MS, Duration::from_secs(1), // wait_for_chunk_timeout DEFAULT_TPU_COALESCE, ) @@ -225,6 +229,7 @@ mod tests { staked_nodes.clone(), 10, 10, + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, ) @@ -253,6 +258,7 @@ mod tests { staked_nodes, 10, 10, + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, ) diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index f3e5a6530ef..38bb3c65172 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -1,8 +1,8 @@ use { crate::{ nonblocking::stream_throttle::{ - ConnectionStreamCounter, StakedStreamLoadEMA, MAX_STREAMS_PER_MS, - STREAM_STOP_CODE_THROTTLING, STREAM_THROTTLING_INTERVAL_MS, + ConnectionStreamCounter, StakedStreamLoadEMA, STREAM_STOP_CODE_THROTTLING, + STREAM_THROTTLING_INTERVAL_MS, }, quic::{configure_server, QuicServerError, StreamStats}, streamer::StakedNodes, @@ -74,6 +74,9 @@ const CONNECTION_CLOSE_REASON_EXCEED_MAX_STREAM_COUNT: &[u8] = b"exceed_max_stre const CONNECTION_CLOSE_CODE_TOO_MANY: u32 = 4; const CONNECTION_CLOSE_REASON_TOO_MANY: &[u8] = b"too_many"; +/// Limit to 250K PPS +pub const DEFAULT_MAX_STREAMS_PER_MS: u64 = 250; + // A sequence of bytes that is part of a packet // along with where in the packet it is struct PacketChunk { @@ -122,6 +125,7 @@ pub fn spawn_server( staked_nodes: Arc>, max_staked_connections: usize, max_unstaked_connections: usize, + max_streams_per_ms: u64, wait_for_chunk_timeout: Duration, coalesce: Duration, ) -> Result<(Endpoint, Arc, JoinHandle<()>), QuicServerError> { @@ -145,6 +149,7 @@ pub fn spawn_server( staked_nodes, max_staked_connections, max_unstaked_connections, + max_streams_per_ms, stats.clone(), wait_for_chunk_timeout, coalesce, @@ -162,6 +167,7 @@ async fn run_server( staked_nodes: Arc>, max_staked_connections: usize, max_unstaked_connections: usize, + max_streams_per_ms: u64, stats: Arc, wait_for_chunk_timeout: Duration, coalesce: Duration, @@ -174,6 +180,7 @@ async fn run_server( let stream_load_ema = Arc::new(StakedStreamLoadEMA::new( stats.clone(), max_unstaked_connections, + max_streams_per_ms, )); let staked_connection_table: Arc> = Arc::new(Mutex::new(ConnectionTable::new())); @@ -204,6 +211,7 @@ async fn run_server( staked_nodes.clone(), max_staked_connections, max_unstaked_connections, + max_streams_per_ms, stats.clone(), wait_for_chunk_timeout, stream_load_ema.clone(), @@ -482,6 +490,7 @@ async fn setup_connection( staked_nodes: Arc>, max_staked_connections: usize, max_unstaked_connections: usize, + max_streams_per_ms: u64, stats: Arc, wait_for_chunk_timeout: Duration, stream_load_ema: Arc, @@ -503,7 +512,7 @@ async fn setup_connection( // The heuristic is that the stake should be large engouh to have 1 stream pass throuh within one throttle // interval during which we allow max (MAX_STREAMS_PER_MS * STREAM_THROTTLING_INTERVAL_MS) streams. let min_stake_ratio = - 1_f64 / (MAX_STREAMS_PER_MS * STREAM_THROTTLING_INTERVAL_MS) as f64; + 1_f64 / (max_streams_per_ms * STREAM_THROTTLING_INTERVAL_MS) as f64; let stake_ratio = stake as f64 / total_stake as f64; let peer_type = if stake_ratio < min_stake_ratio { // If it is a staked connection with ultra low stake ratio, treat it as unstaked. @@ -1288,6 +1297,7 @@ pub mod test { staked_nodes, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_STREAMS_PER_MS, Duration::from_secs(2), DEFAULT_TPU_COALESCE, ) @@ -1724,6 +1734,7 @@ pub mod test { staked_nodes, MAX_STAKED_CONNECTIONS, 0, // Do not allow any connection from unstaked clients/nodes + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, ) @@ -1755,6 +1766,7 @@ pub mod test { staked_nodes, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, ) diff --git a/streamer/src/nonblocking/stream_throttle.rs b/streamer/src/nonblocking/stream_throttle.rs index e3da2be90dd..0497c6993d1 100644 --- a/streamer/src/nonblocking/stream_throttle.rs +++ b/streamer/src/nonblocking/stream_throttle.rs @@ -11,8 +11,6 @@ use { }, }; -/// Limit to 250K PPS -pub const MAX_STREAMS_PER_MS: u64 = 250; const MAX_UNSTAKED_STREAMS_PERCENT: u64 = 20; pub const STREAM_THROTTLING_INTERVAL_MS: u64 = 100; pub const STREAM_STOP_CODE_THROTTLING: u32 = 15; @@ -35,14 +33,18 @@ pub(crate) struct StakedStreamLoadEMA { } impl StakedStreamLoadEMA { - pub(crate) fn new(stats: Arc, max_unstaked_connections: usize) -> Self { + pub(crate) fn new( + stats: Arc, + max_unstaked_connections: usize, + max_streams_per_ms: u64, + ) -> Self { let allow_unstaked_streams = max_unstaked_connections > 0; let max_staked_load_in_ema_window = if allow_unstaked_streams { - (MAX_STREAMS_PER_MS - - Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT).apply_to(MAX_STREAMS_PER_MS)) + (max_streams_per_ms + - Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT).apply_to(max_streams_per_ms)) * EMA_WINDOW_MS } else { - MAX_STREAMS_PER_MS * EMA_WINDOW_MS + max_streams_per_ms * EMA_WINDOW_MS }; let max_num_unstaked_connections = @@ -56,7 +58,7 @@ impl StakedStreamLoadEMA { let max_unstaked_load_in_throttling_window = if allow_unstaked_streams { Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT) - .apply_to(MAX_STREAMS_PER_MS * STREAM_THROTTLING_INTERVAL_MS) + .apply_to(max_streams_per_ms * STREAM_THROTTLING_INTERVAL_MS) .saturating_div(max_num_unstaked_connections) } else { 0 @@ -228,7 +230,9 @@ pub mod test { use { super::*, crate::{ - nonblocking::stream_throttle::STREAM_LOAD_EMA_INTERVAL_MS, + nonblocking::{ + quic::DEFAULT_MAX_STREAMS_PER_MS, stream_throttle::STREAM_LOAD_EMA_INTERVAL_MS, + }, quic::{StreamStats, MAX_UNSTAKED_CONNECTIONS}, }, std::{ @@ -242,6 +246,7 @@ pub mod test { let load_ema = Arc::new(StakedStreamLoadEMA::new( Arc::new(StreamStats::default()), MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_STREAMS_PER_MS, )); // 25K packets per ms * 20% / 500 max unstaked connections assert_eq!( @@ -258,6 +263,7 @@ pub mod test { let load_ema = Arc::new(StakedStreamLoadEMA::new( Arc::new(StreamStats::default()), MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_STREAMS_PER_MS, )); // EMA load is used for staked connections to calculate max number of allowed streams. @@ -349,6 +355,7 @@ pub mod test { let load_ema = Arc::new(StakedStreamLoadEMA::new( Arc::new(StreamStats::default()), 0, + DEFAULT_MAX_STREAMS_PER_MS, )); // EMA load is used for staked connections to calculate max number of allowed streams. @@ -436,6 +443,7 @@ pub mod test { let stream_load_ema = Arc::new(StakedStreamLoadEMA::new( Arc::new(StreamStats::default()), MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_STREAMS_PER_MS, )); stream_load_ema .load_in_recent_interval @@ -464,6 +472,7 @@ pub mod test { let stream_load_ema = Arc::new(StakedStreamLoadEMA::new( Arc::new(StreamStats::default()), MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_STREAMS_PER_MS, )); stream_load_ema .load_in_recent_interval @@ -483,6 +492,7 @@ pub mod test { let stream_load_ema = Arc::new(StakedStreamLoadEMA::new( Arc::new(StreamStats::default()), MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_STREAMS_PER_MS, )); stream_load_ema .load_in_recent_interval diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index 78ccbeb6e60..177bfdc7975 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -471,6 +471,7 @@ pub fn spawn_server( staked_nodes: Arc>, max_staked_connections: usize, max_unstaked_connections: usize, + max_streams_per_ms: u64, wait_for_chunk_timeout: Duration, coalesce: Duration, ) -> Result { @@ -488,6 +489,7 @@ pub fn spawn_server( staked_nodes, max_staked_connections, max_unstaked_connections, + max_streams_per_ms, wait_for_chunk_timeout, coalesce, ) @@ -515,7 +517,9 @@ pub fn spawn_server( mod test { use { super::*, - crate::nonblocking::quic::{test::*, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT}, + crate::nonblocking::quic::{ + test::*, DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, + }, crossbeam_channel::unbounded, solana_sdk::net::DEFAULT_TPU_COALESCE, std::net::SocketAddr, @@ -549,6 +553,7 @@ mod test { staked_nodes, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, ) @@ -609,6 +614,7 @@ mod test { staked_nodes, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, ) @@ -656,6 +662,7 @@ mod test { staked_nodes, MAX_STAKED_CONNECTIONS, 0, // Do not allow any connection from unstaked clients/nodes + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, )