Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduler Frequency Fixes #4545

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type SchedulerPrioGraph = PrioGraph<

pub(crate) struct PrioGraphSchedulerConfig {
pub max_scheduled_cus: u64,
pub max_transactions_per_scheduling_pass: usize,
pub max_scanned_transactions_per_scheduling_pass: usize,
pub look_ahead_window_size: usize,
pub target_transactions_per_batch: usize,
}
Expand All @@ -52,8 +52,8 @@ impl Default for PrioGraphSchedulerConfig {
fn default() -> Self {
Self {
max_scheduled_cus: MAX_BLOCK_UNITS,
max_transactions_per_scheduling_pass: 100_000,
look_ahead_window_size: 2048,
max_scanned_transactions_per_scheduling_pass: 1000,
look_ahead_window_size: 256,
target_transactions_per_batch: TARGET_NUM_TRANSACTIONS_PER_BATCH,
}
}
Expand Down Expand Up @@ -192,16 +192,18 @@ impl<Tx: TransactionWithMeta> PrioGraphScheduler<Tx> {
let mut unblock_this_batch = Vec::with_capacity(
self.consume_work_senders.len() * self.config.target_transactions_per_batch,
);
let mut num_scanned: usize = 0;
let mut num_scheduled: usize = 0;
let mut num_sent: usize = 0;
let mut num_unschedulable: usize = 0;
while num_scheduled < self.config.max_transactions_per_scheduling_pass {
while num_scanned < self.config.max_scanned_transactions_per_scheduling_pass {
// If nothing is in the main-queue of the `PrioGraph` then there's nothing left to schedule.
if self.prio_graph.is_empty() {
break;
}

while let Some(id) = self.prio_graph.pop() {
num_scanned += 1;
unblock_this_batch.push(id);

// Should always be in the container, during initial testing phase panic.
Expand Down Expand Up @@ -267,12 +269,12 @@ impl<Tx: TransactionWithMeta> PrioGraphScheduler<Tx> {
break;
}
}

if num_scheduled >= self.config.max_transactions_per_scheduling_pass {
break;
}
}
}

if num_scanned >= self.config.max_scanned_transactions_per_scheduling_pass {
break;
}
Comment on lines +275 to +277
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Behavioral change here: break loop if the number of scanned transactions exceed the configured maximum

}

// Send all non-empty batches
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,10 @@ impl ReceiveAndBuffer for SanitizedTransactionReceiveAndBuffer {
count_metrics: &mut SchedulerCountMetrics,
decision: &BufferedPacketsDecision,
) -> Result<usize, ()> {
let remaining_queue_capacity = container.remaining_capacity();

const MAX_RECEIVE_PACKETS: usize = 5_000;
const MAX_PACKET_RECEIVE_TIME: Duration = Duration::from_millis(10);
let (recv_timeout, should_buffer) = match decision {
BufferedPacketsDecision::Consume(_) => (
BufferedPacketsDecision::Consume(_) | BufferedPacketsDecision::Hold => (
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Behavioral change: Hold can now result in 0 timeout if the container is not empty. This is intended to avoid long timeouts of 10ms if a Hold pops up between our leader slots (it can)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the profiles I've been taking so far, we seem to spend exactly 20ms doing nothing in between slots. Curious to see what happens now.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

didn't you recently profile on the box with this change in there? did you see something there?

if container.is_empty() {
MAX_PACKET_RECEIVE_TIME
} else {
Expand All @@ -93,14 +92,12 @@ impl ReceiveAndBuffer for SanitizedTransactionReceiveAndBuffer {
true,
),
BufferedPacketsDecision::Forward => (MAX_PACKET_RECEIVE_TIME, self.forwarding_enabled),
BufferedPacketsDecision::ForwardAndHold | BufferedPacketsDecision::Hold => {
(MAX_PACKET_RECEIVE_TIME, true)
}
BufferedPacketsDecision::ForwardAndHold => (MAX_PACKET_RECEIVE_TIME, true),
};

let (received_packet_results, receive_time_us) = measure_us!(self
.packet_receiver
.receive_packets(recv_timeout, remaining_queue_capacity, |packet| {
.receive_packets(recv_timeout, MAX_RECEIVE_PACKETS, |packet| {
Copy link
Author

@apfitzge apfitzge Jan 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Behavioral change: max received number of transactions is 5000. This is a "loose maximum" as it will continue to receive packets up until it has receive >= 5000. This is because the smallest unit we receive is a Vec<PacketBatch> which obviously can contain more than one packet.

packet.check_excessive_precompiles()?;
Ok(packet)
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ impl<C: LikeClusterInfo, R: ReceiveAndBuffer> SchedulerController<C, R> {
self.timing_metrics
.maybe_report_and_reset_slot(new_leader_slot);

self.process_transactions(&decision)?;
self.receive_completed()?;
self.process_transactions(&decision)?;
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Behavioral change: receive completed transactions just before scheduling instead of just after.

if self.receive_and_buffer_packets(&decision).is_err() {
break;
}
Expand Down
Loading