Skip to content

Commit

Permalink
Make TaskHandler::handle() more generalized (#4050)
Browse files Browse the repository at this point in the history
  • Loading branch information
ryoqun authored Dec 12, 2024
1 parent 9d9be71 commit 5fe8d2e
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 58 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ solana-program-runtime = { workspace = true }
solana-sdk = { workspace = true, features = ["dev-context-only-utils"] }
solana-stake-program = { workspace = true }
solana-system-program = { workspace = true }
solana-unified-scheduler-logic = { workspace = true }
solana-unified-scheduler-pool = { workspace = true, features = [
"dev-context-only-utils",
] }
Expand Down
18 changes: 7 additions & 11 deletions core/tests/unified_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,13 @@ use {
solana_ledger::genesis_utils::create_genesis_config,
solana_runtime::{
accounts_background_service::AbsRequestSender, bank::Bank, bank_forks::BankForks,
genesis_utils::GenesisConfigInfo, prioritization_fee_cache::PrioritizationFeeCache,
genesis_utils::GenesisConfigInfo, installed_scheduler_pool::SchedulingContext,
prioritization_fee_cache::PrioritizationFeeCache,
},
solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
solana_sdk::{
hash::Hash,
pubkey::Pubkey,
system_transaction,
transaction::{Result, SanitizedTransaction},
},
solana_sdk::{hash::Hash, pubkey::Pubkey, system_transaction, transaction::Result},
solana_timings::ExecuteTimings,
solana_unified_scheduler_logic::Task,
solana_unified_scheduler_pool::{
DefaultTaskHandler, HandlerContext, PooledScheduler, SchedulerPool, TaskHandler,
},
Expand All @@ -48,9 +45,8 @@ fn test_scheduler_waited_by_drop_bank_service() {
fn handle(
result: &mut Result<()>,
timings: &mut ExecuteTimings,
bank: &Arc<Bank>,
transaction: &RuntimeTransaction<SanitizedTransaction>,
index: usize,
scheduling_context: &SchedulingContext,
task: &Task,
handler_context: &HandlerContext,
) {
info!("Stalling at StallingHandler::handle()...");
Expand All @@ -59,7 +55,7 @@ fn test_scheduler_waited_by_drop_bank_service() {
std::thread::sleep(std::time::Duration::from_secs(3));
info!("Now entering into DefaultTaskHandler::handle()...");

DefaultTaskHandler::handle(result, timings, bank, transaction, index, handler_context);
DefaultTaskHandler::handle(result, timings, scheduling_context, task, handler_context);
}
}

Expand Down
87 changes: 40 additions & 47 deletions unified-scheduler-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use {
execute_batch, TransactionBatchWithIndexes, TransactionStatusSender,
},
solana_runtime::{
bank::Bank,
installed_scheduler_pool::{
initialized_result_with_timings, InstalledScheduler, InstalledSchedulerBox,
InstalledSchedulerPool, InstalledSchedulerPoolArc, ResultWithTimings, ScheduleResult,
Expand Down Expand Up @@ -411,9 +410,8 @@ pub trait TaskHandler: Send + Sync + Debug + Sized + 'static {
fn handle(
result: &mut Result<()>,
timings: &mut ExecuteTimings,
bank: &Arc<Bank>,
transaction: &RuntimeTransaction<SanitizedTransaction>,
index: usize,
scheduling_context: &SchedulingContext,
task: &Task,
handler_context: &HandlerContext,
);
}
Expand All @@ -425,13 +423,16 @@ impl TaskHandler for DefaultTaskHandler {
fn handle(
result: &mut Result<()>,
timings: &mut ExecuteTimings,
bank: &Arc<Bank>,
transaction: &RuntimeTransaction<SanitizedTransaction>,
index: usize,
scheduling_context: &SchedulingContext,
task: &Task,
handler_context: &HandlerContext,
) {
// scheduler must properly prevent conflicting tx executions. thus, task handler isn't
// responsible for locking.
let bank = scheduling_context.bank();
let transaction = task.transaction();
let index = task.task_index();

let batch = bank.prepare_unlocked_batch_from_single_tx(transaction);
let batch_with_indexes = TransactionBatchWithIndexes {
batch,
Expand Down Expand Up @@ -786,17 +787,16 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
}

fn execute_task_with_handler(
bank: &Arc<Bank>,
scheduling_context: &SchedulingContext,
executed_task: &mut Box<ExecutedTask>,
handler_context: &HandlerContext,
) {
debug!("handling task at {:?}", thread::current());
TH::handle(
&mut executed_task.result_with_timings.0,
&mut executed_task.result_with_timings.1,
bank,
executed_task.task.transaction(),
executed_task.task.task_index(),
scheduling_context,
&executed_task.task,
handler_context,
);
}
Expand Down Expand Up @@ -1192,7 +1192,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
}
let mut task = ExecutedTask::new_boxed(task);
Self::execute_task_with_handler(
runnable_task_receiver.context().bank(),
runnable_task_receiver.context(),
&mut task,
&pool.handler_context,
);
Expand Down Expand Up @@ -1752,9 +1752,8 @@ mod tests {
fn handle(
_result: &mut Result<()>,
timings: &mut ExecuteTimings,
_bank: &Arc<Bank>,
_transaction: &RuntimeTransaction<SanitizedTransaction>,
_index: usize,
_bank: &SchedulingContext,
_task: &Task,
_handler_context: &HandlerContext,
) {
timings.metrics[ExecuteTimingType::CheckUs] += 123;
Expand Down Expand Up @@ -1935,9 +1934,8 @@ mod tests {
fn handle(
result: &mut Result<()>,
_timings: &mut ExecuteTimings,
_bank: &Arc<Bank>,
_transaction: &RuntimeTransaction<SanitizedTransaction>,
_index: usize,
_bank: &SchedulingContext,
_task: &Task,
_handler_context: &HandlerContext,
) {
*result = Err(TransactionError::AccountNotFound);
Expand Down Expand Up @@ -2046,9 +2044,8 @@ mod tests {
fn handle(
_result: &mut Result<()>,
_timings: &mut ExecuteTimings,
_bank: &Arc<Bank>,
_transaction: &RuntimeTransaction<SanitizedTransaction>,
_index: usize,
_bank: &SchedulingContext,
_task: &Task,
_handler_context: &HandlerContext,
) {
*TASK_COUNT.lock().unwrap() += 1;
Expand Down Expand Up @@ -2383,11 +2380,11 @@ mod tests {
fn handle(
_result: &mut Result<()>,
_timings: &mut ExecuteTimings,
_bank: &Arc<Bank>,
_transaction: &RuntimeTransaction<SanitizedTransaction>,
index: usize,
_bank: &SchedulingContext,
task: &Task,
_handler_context: &HandlerContext,
) {
let index = task.task_index();
if index == 0 {
sleepless_testing::at(PanickingHanlderCheckPoint::BeforeNotifiedPanic);
} else if index == 1 {
Expand Down Expand Up @@ -2463,11 +2460,11 @@ mod tests {
fn handle(
result: &mut Result<()>,
_timings: &mut ExecuteTimings,
_bank: &Arc<Bank>,
_transaction: &RuntimeTransaction<SanitizedTransaction>,
index: usize,
_bank: &SchedulingContext,
task: &Task,
_handler_context: &HandlerContext,
) {
let index = task.task_index();
*TASK_COUNT.lock().unwrap() += 1;
if index == 1 {
*result = Err(TransactionError::AccountNotFound);
Expand Down Expand Up @@ -2532,24 +2529,17 @@ mod tests {
fn handle(
result: &mut Result<()>,
timings: &mut ExecuteTimings,
bank: &Arc<Bank>,
transaction: &RuntimeTransaction<SanitizedTransaction>,
index: usize,
bank: &SchedulingContext,
task: &Task,
handler_context: &HandlerContext,
) {
let index = task.task_index();
match index {
STALLED_TRANSACTION_INDEX => *LOCK_TO_STALL.lock().unwrap(),
BLOCKED_TRANSACTION_INDEX => {}
_ => unreachable!(),
};
DefaultTaskHandler::handle(
result,
timings,
bank,
transaction,
index,
handler_context,
);
DefaultTaskHandler::handle(result, timings, bank, task, handler_context);
}
}

Expand Down Expand Up @@ -2617,13 +2607,12 @@ mod tests {
fn handle(
_result: &mut Result<()>,
_timings: &mut ExecuteTimings,
bank: &Arc<Bank>,
_transaction: &RuntimeTransaction<SanitizedTransaction>,
index: usize,
context: &SchedulingContext,
task: &Task,
_handler_context: &HandlerContext,
) {
// The task index must always be matched to the slot.
assert_eq!(index as Slot, bank.slot());
assert_eq!(task.task_index() as Slot, context.bank().slot());
}
}

Expand Down Expand Up @@ -2716,7 +2705,6 @@ mod tests {
transaction: RuntimeTransaction<SanitizedTransaction>,
index: usize,
) -> ScheduleResult {
let transaction_and_index = (transaction, index);
let context = self.context().clone();
let pool = self.3.clone();

Expand All @@ -2728,12 +2716,15 @@ mod tests {
let mut result = Ok(());
let mut timings = ExecuteTimings::default();

let task = SchedulingStateMachine::create_task(transaction, index, &mut |_| {
UsageQueue::default()
});

<DefaultTaskHandler as TaskHandler>::handle(
&mut result,
&mut timings,
context.bank(),
&transaction_and_index.0,
transaction_and_index.1,
&context,
&task,
&pool.handler_context,
);
(result, timings)
Expand Down Expand Up @@ -2923,14 +2914,16 @@ mod tests {
let result = &mut Ok(());
let timings = &mut ExecuteTimings::default();
let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let scheduling_context = &SchedulingContext::new(bank.clone());
let handler_context = &HandlerContext {
log_messages_bytes_limit: None,
transaction_status_sender: None,
replay_vote_sender: None,
prioritization_fee_cache,
};

DefaultTaskHandler::handle(result, timings, bank, &tx, 0, handler_context);
let task = SchedulingStateMachine::create_task(tx, 0, &mut |_| UsageQueue::default());
DefaultTaskHandler::handle(result, timings, scheduling_context, &task, handler_context);
assert_matches!(result, Err(TransactionError::AccountLoadedTwice));
}
}

0 comments on commit 5fe8d2e

Please sign in to comment.