Skip to content

Commit

Permalink
refactor: replace measure_time with measure_us where appropriate (#2269)
Browse files — browse the repository at this point in the history
  • Loading branch information
jstarry authored Jul 25, 2024
1 parent 918a395 commit 1519a2e
Show file tree
Hide file tree
Showing 10 changed files with 363 additions and 448 deletions.
41 changes: 18 additions & 23 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use {
solana_client::connection_cache::ConnectionCache,
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::blockstore_processor::TransactionStatusSender,
solana_measure::{measure_time, measure_us},
solana_measure::measure_us,
solana_perf::{data_budget::DataBudget, packet::PACKETS_PER_BATCH},
solana_poh::poh_recorder::{PohRecorder, TransactionRecorder},
solana_runtime::{
Expand Down Expand Up @@ -676,13 +676,13 @@ impl BankingStage {
if unprocessed_transaction_storage.should_not_process() {
return;
}
let (decision, make_decision_time) =
measure_time!(decision_maker.make_consume_or_forward_decision());
let (decision, make_decision_us) =
measure_us!(decision_maker.make_consume_or_forward_decision());
let metrics_action = slot_metrics_tracker.check_leader_slot_boundary(
decision.bank_start(),
Some(unprocessed_transaction_storage),
);
slot_metrics_tracker.increment_make_decision_us(make_decision_time.as_us());
slot_metrics_tracker.increment_make_decision_us(make_decision_us);

match decision {
BufferedPacketsDecision::Consume(bank_start) => {
Expand All @@ -691,17 +691,15 @@ impl BankingStage {
// packet processing metrics from the next slot towards the metrics
// of the previous slot
slot_metrics_tracker.apply_action(metrics_action);
let (_, consume_buffered_packets_time) = measure_time!(
consumer.consume_buffered_packets(
let (_, consume_buffered_packets_us) = measure_us!(consumer
.consume_buffered_packets(
&bank_start,
unprocessed_transaction_storage,
banking_stage_stats,
slot_metrics_tracker,
),
"consume_buffered_packets",
);
));
slot_metrics_tracker
.increment_consume_buffered_packets_us(consume_buffered_packets_time.as_us());
.increment_consume_buffered_packets_us(consume_buffered_packets_us);
}
BufferedPacketsDecision::Forward => {
let ((), forward_us) = measure_us!(forwarder.handle_forwarding(
Expand Down Expand Up @@ -750,20 +748,17 @@ impl BankingStage {
if !unprocessed_transaction_storage.is_empty()
|| last_metrics_update.elapsed() >= SLOT_BOUNDARY_CHECK_PERIOD
{
let (_, process_buffered_packets_time) = measure_time!(
Self::process_buffered_packets(
decision_maker,
forwarder,
consumer,
&mut unprocessed_transaction_storage,
&banking_stage_stats,
&mut slot_metrics_tracker,
&mut tracer_packet_stats,
),
"process_buffered_packets",
);
let (_, process_buffered_packets_us) = measure_us!(Self::process_buffered_packets(
decision_maker,
forwarder,
consumer,
&mut unprocessed_transaction_storage,
&banking_stage_stats,
&mut slot_metrics_tracker,
&mut tracer_packet_stats,
));
slot_metrics_tracker
.increment_process_buffered_packets_us(process_buffered_packets_time.as_us());
.increment_process_buffered_packets_us(process_buffered_packets_us);
last_metrics_update = Instant::now();
}

Expand Down
35 changes: 12 additions & 23 deletions core/src/banking_stage/unprocessed_transaction_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use {
},
itertools::Itertools,
min_max_heap::MinMaxHeap,
solana_measure::{measure_time, measure_us},
solana_measure::measure_us,
solana_runtime::bank::Bank,
solana_sdk::{
clock::FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, feature_set::FeatureSet, hash::Hash,
Expand Down Expand Up @@ -636,35 +636,24 @@ impl ThreadLocalUnprocessedPackets {
if accepting_packets {
let (
(sanitized_transactions, transaction_to_packet_indexes),
packet_conversion_time,
): (
(Vec<SanitizedTransaction>, Vec<usize>),
_,
) = measure_time!(
self.sanitize_unforwarded_packets(
&packets_to_forward,
&bank,
&mut total_dropped_packets
),
"sanitize_packet",
);
packet_conversion_us,
) = measure_us!(self.sanitize_unforwarded_packets(
&packets_to_forward,
&bank,
&mut total_dropped_packets
));
saturating_add_assign!(
total_packet_conversion_us,
packet_conversion_time.as_us()
packet_conversion_us
);

let (forwardable_transaction_indexes, filter_packets_time) = measure_time!(
Self::filter_invalid_transactions(
let (forwardable_transaction_indexes, filter_packets_us) =
measure_us!(Self::filter_invalid_transactions(
&sanitized_transactions,
&bank,
&mut total_dropped_packets
),
"filter_packets",
);
saturating_add_assign!(
total_filter_packets_us,
filter_packets_time.as_us()
);
));
saturating_add_assign!(total_filter_packets_us, filter_packets_us);

for forwardable_transaction_index in &forwardable_transaction_indexes {
saturating_add_assign!(total_forwardable_packets, 1);
Expand Down
58 changes: 26 additions & 32 deletions ledger/src/blockstore_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use {
solana_entry::entry::{
self, create_ticks, Entry, EntrySlice, EntryType, EntryVerificationStatus, VerifyRecyclers,
},
solana_measure::{measure::Measure, measure_time},
solana_measure::{measure::Measure, measure_us},
solana_metrics::datapoint_error,
solana_rayon_threadlimit::{get_max_thread_count, get_thread_count},
solana_runtime::{
Expand Down Expand Up @@ -187,24 +187,23 @@ pub fn execute_batch(
..
} = tx_results;

let (check_block_cost_limits_result, check_block_cost_limits_time): (Result<()>, Measure) =
measure_time!(if bank
.feature_set
.is_active(&feature_set::apply_cost_tracker_during_replay::id())
{
check_block_cost_limits(
bank,
&loaded_accounts_stats,
&execution_results,
batch.sanitized_transactions(),
)
} else {
Ok(())
});
let (check_block_cost_limits_result, check_block_cost_limits_us) = measure_us!(if bank
.feature_set
.is_active(&feature_set::apply_cost_tracker_during_replay::id())
{
check_block_cost_limits(
bank,
&loaded_accounts_stats,
&execution_results,
batch.sanitized_transactions(),
)
} else {
Ok(())
});

timings.saturating_add_in_place(
ExecuteTimingType::CheckBlockLimitsUs,
check_block_cost_limits_time.as_us(),
check_block_cost_limits_us,
);
check_block_cost_limits_result?;

Expand Down Expand Up @@ -328,20 +327,15 @@ fn execute_batches_internal(
let transaction_count =
transaction_batch.batch.sanitized_transactions().len() as u64;
let mut timings = ExecuteTimings::default();
let (result, execute_batches_time): (Result<()>, Measure) = measure_time!(
{
execute_batch(
transaction_batch,
bank,
transaction_status_sender,
replay_vote_sender,
&mut timings,
log_messages_bytes_limit,
prioritization_fee_cache,
)
},
"execute_batch",
);
let (result, execute_batches_us) = measure_us!(execute_batch(
transaction_batch,
bank,
transaction_status_sender,
replay_vote_sender,
&mut timings,
log_messages_bytes_limit,
prioritization_fee_cache,
));

let thread_index = replay_tx_thread_pool.current_thread_index().unwrap();
execution_timings_per_thread
Expand All @@ -354,14 +348,14 @@ fn execute_batches_internal(
total_transactions_executed,
execute_timings: total_thread_execute_timings,
} = thread_execution_time;
*total_thread_us += execute_batches_time.as_us();
*total_thread_us += execute_batches_us;
*total_transactions_executed += transaction_count;
total_thread_execute_timings
.saturating_add_in_place(ExecuteTimingType::TotalBatchesLen, 1);
total_thread_execute_timings.accumulate(&timings);
})
.or_insert(ThreadExecuteTimings {
total_thread_us: execute_batches_time.as_us(),
total_thread_us: execute_batches_us,
total_transactions_executed: transaction_count,
execute_timings: timings,
});
Expand Down
100 changes: 43 additions & 57 deletions poh/src/poh_recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use {
poh::Poh,
},
solana_ledger::{blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache},
solana_measure::{measure_time, measure_us},
solana_measure::measure_us,
solana_metrics::poh_timing_point::{send_poh_timing_point, PohTimingSender, SlotPohTimingInfo},
solana_runtime::{bank::Bank, installed_scheduler_pool::BankWithScheduler},
solana_sdk::{
Expand Down Expand Up @@ -848,20 +848,17 @@ impl PohRecorder {
}

pub fn tick(&mut self) {
let ((poh_entry, target_time), tick_lock_contention_time) = measure_time!(
{
let mut poh_l = self.poh.lock().unwrap();
let poh_entry = poh_l.tick();
let target_time = if poh_entry.is_some() {
Some(poh_l.target_poh_time(self.target_ns_per_tick))
} else {
None
};
(poh_entry, target_time)
},
"tick_lock_contention",
);
self.tick_lock_contention_us += tick_lock_contention_time.as_us();
let ((poh_entry, target_time), tick_lock_contention_us) = measure_us!({
let mut poh_l = self.poh.lock().unwrap();
let poh_entry = poh_l.tick();
let target_time = if poh_entry.is_some() {
Some(poh_l.target_poh_time(self.target_ns_per_tick))
} else {
None
};
(poh_entry, target_time)
});
self.tick_lock_contention_us += tick_lock_contention_us;

if let Some(poh_entry) = poh_entry {
self.tick_height += 1;
Expand All @@ -884,24 +881,19 @@ impl PohRecorder {
self.tick_height,
));

let (_flush_res, flush_cache_and_tick_time) =
measure_time!(self.flush_cache(true), "flush_cache_and_tick");
self.flush_cache_tick_us += flush_cache_and_tick_time.as_us();

let sleep_time = measure_time!(
{
let target_time = target_time.unwrap();
// sleep is not accurate enough to get a predictable time.
// Kernel can not schedule the thread for a while.
while Instant::now() < target_time {
// TODO: a caller could possibly desire to reset or record while we're spinning here
std::hint::spin_loop();
}
},
"poh_sleep",
)
.1;
self.total_sleep_us += sleep_time.as_us();
let (_flush_res, flush_cache_and_tick_us) = measure_us!(self.flush_cache(true));
self.flush_cache_tick_us += flush_cache_and_tick_us;

let (_, sleep_us) = measure_us!({
let target_time = target_time.unwrap();
// sleep is not accurate enough to get a predictable time.
// Kernel can not schedule the thread for a while.
while Instant::now() < target_time {
// TODO: a caller could possibly desire to reset or record while we're spinning here
std::hint::spin_loop();
}
});
self.total_sleep_us += sleep_us;
}
}

Expand Down Expand Up @@ -949,14 +941,12 @@ impl PohRecorder {
// cannot be generated by `record()`
assert!(!transactions.is_empty(), "No transactions provided");

let ((), report_metrics_time) =
measure_time!(self.report_metrics(bank_slot), "report_metrics");
self.report_metrics_us += report_metrics_time.as_us();
let ((), report_metrics_us) = measure_us!(self.report_metrics(bank_slot));
self.report_metrics_us += report_metrics_us;

loop {
let (flush_cache_res, flush_cache_time) =
measure_time!(self.flush_cache(false), "flush_cache");
self.flush_cache_no_tick_us += flush_cache_time.as_us();
let (flush_cache_res, flush_cache_us) = measure_us!(self.flush_cache(false));
self.flush_cache_no_tick_us += flush_cache_us;
flush_cache_res?;

let working_bank = self
Expand All @@ -967,30 +957,26 @@ impl PohRecorder {
return Err(PohRecorderError::MaxHeightReached);
}

let (mut poh_lock, poh_lock_time) = measure_time!(self.poh.lock().unwrap(), "poh_lock");
self.record_lock_contention_us += poh_lock_time.as_us();
let (mut poh_lock, poh_lock_us) = measure_us!(self.poh.lock().unwrap());
self.record_lock_contention_us += poh_lock_us;

let (record_mixin_res, record_mixin_time) =
measure_time!(poh_lock.record(mixin), "record_mixin");
self.record_us += record_mixin_time.as_us();
let (record_mixin_res, record_mixin_us) = measure_us!(poh_lock.record(mixin));
self.record_us += record_mixin_us;

drop(poh_lock);

if let Some(poh_entry) = record_mixin_res {
let num_transactions = transactions.len();
let (send_entry_res, send_entry_time) = measure_time!(
{
let entry = Entry {
num_hashes: poh_entry.num_hashes,
hash: poh_entry.hash,
transactions,
};
let bank_clone = working_bank.bank.clone();
self.sender.send((bank_clone, (entry, self.tick_height)))
},
"send_poh_entry",
);
self.send_entry_us += send_entry_time.as_us();
let (send_entry_res, send_entry_us) = measure_us!({
let entry = Entry {
num_hashes: poh_entry.num_hashes,
hash: poh_entry.hash,
transactions,
};
let bank_clone = working_bank.bank.clone();
self.sender.send((bank_clone, (entry, self.tick_height)))
});
self.send_entry_us += send_entry_us;
send_entry_res?;
let starting_transaction_index =
working_bank.transaction_index.map(|transaction_index| {
Expand Down
7 changes: 3 additions & 4 deletions poh/src/poh_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use {
crossbeam_channel::Receiver,
log::*,
solana_entry::poh::Poh,
solana_measure::{measure::Measure, measure_time},
solana_measure::{measure::Measure, measure_us},
solana_sdk::poh_config::PohConfig,
std::{
sync::{
Expand Down Expand Up @@ -261,9 +261,8 @@ impl PohService {
std::mem::take(&mut record.transactions),
);
// what do we do on failure here? Ignore for now.
let (_send_res, send_record_result_time) =
measure_time!(record.sender.send(res), "send_record_result");
timing.total_send_record_result_us += send_record_result_time.as_us();
let (_send_res, send_record_result_us) = measure_us!(record.sender.send(res));
timing.total_send_record_result_us += send_record_result_us;
timing.num_hashes += 1; // note: may have also ticked inside record

let new_record_result = record_receiver.try_recv();
Expand Down
Loading

0 comments on commit 1519a2e

Please sign in to comment.