Skip to content

Commit

Permalink
Some additional cleanup on the VM dispatch loop.
Browse files Browse the repository at this point in the history
  • Loading branch information
rdaum committed Sep 26, 2023
1 parent 4808f72 commit 6121a73
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 35 deletions.
76 changes: 41 additions & 35 deletions crates/kernel/src/tasks/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub(crate) struct Task {
}

#[derive(Debug, PartialEq, Eq)]
enum TaskContinue {
enum VmContinue {
Continue,
Complete,
}
Expand All @@ -84,7 +84,12 @@ impl Task {
// Run the dispatch loop for the virtual machine.
vm_continuation = self.vm_dispatch() => {
trace!(task_id = ?self.task_id, ?vm_continuation, "VM dispatch");
if vm_continuation == TaskContinue::Complete {
if let Some(scheduler_msg) = vm_continuation.1 {
self.scheduler_control_sender
.send(scheduler_msg)
.expect("Could not send scheduler_msg");
}
if vm_continuation.0 == VmContinue::Complete {
info!(task_id = ?self.task_id, "task execution complete");
break;
}
Expand Down Expand Up @@ -113,7 +118,10 @@ impl Task {
/// The VM dispatch loop. If we're actively running, we'll dispatch to the VM host to execute
/// the next instruction. If we're suspended, we'll wait for a Resume message from the
/// scheduler.
async fn vm_dispatch(&mut self) -> TaskContinue {
/// Returns a tuple of (VmContinue, Option<SchedulerControlMsg>), where VmContinue indicates
/// whether the VM should continue running, and the SchedulerControlMsg is a message to send
/// back to the scheduler, if any.
async fn vm_dispatch(&mut self) -> (VmContinue, Option<SchedulerControlMsg>) {
// Note: This is expected to (async) block if the given VM host is in a
// suspended/non-running state
let vm_exec_result = self
Expand All @@ -137,7 +145,7 @@ impl Task {
self.vm_host
.set_variable(task_id_var, v_int(task_id as i64));
}
TaskContinue::Continue
(VmContinue::Continue, None)
}
Ok(VMHostResponse::Suspend(delay)) => {
trace!(task_id = self.task_id, delay = ?delay, "Task suspend");
Expand All @@ -154,11 +162,10 @@ impl Task {
.expect("Could not commit world state before suspend");
if let CommitResult::ConflictRetry = commit_result {
error!("Conflict during commit before suspend");
self.scheduler_control_sender
.send(SchedulerControlMsg::TaskAbortCancelled)
.expect("Could not send suspend response");

return TaskContinue::Complete;
return (
VmContinue::Complete,
Some(SchedulerControlMsg::TaskAbortCancelled),
);
}

// Let the scheduler know about our suspension, which can be of the form:
Expand All @@ -168,13 +175,12 @@ impl Task {
// rather than sleep here, which would make this thread unresponsive to other
// messages.
let resume_time = delay.map(|delay| SystemTime::now() + delay);
self.scheduler_control_sender
.send(SchedulerControlMsg::TaskSuspend(resume_time))
.expect("Could not send suspend response");

TaskContinue::Continue
(
VmContinue::Continue,
Some(SchedulerControlMsg::TaskSuspend(resume_time)),
)
}
Ok(VMHostResponse::ContinueOk) => TaskContinue::Continue,
Ok(VMHostResponse::ContinueOk) => (VmContinue::Continue, None),
Ok(VMHostResponse::CompleteSuccess(result)) => {
trace!(task_id = self.task_id, result = ?result, "Task complete, success");

Expand All @@ -193,10 +199,10 @@ impl Task {
.await
.expect("Could not commit session...");

self.scheduler_control_sender
.send(SchedulerControlMsg::TaskSuccess(result))
.expect("Could not send success response");
TaskContinue::Complete
(
VmContinue::Complete,
Some(SchedulerControlMsg::TaskSuccess(result)),
)
}
Ok(VMHostResponse::CompleteAbort) => {
error!(task_id = self.task_id, "Task aborted");
Expand All @@ -217,10 +223,10 @@ impl Task {
.await
.expect("Could not rollback connection...");

self.scheduler_control_sender
.send(SchedulerControlMsg::TaskAbortCancelled)
.expect("Could not send abort response");
TaskContinue::Complete
(
VmContinue::Complete,
Some(SchedulerControlMsg::TaskAbortCancelled),
)
}
Ok(VMHostResponse::CompleteException(exception)) => {
// Compose a string out of the backtrace
Expand Down Expand Up @@ -249,10 +255,10 @@ impl Task {
.await
.expect("Could not commit connection output");

self.scheduler_control_sender
.send(SchedulerControlMsg::TaskException(exception))
.expect("Could not send abort response");
TaskContinue::Complete
(
VmContinue::Complete,
Some(SchedulerControlMsg::TaskException(exception)),
)
}
Ok(VMHostResponse::AbortLimit(reason)) => {
let abort_reason_text = match reason {
Expand All @@ -275,10 +281,10 @@ impl Task {
.rollback()
.await
.expect("Could not rollback connection output");
self.scheduler_control_sender
.send(SchedulerControlMsg::TaskAbortLimitsReached(reason))
.expect("Could not send error response");
TaskContinue::Complete
(
VmContinue::Complete,
Some(SchedulerControlMsg::TaskAbortLimitsReached(reason)),
)
}
Err(err) => {
error!(task_id = self.task_id, error = ?err, "Task error; rollback");
Expand All @@ -292,10 +298,10 @@ impl Task {
.await
.expect("Could not rollback connection output");

self.scheduler_control_sender
.send(SchedulerControlMsg::TaskAbortError(err))
.expect("Could not send error response");
TaskContinue::Complete
(
VmContinue::Complete,
Some(SchedulerControlMsg::TaskAbortError(err)),
)
}
};
}
Expand Down
1 change: 1 addition & 0 deletions crates/kernel/src/tasks/task_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub enum TaskControlMsg {
}

/// The ad-hoc messages that can be sent from tasks (or VM) up to the scheduler.
#[derive(Debug)]
pub enum SchedulerControlMsg {
/// Everything executed. The task is done.
TaskSuccess(Var),
Expand Down

0 comments on commit 6121a73

Please sign in to comment.