v2.1: tpu-client-next: return receiver in scheduler::run (backport of #4454) #4521

Merged 1 commit on Jan 24, 2025
tpu-client-next: return receiver in scheduler::run (#4454)
Return the receiver from scheduler::run so that it can be reused if required. This is needed to use tpu-client-next in the validator: a user can make an RPC call to update the underlying authority, and, because of how that mechanism is implemented, the same receiver has to be reused for the new scheduler.

(cherry picked from commit 85b6118)
KirillLykov authored and mergify[bot] committed Jan 17, 2025

commit ee66c72787015dd115145e965459b613c5d68864
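
For orientation, here is a minimal sketch of the call pattern this change enables: restarting the scheduler after an identity update while threading the same receiver back into the next run. The helpers `make_config`, `current_identity`, and `next_leader_updater` are hypothetical stand-ins, not APIs from this PR:

```rust
use solana_tpu_client_next::{
    transaction_batch::TransactionBatch, ConnectionWorkersScheduler,
    ConnectionWorkersSchedulerError,
};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

// Hypothetical driver loop: every time the validator identity changes, the
// scheduler is restarted with the SAME receiver, so transactions queued by
// producers are not dropped across the restart.
async fn drive_scheduler(
    mut transaction_receiver: mpsc::Receiver<TransactionBatch>,
    cancel: CancellationToken,
) -> Result<(), ConnectionWorkersSchedulerError> {
    loop {
        let config = make_config(current_identity()); // hypothetical helper
        let leader_updater = next_leader_updater().await; // hypothetical helper
        // `run` now hands the receiver back when it returns.
        let (_stats, receiver) = ConnectionWorkersScheduler::run(
            config,
            leader_updater,
            transaction_receiver,
            cancel.clone(),
        )
        .await?;
        // Reuse the same channel for the next scheduler instance.
        transaction_receiver = receiver;
        if cancel.is_cancelled() {
            return Ok(());
        }
    }
}
```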
28 changes: 16 additions & 12 deletions tpu-client-next/src/connection_workers_scheduler.rs
@@ -90,20 +90,28 @@ pub struct ConnectionWorkersSchedulerConfig {
pub leaders_fanout: Fanout,
}

+pub type TransactionStatsAndReceiver = (
+SendTransactionStatsPerAddr,
+mpsc::Receiver<TransactionBatch>,
+);

impl ConnectionWorkersScheduler {
/// Starts the scheduler, which manages the distribution of transactions to
/// the network's upcoming leaders.
///
/// Runs the main loop that handles worker scheduling and management for
/// connections. Returns the QUIC error statistics per connection address or
-/// an error.
+/// an error, along with the transaction receiver. The receiver is returned
+/// back to the user because in some cases the same receiver needs to be
+/// reused for a new scheduler; for example, this happens when the validator
+/// identity is updated.
///
/// Importantly, if some transactions were not delivered due to network
/// problems, they will not be retried when the problem is resolved.
pub async fn run(
ConnectionWorkersSchedulerConfig {
bind,
-stake_identity: validator_identity,
+stake_identity,
num_connections,
skip_check_transaction_age,
worker_channel_size,
@@ -113,14 +121,14 @@ impl ConnectionWorkersScheduler {
mut leader_updater: Box<dyn LeaderUpdater>,
mut transaction_receiver: mpsc::Receiver<TransactionBatch>,
cancel: CancellationToken,
-) -> Result<SendTransactionStatsPerAddr, ConnectionWorkersSchedulerError> {
-let endpoint = Self::setup_endpoint(bind, validator_identity)?;
+) -> Result<TransactionStatsAndReceiver, ConnectionWorkersSchedulerError> {
+let endpoint = Self::setup_endpoint(bind, stake_identity.as_ref())?;
debug!("Client endpoint bind address: {:?}", endpoint.local_addr());
let mut workers = WorkersCache::new(num_connections, cancel.clone());
let mut send_stats_per_addr = SendTransactionStatsPerAddr::new();

loop {
-let transaction_batch = tokio::select! {
+let transaction_batch: TransactionBatch = tokio::select! {
recv_res = transaction_receiver.recv() => match recv_res {
Some(txs) => txs,
None => {
@@ -184,19 +192,15 @@ impl ConnectionWorkersScheduler {

endpoint.close(0u32.into(), b"Closing connection");
leader_updater.stop().await;
-Ok(send_stats_per_addr)
+Ok((send_stats_per_addr, transaction_receiver))
}

/// Sets up the QUIC endpoint for the scheduler to handle connections.
fn setup_endpoint(
bind: SocketAddr,
-validator_identity: Option<Keypair>,
+stake_identity: Option<&Keypair>,
) -> Result<Endpoint, ConnectionWorkersSchedulerError> {
-let client_certificate = if let Some(validator_identity) = validator_identity {
-Arc::new(QuicClientCertificate::new(&validator_identity))
-} else {
-Arc::new(QuicClientCertificate::new(&Keypair::new()))
-};
+let client_certificate = QuicClientCertificate::new(stake_identity);
let client_config = create_client_config(client_certificate);
let endpoint = create_client_endpoint(bind, client_config)?;
Ok(endpoint)
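
The companion change above, `setup_endpoint(bind, stake_identity.as_ref())`, works because `Option::as_ref` turns the owned config field into a borrow. A tiny self-contained illustration (assuming `Keypair` from `solana_sdk::signature`):

```rust
use solana_sdk::signature::Keypair;

fn main() {
    // Option::as_ref converts Option<Keypair> into Option<&Keypair> without
    // moving the keypair, which is how the owned `stake_identity` field is
    // adapted to the borrowed `setup_endpoint` parameter.
    let stake_identity: Option<Keypair> = Some(Keypair::new());
    let borrowed: Option<&Keypair> = stake_identity.as_ref();
    assert!(borrowed.is_some());
    assert!(stake_identity.is_some()); // still owned and usable afterwards
}
```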
2 changes: 1 addition & 1 deletion tpu-client-next/src/quic_networking.rs
@@ -20,7 +20,7 @@ pub use {
quic_client_certificate::QuicClientCertificate,
};

-pub(crate) fn create_client_config(client_certificate: Arc<QuicClientCertificate>) -> ClientConfig {
+pub(crate) fn create_client_config(client_certificate: QuicClientCertificate) -> ClientConfig {
// adapted from QuicLazyInitializedEndpoint::create_endpoint
let mut crypto = rustls::ClientConfig::builder()
.dangerous()
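
With the `Arc` gone from the signature, crate-internal callers hand the certificate over by value. A hedged sketch of how the call sites line up after this change (`bind` is assumed to be a `SocketAddr` already in scope; `create_client_config` is `pub(crate)`, so this only applies within tpu-client-next):

```rust
// Crate-internal sketch: build the certificate and consume it in one pass.
// QuicClientCertificate::new(None) (see the constructor change below) backs
// the certificate with a fresh ephemeral keypair.
let client_config = create_client_config(QuicClientCertificate::new(None));
let endpoint = create_client_endpoint(bind, client_config)?;
```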
7 changes: 6 additions & 1 deletion tpu-client-next/src/quic_networking/quic_client_certificate.rs
@@ -10,7 +10,12 @@ pub struct QuicClientCertificate {
}

impl QuicClientCertificate {
-pub fn new(keypair: &Keypair) -> Self {
+pub fn new(keypair: Option<&Keypair>) -> Self {
+let keypair = if let Some(keypair) = keypair {
+keypair
+} else {
+&Keypair::new()
+};
let (certificate, key) = new_dummy_x509_certificate(keypair);
Self { certificate, key }
}
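
The constructor now folds the ephemeral-keypair fallback into one place, so staked and unstaked callers go through the same entry point. A quick sketch of the two call shapes (the import path is assumed from the re-export in quic_networking.rs above):

```rust
use solana_sdk::signature::Keypair;
use solana_tpu_client_next::quic_networking::QuicClientCertificate; // path assumed

fn main() {
    let stake_identity = Keypair::new();
    // Staked client: the certificate is derived from the provided identity.
    let staked = QuicClientCertificate::new(Some(&stake_identity));
    // Unstaked client: the constructor generates a throwaway keypair itself.
    let unstaked = QuicClientCertificate::new(None);
    let _ = (staked, unstaked);
}
```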
29 changes: 15 additions & 14 deletions tpu-client-next/tests/connection_workers_scheduler_test.rs
@@ -16,11 +16,13 @@ use {
streamer::StakedNodes,
},
solana_tpu_client_next::{
-connection_workers_scheduler::{ConnectionWorkersSchedulerConfig, Fanout},
+connection_workers_scheduler::{
+ConnectionWorkersSchedulerConfig, Fanout, TransactionStatsAndReceiver,
+},
leader_updater::create_leader_updater,
send_transaction_stats::SendTransactionStatsNonAtomic,
transaction_batch::TransactionBatch,
-ConnectionWorkersScheduler, ConnectionWorkersSchedulerError, SendTransactionStatsPerAddr,
+ConnectionWorkersScheduler, ConnectionWorkersSchedulerError,
},
std::{
collections::HashMap,
@@ -41,10 +43,10 @@ use {
tokio_util::sync::CancellationToken,
};

-fn test_config(validator_identity: Option<Keypair>) -> ConnectionWorkersSchedulerConfig {
+fn test_config(stake_identity: Option<Keypair>) -> ConnectionWorkersSchedulerConfig {
ConnectionWorkersSchedulerConfig {
bind: SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 0),
-stake_identity: validator_identity,
+stake_identity,
num_connections: 1,
skip_check_transaction_age: false,
// At the moment we have only one strategy to send transactions: we try
@@ -64,9 +66,9 @@ fn test_config(validator_identity: Option<Keypair>) -> ConnectionWorkersSchedule
async fn setup_connection_worker_scheduler(
tpu_address: SocketAddr,
transaction_receiver: Receiver<TransactionBatch>,
-validator_identity: Option<Keypair>,
+stake_identity: Option<Keypair>,
) -> (
-JoinHandle<Result<SendTransactionStatsPerAddr, ConnectionWorkersSchedulerError>>,
+JoinHandle<Result<TransactionStatsAndReceiver, ConnectionWorkersSchedulerError>>,
CancellationToken,
) {
let json_rpc_url = "http://127.0.0.1:8899";
@@ -83,7 +85,7 @@ async fn setup_connection_worker_scheduler(
.expect("Leader updates was successfully created");

let cancel = CancellationToken::new();
-let config = test_config(validator_identity);
+let config = test_config(stake_identity);
let scheduler = tokio::spawn(ConnectionWorkersScheduler::run(
config,
leader_updater,
@@ -96,10 +98,10 @@ async fn setup_connection_worker_scheduler(

async fn join_scheduler(
scheduler_handle: JoinHandle<
-Result<SendTransactionStatsPerAddr, ConnectionWorkersSchedulerError>,
+Result<TransactionStatsAndReceiver, ConnectionWorkersSchedulerError>,
>,
) -> SendTransactionStatsNonAtomic {
-let stats_per_ip = scheduler_handle
+let (stats_per_ip, _) = scheduler_handle
.await
.unwrap()
.expect("Scheduler should stop successfully.");
@@ -401,8 +403,8 @@ async fn test_connection_pruned_and_reopened() {
/// connection and verify that all the txs have been received.
#[tokio::test]
async fn test_staked_connection() {
-let validator_identity = Keypair::new();
-let stakes = HashMap::from([(validator_identity.pubkey(), 100_000)]);
+let stake_identity = Keypair::new();
+let stakes = HashMap::from([(stake_identity.pubkey(), 100_000)]);
let staked_nodes = StakedNodes::new(Arc::new(stakes), HashMap::<Pubkey, u64>::default());

let SpawnTestServerResult {
@@ -433,8 +435,7 @@ async fn test_staked_connection() {
} = spawn_tx_sender(tx_size, expected_num_txs, Duration::from_millis(100));

let (scheduler_handle, _scheduler_cancel) =
-setup_connection_worker_scheduler(server_address, tx_receiver, Some(validator_identity))
-.await;
+setup_connection_worker_scheduler(server_address, tx_receiver, Some(stake_identity)).await;

// Check results
let actual_num_packets = count_received_packets_for(receiver, tx_size, TEST_MAX_TIME).await;
@@ -534,7 +535,7 @@ async fn test_no_host() {

// While attempting to establish a connection with a nonexistent host, we fill the worker's
// channel.
-let stats = scheduler_handle
+let (stats, _) = scheduler_handle
.await
.expect("Scheduler should stop successfully")
.expect("Scheduler execution was successful");