Skip to content

Commit

Permalink
Some more scheduler cleanups:
Browse files Browse the repository at this point in the history
  * Split task control msg handling out into its own function for readability
  * Cleanup some comments
  * Move the transaction-start inside the task thread -- this way delayed-start transactions are less likely to get serialization conflict.
  * Add TaskConflictRetry message for tx serialization conflict results, and have the scheduler receive them, though they are still unhandled (panic).  (#13)
  * Move VM creation into the VM host, since *it* knows what kind of VM it needs.
  * Remove some redundant arguments/value-passing.
  * Break task's command handling out of the msg match and into its own function for readability
  • Loading branch information
rdaum committed Oct 8, 2023
1 parent 72af49d commit 9197243
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 128 deletions.
6 changes: 4 additions & 2 deletions crates/kernel/src/tasks/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,8 @@ impl Scheduler {
}
self.do_process().await;
{
let mut start_lock = self.inner.write().await;
start_lock.running = false;
let mut finish_lock = self.inner.write().await;
finish_lock.running = false;
}
info!("Scheduler done.");
}
Expand Down Expand Up @@ -573,6 +573,8 @@ impl Scheduler {
debug!(number_readblocked_tasks, number_tasks, number_suspended_tasks, "...");
}
// Look for tasks that need to be woken (have hit their wakeup-time), and wake them.
// TODO: we might be able to use a vector of delay-futures for this instead, and just poll
// those using some futures_util magic.
_ = task_poller_interval.tick() => {
let mut inner = self.inner.write().await;
let mut to_wake = Vec::new();
Expand Down
258 changes: 132 additions & 126 deletions crates/kernel/src/tasks/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,133 +355,14 @@ impl Task {
/// Handle an inbound control message from the scheduler, and return a response message to send
/// back, if any.
async fn handle_control_message(&mut self, msg: TaskControlMsg) -> Option<SchedulerControlMsg> {
match msg {
return match msg {
// We've been asked to start a command.
// We need to set up the VM and then execute it.
TaskControlMsg::StartCommandVerb { player, command } => {
increment_counter!("task.start_command");

// Command execution is a multi-phase process:
// 1. Lookup $do_command. If we have the verb, execute it.
// 2. If it returns a boolean `true`, we're done, let scheduler know, otherwise:
// 3. Call parse_command, looking for a verb to execute in the environment.
// a. If something, call that verb.
// b. If nothing, look for :huh. If we have it, execute it.
// 4. On completion, let the scheduler know.

// All of this should occur in the same task id, and in the same transaction, and
// forms a multi-part process with continuation back from the VM along the whole
// chain, which complicates things significantly.

// TODO First try to match $do_command. And execute that, scheduling a callback into
// this stage again, if that fails. For now though, we rely on the daemon having
// done this work for us.

// Next, try parsing the command.

// We need the player's location, and we'll just die if we can't get it.
let player_location = match self.world_state.location_of(player, player).await {
Ok(loc) => loc,
Err(WorldStateError::VerbPermissionDenied)
| Err(WorldStateError::ObjectPermissionDenied)
| Err(WorldStateError::PropertyPermissionDenied) => {
return Some(SchedulerControlMsg::TaskCommandError(PermissionDenied));
}
Err(wse) => {
return Some(SchedulerControlMsg::TaskCommandError(
CommandError::DatabaseError(wse),
));
}
};

// Parse the command in the current environment.
let me = WsMatchEnv {
ws: self.world_state.as_mut(),
perms: player,
};
let matcher = MatchEnvironmentParseMatcher { env: me, player };
let parsed_command = match parse_command(&command, matcher).await {
Ok(pc) => pc,
Err(ParseCommandError::PermissionDenied) => {
return Some(SchedulerControlMsg::TaskCommandError(PermissionDenied));
}
Err(_) => {
return Some(SchedulerControlMsg::TaskCommandError(
CommandError::CouldNotParseCommand,
));
}
};

// Look for the verb...
let parse_results = match find_verb_for_command(
player,
player_location,
&parsed_command,
self.world_state.as_mut(),
)
.await
{
Ok(results) => results,
Err(e) => return Some(SchedulerControlMsg::TaskCommandError(e)),
};
let (verb_info, target) = match parse_results {
// If we have a successul match, that's what we'll call into
Some((verb_info, target)) => {
trace!(
?parsed_command,
?player,
?target,
?verb_info,
"Starting command"
);
(verb_info, target)
}
// Otherwise, we want to try to call :huh, if it exists.
None => {
if player_location == NOTHING {
return Some(SchedulerControlMsg::TaskCommandError(
CommandError::NoCommandMatch,
));
}

// Try to find :huh. If it exists, we'll dispatch to that, instead.
// If we don't find it, that's the end of the line.
let Ok(verb_info) = self
.world_state
.find_method_verb_on(self.perms, player_location, "huh")
.await
else {
return Some(SchedulerControlMsg::TaskCommandError(
CommandError::NoCommandMatch,
));
};
let words = parse_into_words(&command);
trace!(?verb_info, ?player, ?player_location, args = ?words,
"Dispatching to :huh");

(verb_info, player_location)
}
};
let verb_call = VerbCall {
verb_name: parsed_command.verb.clone(),
location: target,
this: target,
player,
args: parsed_command.args.clone(),
argstr: parsed_command.argstr.clone(),
caller: player,
};
self.vm_host
.start_call_command_verb(
self.task_id,
verb_info,
verb_call,
parsed_command,
self.perms,
)
.await;
self.start_command(player, command.as_str()).await
}

TaskControlMsg::StartVerb {
player,
vloc,
Expand Down Expand Up @@ -517,6 +398,7 @@ impl Task {
self.vm_host
.start_call_method_verb(self.task_id, self.perms, verb_info, verb_call)
.await;
None
}
TaskControlMsg::StartFork {
task_id,
Expand All @@ -528,12 +410,14 @@ impl Task {
self.vm_host
.start_fork(task_id, fork_request, suspended)
.await;
None
}
TaskControlMsg::StartEval { player, program } => {
increment_counter!("task.start_eval");

self.scheduled_start_time = None;
self.vm_host.start_eval(self.task_id, player, program).await;
None
}
TaskControlMsg::Resume(world_state, value) => {
increment_counter!("task.resume");
Expand All @@ -546,7 +430,7 @@ impl Task {
self.world_state = world_state;
self.scheduled_start_time = None;
self.vm_host.resume_execution(value).await;
return None;
None
}
TaskControlMsg::ResumeReceiveInput(world_state, input) => {
increment_counter!("task.resume_receive_input");
Expand All @@ -561,7 +445,7 @@ impl Task {
self.world_state = world_state;
self.scheduled_start_time = None;
self.vm_host.resume_execution(v_string(input)).await;
return None;
None
}
TaskControlMsg::Abort => {
// We've been asked to die. Go tell the VM host to abort, and roll back the
Expand All @@ -578,7 +462,7 @@ impl Task {
.expect("Could not rollback transaction. Panic.");

// And now tell the scheduler we're done, as we exit.
return Some(SchedulerControlMsg::TaskAbortCancelled);
Some(SchedulerControlMsg::TaskAbortCancelled)
}
TaskControlMsg::Describe(reply_sender) => {
increment_counter!("task.describe");
Expand All @@ -595,9 +479,131 @@ impl Task {
reply_sender
.send(description)
.expect("Could not send task description");
return None;
None
}
}
};
}

async fn start_command(&mut self, player: Objid, command: &str) -> Option<SchedulerControlMsg> {
// Command execution is a multi-phase process:
// 1. Lookup $do_command. If we have the verb, execute it.
// 2. If it returns a boolean `true`, we're done, let scheduler know, otherwise:
// 3. Call parse_command, looking for a verb to execute in the environment.
// a. If something, call that verb.
// b. If nothing, look for :huh. If we have it, execute it.
// 4. On completion, let the scheduler know.

// All of this should occur in the same task id, and in the same transaction, and
// forms a multi-part process with continuation back from the VM along the whole
// chain, which complicates things significantly.

// TODO First try to match $do_command. And execute that, scheduling a callback into
// this stage again, if that fails. For now though, we rely on the daemon having
// done this work for us.

// Next, try parsing the command.

// We need the player's location, and we'll just die if we can't get it.
let player_location = match self.world_state.location_of(player, player).await {
Ok(loc) => loc,
Err(WorldStateError::VerbPermissionDenied)
| Err(WorldStateError::ObjectPermissionDenied)
| Err(WorldStateError::PropertyPermissionDenied) => {
return Some(SchedulerControlMsg::TaskCommandError(PermissionDenied));
}
Err(wse) => {
return Some(SchedulerControlMsg::TaskCommandError(
CommandError::DatabaseError(wse),
));
}
};

// Parse the command in the current environment.
let me = WsMatchEnv {
ws: self.world_state.as_mut(),
perms: player,
};
let matcher = MatchEnvironmentParseMatcher { env: me, player };
let parsed_command = match parse_command(command, matcher).await {
Ok(pc) => pc,
Err(ParseCommandError::PermissionDenied) => {
return Some(SchedulerControlMsg::TaskCommandError(PermissionDenied));
}
Err(_) => {
return Some(SchedulerControlMsg::TaskCommandError(
CommandError::CouldNotParseCommand,
));
}
};

// Look for the verb...
let parse_results = match find_verb_for_command(
player,
player_location,
&parsed_command,
self.world_state.as_mut(),
)
.await
{
Ok(results) => results,
Err(e) => return Some(SchedulerControlMsg::TaskCommandError(e)),
};
let (verb_info, target) = match parse_results {
// If we have a successul match, that's what we'll call into
Some((verb_info, target)) => {
trace!(
?parsed_command,
?player,
?target,
?verb_info,
"Starting command"
);
(verb_info, target)
}
// Otherwise, we want to try to call :huh, if it exists.
None => {
if player_location == NOTHING {
return Some(SchedulerControlMsg::TaskCommandError(
CommandError::NoCommandMatch,
));
}

// Try to find :huh. If it exists, we'll dispatch to that, instead.
// If we don't find it, that's the end of the line.
let Ok(verb_info) = self
.world_state
.find_method_verb_on(self.perms, player_location, "huh")
.await
else {
return Some(SchedulerControlMsg::TaskCommandError(
CommandError::NoCommandMatch,
));
};
let words = parse_into_words(&command);
trace!(?verb_info, ?player, ?player_location, args = ?words,
"Dispatching to :huh");

(verb_info, player_location)
}
};
let verb_call = VerbCall {
verb_name: parsed_command.verb.clone(),
location: target,
this: target,
player,
args: parsed_command.args.clone(),
argstr: parsed_command.argstr.clone(),
caller: player,
};
self.vm_host
.start_call_command_verb(
self.task_id,
verb_info,
verb_call,
parsed_command,
self.perms,
)
.await;
None
}
}
Expand Down

0 comments on commit 9197243

Please sign in to comment.