Cleanup the scheduler post-processing step a bunch, breaking each of the steps up into its own function, for readability.
rdaum committed Sep 26, 2023
1 parent ff30558 commit 6929347
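
The shape of the refactor is easiest to see in isolation. The sketch below is a minimal, self-contained Rust illustration of the pattern this commit moves the scheduler to — the control-message pass collects work into per-category batches, each batch is handled by its own helper that returns the ids of tasks found dead or unresponsive, and removals happen once at the end. The types here are simplified stand-ins for the real `TaskControl`, request, and channel types, not the actual scheduler code:

```rust
use std::collections::HashMap;

type TaskId = usize;

/// Simplified stand-in for the scheduler's per-task record (`TaskControl`).
struct TaskControl {
    suspended: bool,
    dead: bool,
}

struct Inner {
    tasks: HashMap<TaskId, TaskControl>,
}

impl Inner {
    /// After draining the control-message queue into per-category batches, each batch is
    /// handed to its own helper. Every helper returns the ids of tasks found dead or
    /// unresponsive, and removal happens in one place at the end so that any of the
    /// steps above can contribute to the list.
    fn post_process(&mut self, to_wake: Vec<TaskId>, kill_requests: Vec<(TaskId, TaskId)>) {
        let mut to_remove = vec![];

        to_remove.append(&mut self.process_wake_ups(to_wake));
        to_remove.append(&mut self.process_kill_requests(kill_requests));

        self.prune_dead_tasks();
        self.process_task_removals(to_remove);
    }

    fn process_wake_ups(&mut self, to_wake: Vec<TaskId>) -> Vec<TaskId> {
        let mut to_remove = vec![];
        for task_id in to_wake {
            match self.tasks.get_mut(&task_id) {
                Some(task) => task.suspended = false,
                // A wake-up for a task we no longer track gets queued for removal.
                None => to_remove.push(task_id),
            }
        }
        to_remove
    }

    fn process_kill_requests(&mut self, kill_requests: Vec<(TaskId, TaskId)>) -> Vec<TaskId> {
        let mut to_remove = vec![];
        for (requesting_task_id, victim_task_id) in kill_requests {
            // A self-kill would deadlock on the reply channel, so it is rejected outright.
            if requesting_task_id == victim_task_id {
                continue;
            }
            if let Some(victim) = self.tasks.get_mut(&victim_task_id) {
                victim.dead = true;
            }
            // In the real scheduler the requester is also removed if replying to it fails.
            if !self.tasks.contains_key(&requesting_task_id) {
                to_remove.push(requesting_task_id);
            }
        }
        to_remove
    }

    fn prune_dead_tasks(&mut self) {
        // The real version checks whether each task's control channel has closed.
        self.tasks.retain(|_, task| !task.dead);
    }

    fn process_task_removals(&mut self, to_remove: Vec<TaskId>) {
        for task_id in to_remove {
            self.tasks.remove(&task_id);
        }
    }
}
```
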
Showing 7 changed files with 147 additions and 45 deletions.
4 changes: 2 additions & 2 deletions crates/kernel/src/tasks/moo_vm_host.rs
@@ -9,7 +9,7 @@ use crate::tasks::{TaskId, VerbCall};
use crate::vm::opcode::Program;
use crate::vm::vm_execute::VmExecParams;
use crate::vm::vm_unwind::FinallyReason;
use crate::vm::{ExecutionResult, ForkRequest, VerbExecutionRequest, VM};
use crate::vm::{ExecutionResult, Fork, VerbExecutionRequest, VM};
use async_trait::async_trait;
use moor_values::model::verb_info::VerbInfo;
use moor_values::model::verbs::BinaryType;
@@ -107,7 +107,7 @@ impl VMHost<Program> for MooVmHost {

self.start_execution(task_id, call_request).await
}
async fn start_fork(&mut self, task_id: TaskId, fork_request: ForkRequest, suspended: bool) {
async fn start_fork(&mut self, task_id: TaskId, fork_request: Fork, suspended: bool) {
self.vm.tick_count = 0;
self.vm.exec_fork_vector(fork_request, task_id).await;
self.running_method = !suspended;
164 changes: 133 additions & 31 deletions crates/kernel/src/tasks/scheduler.rs
@@ -11,6 +11,7 @@ use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::{oneshot, RwLock};
use tracing::{debug, error, info, instrument, span, trace, warn, Level};

use moor_values::model::permissions::Perms;
use moor_values::model::world_state::{WorldState, WorldStateSource};
use moor_values::model::CommandError;
use moor_values::var::error::Error::{E_INVARG, E_PERM};
@@ -31,7 +32,7 @@ use crate::tasks::task::Task;
use crate::tasks::task_messages::{SchedulerControlMsg, TaskControlMsg};
use crate::tasks::TaskId;
use crate::vm::vm_unwind::UncaughtException;
use crate::vm::{ForkRequest, VM};
use crate::vm::{Fork, VM};

const SCHEDULER_TICK_TIME: Duration = Duration::from_millis(5);
const METRICS_POLLER_TICK_TIME: Duration = Duration::from_secs(1);
@@ -83,6 +84,29 @@ pub enum TaskWaiterResult {
Error(SchedulerError),
}

struct KillRequest {
requesting_task_id: TaskId,
victim_task_id: TaskId,
sender_permissions: Perms,
result_sender: oneshot::Sender<Var>,
}

struct ResumeRequest {
requesting_task_id: TaskId,
queued_task_id: TaskId,
sender_permissions: Perms,
return_value: Var,
result_sender: oneshot::Sender<Var>,
}

struct ForkRequest {
fork_request: Fork,
reply: oneshot::Sender<TaskId>,
state_source: Arc<dyn WorldStateSource>,
session: Arc<dyn Session>,
scheduler: Scheduler,
}

/// Scheduler-side per-task record. Lives in the scheduler thread and owned by the scheduler and
/// not shared elsewhere.
struct TaskControl {
@@ -515,20 +539,20 @@ impl Scheduler {
impl Inner {
async fn submit_fork_task(
&mut self,
fork_request: ForkRequest,
fork: Fork,
state_source: Arc<dyn WorldStateSource>,
session: Arc<dyn Session>,
scheduler_ref: Scheduler,
) -> Result<TaskId, SchedulerError> {
increment_counter!("scheduler.forked_tasks");
let task_id = self
.new_task(
fork_request.player,
fork.player,
state_source,
session,
fork_request.delay,
fork.delay,
scheduler_ref,
fork_request.progr,
fork.progr,
false,
)
.await?;
@@ -540,7 +564,7 @@ impl Inner {
// If there's a delay on the fork, we will mark it in suspended state and put in the
// delay time.
let mut suspended = false;
if let Some(delay) = fork_request.delay {
if let Some(delay) = fork.delay {
task_ref.suspended = true;
task_ref.resume_time = Some(SystemTime::now() + delay);
suspended = true;
@@ -550,7 +574,7 @@ impl Inner {
.task_control_sender
.send(TaskControlMsg::StartFork {
task_id,
fork_request,
fork_request: fork,
suspended,
})
{
@@ -663,13 +687,13 @@ impl Inner {
// Task has requested a fork. Dispatch it and reply with the new task id.
// Gotta dump this out til we exit the loop tho, since self.tasks is already
// borrowed here.
fork_requests.push((
fork_requests.push(ForkRequest {
fork_request,
reply,
task.state_source.clone(),
task.session.clone(),
task.scheduler.clone(),
));
state_source: task.state_source.clone(),
session: task.session.clone(),
scheduler: task.scheduler.clone(),
});
}
SchedulerControlMsg::TaskSuspend(resume_time) => {
increment_counter!("scheduler.suspend_task");
@@ -690,12 +714,12 @@ impl Inner {
} => {
increment_counter!("scheduler.kill_task");
// Task is asking to kill another task.
kill_requests.push((
task.task_id,
kill_requests.push(KillRequest {
requesting_task_id: task.task_id,
victim_task_id,
sender_permissions,
result_sender,
));
});
}
SchedulerControlMsg::ResumeTask {
queued_task_id,
@@ -704,13 +728,13 @@ impl Inner {
result_sender,
} => {
increment_counter!("scheduler.resume_task");
resume_requests.push((
task.task_id,
resume_requests.push(ResumeRequest {
requesting_task_id: task.task_id,
queued_task_id,
sender_permissions,
return_value,
result_sender,
));
});
}
SchedulerControlMsg::BootPlayer {
player,
@@ -731,16 +755,54 @@ impl Inner {
}

// Send notifications. These are oneshot and consumed.
to_remove.append(&mut self.process_notifications(to_notify).await);

// Service wake-ups
to_remove.append(&mut self.process_wake_ups(to_wake).await);

// Service fork requests
to_remove.append(&mut self.process_fork_requests(fork_requests).await);

// Service describe requests.
to_remove.append(&mut self.process_describe_requests(desc_requests).await);

// Service kill requests, removing any that were non-responsive (returned from function)
to_remove.append(&mut self.process_kill_requests(kill_requests).await);

// Service resume requests, removing any that were non-responsive (returned from function)
to_remove.append(&mut self.process_resume_requests(resume_requests).await);

self.process_disconnect_tasks(to_disconnect).await;

// Prune any completed/dead tasks
self.prune_dead_tasks();

// Service task removals. This is done last because other queues above might have contributed to
// this list.
self.process_task_removals(to_remove);
}

async fn process_notifications(
&mut self,
to_notify: Vec<(TaskId, TaskWaiterResult)>,
) -> Vec<TaskId> {
let mut to_remove = vec![];

for (task_id, result) in to_notify {
let task = self.tasks.get_mut(&task_id).unwrap();
for subscriber in task.subscribers.drain(..) {
if subscriber.send(result.clone()).is_err() {
to_remove.push(task_id);
error!("Notify to subscriber on task {} failed", task_id);
}
}
}
to_remove
}

async fn process_wake_ups(&mut self, to_wake: Vec<TaskId>) -> Vec<TaskId> {
let mut to_remove = vec![];

// Service wake-ups
for task_id in to_wake {
let task = self.tasks.get_mut(&task_id).unwrap();
task.suspended = false;
@@ -760,9 +822,19 @@ impl Inner {
to_remove.push(task.task_id);
}
}
to_remove
}

// Service fork requests
for (fork_request, reply, state_source, session, scheduler) in fork_requests {
async fn process_fork_requests(&mut self, fork_requests: Vec<ForkRequest>) -> Vec<TaskId> {
let mut to_remove = vec![];
for ForkRequest {
fork_request,
reply,
state_source,
session,
scheduler,
} in fork_requests
{
// Fork the session.
let forked_session = session.clone();
let task_id = self
@@ -774,8 +846,14 @@ impl Inner {
to_remove.push(task_id);
}
}
to_remove
}
async fn process_describe_requests(
&mut self,
desc_requests: Vec<(TaskId, oneshot::Sender<Vec<TaskDescription>>)>,
) -> Vec<TaskId> {
let mut to_remove = vec![];

// Service describe requests.
// Note these could be done in parallel and joined instead of single file, to avoid blocking
// the loop on one uncooperative thread, and could be done in a separate thread as well?
// The challenge being the borrow semantics of the 'tasks' list.
@@ -828,11 +906,20 @@ impl Inner {
reply.send(tasks).expect("Could not send task description");
trace!(task = requesting_task_id, "Sent task descriptions back");
}
to_remove
}

async fn process_kill_requests(&mut self, kill_requests: Vec<KillRequest>) -> Vec<TaskId> {
let mut to_remove = vec![];
// Service kill requests
for (requesting_task_id, victim_task_id, sender_permissions, result_sender) in kill_requests
for KillRequest {
requesting_task_id,
victim_task_id,
sender_permissions,
result_sender,
} in kill_requests
{
// If the task somehow is reuesting a kill on itself, that would lead to deadlock,
// If the task somehow is requesting a kill on itself, that would lead to deadlock,
// because we could never send the result back. So we reject that outright. bf_kill_task
// should be handling this upfront.
if requesting_task_id == victim_task_id {
@@ -879,10 +966,23 @@ impl Inner {
error!(task = requesting_task_id, error = ?e, "Could not send kill result to requesting task. Requesting task being removed.");
}
}
to_remove
}

async fn process_resume_requests(
&mut self,
resume_requests: Vec<ResumeRequest>,
) -> Vec<TaskId> {
let mut to_remove = vec![];

// Service resume requests
for (requesting_task_id, queued_task_id, sender_permissions, return_value, result_sender) in
resume_requests
for ResumeRequest {
requesting_task_id,
queued_task_id,
sender_permissions,
return_value,
result_sender,
} in resume_requests
{
// Task can't resume itself, it couldn't be queued. Builtin should not have sent this
// request.
@@ -916,7 +1016,6 @@ impl Inner {
.expect("Could not send resume result");
continue;
}

// Task is not suspended.
if !queued_task.suspended {
result_sender
@@ -948,7 +1047,10 @@ impl Inner {
to_remove.push(requesting_task_id);
}
}
to_remove
}

async fn process_disconnect_tasks(&mut self, to_disconnect: Vec<(TaskId, Objid)>) {
for (disconnect_task_id, player) in to_disconnect {
{
let Some(task) = self.tasks.get_mut(&disconnect_task_id) else {
@@ -983,8 +1085,9 @@ impl Inner {
};
}
}
}

// Prune any completed/dead tasks
fn prune_dead_tasks(&mut self) {
let dead_tasks: Vec<_> = self
.tasks
.iter()
@@ -993,9 +1096,8 @@ impl Inner {
for task in dead_tasks {
self.tasks.remove(&task);
}

// Service task removals. This is done last because other queues above might contribute to
// this list.
}
fn process_task_removals(&mut self, to_remove: Vec<TaskId>) {
for task_id in to_remove {
trace!(task = task_id, "Task removed");
self.tasks.remove(&task_id);
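
A related readability change in `scheduler.rs` is that the per-category queues now carry small named structs (`KillRequest`, `ResumeRequest`, and a scheduler-side `ForkRequest`) instead of positional tuples. Below is a rough before/after sketch of that idea; the type aliases are plain stand-ins for the real `Perms`, `Var`, and `oneshot` types, and the helper body is trimmed to the control flow visible in the diff:

```rust
// Plain stand-ins for the real field types (Perms, Var, oneshot::Sender<Var>).
type TaskId = usize;
type Perms = u8;
type Var = i64;
type ResultSender = std::sync::mpsc::Sender<Var>;

// Before: the loop accumulated positional tuples, which are easy to mis-order
// and opaque at the point of use.
#[allow(dead_code)]
type KillRequestTuple = (TaskId, TaskId, Perms, ResultSender);

// After: a named record per request category...
struct KillRequest {
    requesting_task_id: TaskId,
    victim_task_id: TaskId,
    sender_permissions: Perms,
    result_sender: ResultSender,
}

// ...so the consuming helper destructures by field name instead of by position.
fn process_kill_requests(kill_requests: Vec<KillRequest>) -> Vec<TaskId> {
    let mut to_remove = vec![];
    for KillRequest {
        requesting_task_id,
        victim_task_id,
        sender_permissions: _,
        result_sender,
    } in kill_requests
    {
        // Self-kills are rejected outright; replying to the requester would deadlock.
        if requesting_task_id == victim_task_id {
            continue;
        }
        // ... permission check and victim shutdown elided ...
        if result_sender.send(0).is_err() {
            // The requester went away before we could reply; remove it too.
            to_remove.push(requesting_task_id);
        }
    }
    to_remove
}
```
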
6 changes: 3 additions & 3 deletions crates/kernel/src/tasks/task_messages.rs
@@ -1,7 +1,7 @@
use crate::tasks::scheduler::{AbortLimitReason, TaskDescription};
use crate::tasks::TaskId;
use crate::vm::vm_unwind::UncaughtException;
use crate::vm::ForkRequest;
use crate::vm::Fork;

use crate::vm::opcode::Program;
use moor_values::model::permissions::Perms;
@@ -30,7 +30,7 @@ pub enum TaskControlMsg {
/// set up execution.
StartFork {
task_id: TaskId,
fork_request: ForkRequest,
fork_request: Fork,
// If we're starting in a suspended state. If this is true, an explicit Resume from the
// scheduler will be required to start the task.
suspended: bool,
@@ -61,7 +61,7 @@ pub enum SchedulerControlMsg {
/// An exception was thrown while executing the verb.
TaskException(UncaughtException),
/// The task is requesting that it be forked.
TaskRequestFork(ForkRequest, oneshot::Sender<TaskId>),
TaskRequestFork(Fork, oneshot::Sender<TaskId>),
/// The task is letting us know it was cancelled.
TaskAbortCancelled,
/// The task is letting us know that it has reached its abort limits.
6 changes: 3 additions & 3 deletions crates/kernel/src/tasks/vm_host.rs
@@ -5,7 +5,7 @@ use crate::tasks::command_parse::ParsedCommand;
use crate::tasks::scheduler::AbortLimitReason;
use crate::tasks::{TaskId, VerbCall};
use crate::vm::vm_unwind::UncaughtException;
use crate::vm::{ForkRequest, VerbExecutionRequest};
use crate::vm::{Fork, VerbExecutionRequest};
use async_trait::async_trait;
use moor_values::model::verb_info::VerbInfo;
use moor_values::model::verbs::BinaryType;
@@ -18,7 +18,7 @@ pub enum VMHostResponse {
/// Tell the task to just keep on letting us do what we're doing.
ContinueOk,
/// Tell the task to ask the scheduler to dispatch a fork request, and then resume execution.
DispatchFork(ForkRequest),
DispatchFork(Fork),
/// Tell the task to suspend us.
Suspend(Option<Duration>),
/// Task timed out or exceeded ticks.
@@ -56,7 +56,7 @@ pub trait VMHost<ProgramType> {
);

/// Setup for dispatching into a fork request.
async fn start_fork(&mut self, task_id: TaskId, fork_request: ForkRequest, suspended: bool);
async fn start_fork(&mut self, task_id: TaskId, fork_request: Fork, suspended: bool);

/// Signal the need to start execution of a verb request.
async fn start_execution(
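
The thread running through all of the touched files is the rename of the VM-level fork descriptor from `ForkRequest` to `Fork`, which frees the `ForkRequest` name for the scheduler-side record shown earlier. The sketch below traces how the renamed type appears in the message enums from this diff; field and channel types are simplified stand-ins (the real code uses the crate's `Objid`, `oneshot` channels, and async traits), and the `Fork` fields are trimmed to the ones referenced in the diff:

```rust
use std::sync::mpsc;
use std::time::Duration;

type TaskId = usize;
type Objid = i64; // stand-in for the crate's object id type

/// VM-level description of a fork, i.e. what the executing verb asked for.
/// (Field set trimmed to the ones referenced in this diff: player, progr, delay.)
#[allow(dead_code)]
struct Fork {
    player: Objid,
    progr: Objid,
    delay: Option<Duration>,
}

/// What the VM host hands back to its task after an execution step.
#[allow(dead_code)]
enum VMHostResponse {
    ContinueOk,
    DispatchFork(Fork),
    Suspend(Option<Duration>),
}

/// What the task then sends up to the scheduler (reply channel simplified to mpsc).
#[allow(dead_code)]
enum SchedulerControlMsg {
    TaskRequestFork(Fork, mpsc::Sender<TaskId>),
}

/// What the scheduler finally sends to the newly created task to start it.
#[allow(dead_code)]
enum TaskControlMsg {
    StartFork {
        task_id: TaskId,
        fork_request: Fork,
        // Forks created with a delay start suspended and need an explicit resume.
        suspended: bool,
    },
}
```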