diff --git a/crates/kernel/src/tasks/mod.rs b/crates/kernel/src/tasks/mod.rs index 81a551ac..1f8b6067 100644 --- a/crates/kernel/src/tasks/mod.rs +++ b/crates/kernel/src/tasks/mod.rs @@ -20,7 +20,6 @@ pub mod command_parse; pub mod scheduler; pub mod sessions; -mod moo_vm_host; mod task; pub mod task_messages; pub mod vm_host; @@ -52,9 +51,8 @@ pub struct TaskDescription { } pub mod vm_test_utils { - use crate::tasks::moo_vm_host::MooVmHost; use crate::tasks::sessions::Session; - use crate::tasks::vm_host::{VMHost, VMHostResponse}; + use crate::tasks::vm_host::{VMHostResponse, VmHost}; use crate::tasks::VerbCall; use crate::vm::VmExecParams; use moor_values::model::world_state::WorldState; @@ -70,13 +68,10 @@ pub mod vm_test_utils { args: Vec, ) -> Var { let (scs_tx, _scs_rx) = tokio::sync::mpsc::unbounded_channel(); - let mut vm_host = - MooVmHost::new(20, 90_000, Duration::from_secs(5), session.clone(), scs_tx); + let mut vm_host = VmHost::new(20, 90_000, Duration::from_secs(5), session.clone(), scs_tx); let (sched_send, _) = tokio::sync::mpsc::unbounded_channel(); let _vm_exec_params = VmExecParams { - world_state, - session: session.clone(), scheduler_sender: sched_send.clone(), max_stack_depth: 50, ticks_left: 90_000, diff --git a/crates/kernel/src/tasks/moo_vm_host.rs b/crates/kernel/src/tasks/moo_vm_host.rs deleted file mode 100644 index 42a52e8d..00000000 --- a/crates/kernel/src/tasks/moo_vm_host.rs +++ /dev/null @@ -1,358 +0,0 @@ -// Copyright (C) 2024 Ryan Daum -// -// This program is free software: you can redistribute it and/or modify it under -// the terms of the GNU General Public License as published by the Free Software -// Foundation, version 3. -// -// This program is distributed in the hope that it will be useful, but WITHOUT -// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS -// FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License along with -// this program. If not, see . -// - -use crate::tasks::command_parse::ParsedCommand; -use crate::tasks::scheduler::AbortLimitReason; -use crate::tasks::sessions::Session; -use crate::tasks::task_messages::SchedulerControlMsg; -use crate::tasks::vm_host::VMHostResponse::{AbortLimit, ContinueOk, DispatchFork, Suspend}; -use crate::tasks::vm_host::{VMHost, VMHostResponse}; -use crate::tasks::{TaskId, VerbCall}; -use crate::vm::VmExecParams; -use crate::vm::{ExecutionResult, Fork, VerbExecutionRequest, VM}; -use crate::vm::{FinallyReason, VMExecState}; -use async_trait::async_trait; -use moor_compiler::labels::Name; -use moor_compiler::opcode::Program; -use moor_values::model::verb_info::VerbInfo; -use moor_values::model::verbs::BinaryType; -use moor_values::model::world_state::WorldState; -use moor_values::util::slice_ref::SliceRef; -use moor_values::var::objid::Objid; -use moor_values::var::Var; -use moor_values::AsByteBuffer; -use std::sync::Arc; -use std::time::{Duration, SystemTime}; -use tokio::sync::mpsc::UnboundedSender; -use tracing::{trace, warn}; - -/// A 'host' for running the MOO virtual machine inside a task. -pub struct MooVmHost { - vm: VM, - exec_state: VMExecState, - /// The maximum stack depth for this task - max_stack_depth: usize, - /// The amount of ticks (opcode executions) allotted to this task - max_ticks: usize, - /// The maximum amount of time allotted to this task - max_time: Duration, - sessions: Arc, - scheduler_control_sender: UnboundedSender<(TaskId, SchedulerControlMsg)>, - run_watch_send: tokio::sync::watch::Sender, - run_watch_recv: tokio::sync::watch::Receiver, -} - -impl MooVmHost { - pub fn new( - max_stack_depth: usize, - max_ticks: usize, - max_time: Duration, - sessions: Arc, - scheduler_control_sender: UnboundedSender<(TaskId, SchedulerControlMsg)>, - ) -> Self { - let vm = VM::new(); - let exec_state = VMExecState::new(); - let (run_watch_send, run_watch_recv) = tokio::sync::watch::channel(false); - // Created in an initial suspended state. - Self { - vm, - exec_state, - max_stack_depth, - max_ticks, - max_time, - sessions, - scheduler_control_sender, - run_watch_send, - run_watch_recv, - } - } -} - -#[async_trait] -impl VMHost for MooVmHost { - /// Setup for executing a method initited from a command. - async fn start_call_command_verb( - &mut self, - task_id: TaskId, - vi: VerbInfo, - verb_call: VerbCall, - command: ParsedCommand, - permissions: Objid, - ) { - let binary = Self::decode_program(vi.verbdef().binary_type(), vi.binary().as_slice()); - let call_request = VerbExecutionRequest { - permissions, - resolved_verb: vi, - call: verb_call, - command: Some(command), - program: binary, - }; - - self.start_execution(task_id, call_request).await - } - /// Setup for executing a method call in this VM. - async fn start_call_method_verb( - &mut self, - task_id: TaskId, - perms: Objid, - verb_info: VerbInfo, - verb_call: VerbCall, - ) { - let binary = Self::decode_program( - verb_info.verbdef().binary_type(), - verb_info.binary().as_slice(), - ); - - let call_request = VerbExecutionRequest { - permissions: perms, - resolved_verb: verb_info.clone(), - call: verb_call, - command: None, - program: binary, - }; - - self.start_execution(task_id, call_request).await - } - async fn start_fork(&mut self, task_id: TaskId, fork_request: Fork, suspended: bool) { - self.exec_state.tick_count = 0; - self.vm - .exec_fork_vector(&mut self.exec_state, fork_request, task_id) - .await; - self.run_watch_send.send(!suspended).unwrap(); - } - /// Start execution of a verb request. - async fn start_execution( - &mut self, - task_id: TaskId, - verb_execution_request: VerbExecutionRequest, - ) { - self.exec_state.start_time = Some(SystemTime::now()); - self.exec_state.tick_count = 0; - self.vm - .exec_call_request(&mut self.exec_state, task_id, verb_execution_request) - .await; - self.run_watch_send.send(true).unwrap(); - } - async fn start_eval(&mut self, task_id: TaskId, player: Objid, program: Program) { - self.exec_state.start_time = Some(SystemTime::now()); - self.exec_state.tick_count = 0; - self.vm - .exec_eval_request(&mut self.exec_state, task_id, player, player, program) - .await; - self.run_watch_send.send(true).unwrap(); - } - async fn exec_interpreter( - &mut self, - task_id: TaskId, - world_state: &mut dyn WorldState, - ) -> VMHostResponse { - self.run_watch_recv - .wait_for(|running| *running) - .await - .unwrap(); - - // Check ticks and seconds, and abort the task if we've exceeded the limits. - let time_left = match self.exec_state.start_time { - Some(start_time) => { - let elapsed = start_time.elapsed().expect("Could not get elapsed time"); - if elapsed > self.max_time { - return AbortLimit(AbortLimitReason::Time(elapsed)); - } - Some(self.max_time - elapsed) - } - None => None, - }; - if self.exec_state.tick_count >= self.max_ticks { - return AbortLimit(AbortLimitReason::Ticks(self.exec_state.tick_count)); - } - let mut exec_params = VmExecParams { - world_state, - session: self.sessions.clone(), - scheduler_sender: self.scheduler_control_sender.clone(), - max_stack_depth: self.max_stack_depth, - ticks_left: self.max_ticks - self.exec_state.tick_count, - time_left, - }; - let pre_exec_tick_count = self.exec_state.tick_count; - - // Actually invoke the VM, asking it to loop until it's ready to yield back to us. - let mut result = self - .vm - .exec(&mut exec_params, &mut self.exec_state, self.max_ticks) - .await; - - let post_exec_tick_count = self.exec_state.tick_count; - trace!( - task_id, - executed_ticks = post_exec_tick_count - pre_exec_tick_count, - ?result, - "Executed ticks", - ); - while self.is_running() { - match result { - ExecutionResult::More => return ContinueOk, - ExecutionResult::ContinueVerb { - permissions, - resolved_verb, - call, - command, - trampoline, - trampoline_arg, - } => { - trace!(task_id, call = ?call, "Task continue, call into verb"); - - self.exec_state.top_mut().bf_trampoline_arg = trampoline_arg; - self.exec_state.top_mut().bf_trampoline = trampoline; - - let program = Self::decode_program( - resolved_verb.verbdef().binary_type(), - resolved_verb.binary().as_slice(), - ); - - let call_request = VerbExecutionRequest { - permissions, - resolved_verb, - call, - command, - program, - }; - - self.vm - .exec_call_request(&mut self.exec_state, task_id, call_request) - .await; - return ContinueOk; - } - ExecutionResult::PerformEval { - permissions, - player, - program, - } => { - self.vm - .exec_eval_request(&mut self.exec_state, 0, permissions, player, program) - .await; - return ContinueOk; - } - ExecutionResult::ContinueBuiltin { - bf_func_num: bf_offset, - arguments: args, - } => { - let mut exec_params = VmExecParams { - world_state, - session: self.sessions.clone(), - max_stack_depth: self.max_stack_depth, - scheduler_sender: self.scheduler_control_sender.clone(), - ticks_left: self.max_ticks - self.exec_state.tick_count, - time_left, - }; - // Ask the VM to execute the builtin function. - // This will push the result onto the stack. - // After this we will loop around and check the result. - result = self - .vm - .call_builtin_function( - &mut self.exec_state, - bf_offset, - &args, - &mut exec_params, - ) - .await; - continue; - } - ExecutionResult::DispatchFork(fork_request) => { - return DispatchFork(fork_request); - } - ExecutionResult::Suspend(delay) => { - self.run_watch_send.send(false).unwrap(); - return Suspend(delay); - } - ExecutionResult::NeedInput => { - self.run_watch_send.send(false).unwrap(); - return VMHostResponse::SuspendNeedInput; - } - ExecutionResult::Complete(a) => { - return VMHostResponse::CompleteSuccess(a); - } - ExecutionResult::Exception(fr) => { - trace!(task_id, result = ?fr, "Task exception"); - - return match &fr { - FinallyReason::Abort => VMHostResponse::CompleteAbort, - FinallyReason::Uncaught(exception) => { - VMHostResponse::CompleteException(exception.clone()) - } - _ => { - unreachable!( - "Invalid FinallyReason {:?} reached for task {} in scheduler", - fr, task_id - ); - } - }; - } - } - } - - // We're not running and we didn't get a completion signal from the VM - we must have been - // asked to stop by the scheduler. - warn!(task_id, "VM host stopped by task"); - VMHostResponse::CompleteAbort - } - /// Resume what you were doing after suspension. - async fn resume_execution(&mut self, value: Var) { - // coming back from suspend, we need a return value to feed back to `bf_suspend` - self.exec_state.top_mut().push(value); - self.exec_state.start_time = Some(SystemTime::now()); - self.exec_state.tick_count = 0; - self.run_watch_send.send(true).unwrap(); - } - fn is_running(&self) -> bool { - *self.run_watch_recv.borrow() - } - async fn stop(&mut self) { - self.run_watch_send.send(false).unwrap(); - } - fn decode_program(binary_type: BinaryType, binary_bytes: &[u8]) -> Program { - match binary_type { - BinaryType::LambdaMoo18X => Program::from_sliceref(SliceRef::from_bytes(binary_bytes)), - _ => panic!("Unsupported binary type {:?}", binary_type), - } - } - fn set_variable(&mut self, task_id_var: Name, value: Var) { - self.exec_state - .top_mut() - .set_var_offset(task_id_var, value) - .expect("Could not set forked task id"); - } - fn permissions(&self) -> Objid { - self.exec_state.top().permissions - } - fn verb_name(&self) -> String { - self.exec_state.top().verb_name.clone() - } - fn verb_definer(&self) -> Objid { - self.exec_state.top().verb_definer() - } - fn this(&self) -> Objid { - self.exec_state.top().this - } - fn line_number(&self) -> usize { - self.exec_state - .top() - .find_line_no(self.exec_state.top().pc) - .unwrap_or(0) - } - - fn args(&self) -> Vec { - self.exec_state.top().args.clone() - } -} diff --git a/crates/kernel/src/tasks/task.rs b/crates/kernel/src/tasks/task.rs index 22542eb0..ab23208d 100644 --- a/crates/kernel/src/tasks/task.rs +++ b/crates/kernel/src/tasks/task.rs @@ -35,11 +35,10 @@ use crate::matching::ws_match_env::WsMatchEnv; use crate::tasks::command_parse::{ parse_command, parse_into_words, ParseCommandError, ParsedCommand, }; -use crate::tasks::moo_vm_host::MooVmHost; use crate::tasks::scheduler::AbortLimitReason; use crate::tasks::sessions::Session; use crate::tasks::task_messages::{SchedulerControlMsg, TaskControlMsg, TaskStart}; -use crate::tasks::vm_host::{VMHost, VMHostResponse}; +use crate::tasks::vm_host::{VMHostResponse, VmHost}; use crate::tasks::{TaskDescription, TaskId, VerbCall}; /// A task is a concurrent, transactionally isolated, thread of execution. It starts with the @@ -75,7 +74,7 @@ pub(crate) struct Task { /// The permissions of the task -- the object on behalf of which all permissions are evaluated. pub(crate) perms: Objid, /// The actual VM host which is managing the execution of this task. - pub(crate) vm_host: MooVmHost, + pub(crate) vm_host: VmHost, } #[derive(Debug, PartialEq, Eq)] @@ -184,7 +183,7 @@ impl Task { // Spawn a new MOO VM host. // TODO: here is where we'd make a choice about alternative VM/VM Host implementations. let scheduler_control_sender = control_sender.clone(); - let vm_host = MooVmHost::new( + let vm_host = VmHost::new( max_stack_depth, max_ticks, Duration::from_secs(max_seconds), diff --git a/crates/kernel/src/tasks/vm_host.rs b/crates/kernel/src/tasks/vm_host.rs index 3bf601cf..90137038 100644 --- a/crates/kernel/src/tasks/vm_host.rs +++ b/crates/kernel/src/tasks/vm_host.rs @@ -12,20 +12,28 @@ // this program. If not, see . // -use std::time::Duration; - use crate::tasks::command_parse::ParsedCommand; use crate::tasks::scheduler::AbortLimitReason; +use crate::tasks::sessions::Session; +use crate::tasks::task_messages::SchedulerControlMsg; +use crate::tasks::vm_host::VMHostResponse::{AbortLimit, ContinueOk, DispatchFork, Suspend}; use crate::tasks::{TaskId, VerbCall}; -use crate::vm::UncaughtException; -use crate::vm::{Fork, VerbExecutionRequest}; -use async_trait::async_trait; +use crate::vm::{ExecutionResult, Fork, VerbExecutionRequest, VM}; +use crate::vm::{FinallyReason, VMExecState}; +use crate::vm::{UncaughtException, VmExecParams}; use moor_compiler::labels::Name; +use moor_compiler::opcode::Program; use moor_values::model::verb_info::VerbInfo; use moor_values::model::verbs::BinaryType; use moor_values::model::world_state::WorldState; +use moor_values::util::slice_ref::SliceRef; use moor_values::var::objid::Objid; use moor_values::var::Var; +use moor_values::AsByteBuffer; +use std::sync::Arc; +use std::time::{Duration, SystemTime}; +use tokio::sync::mpsc::UnboundedSender; +use tracing::{trace, warn}; /// Return values from exec_interpreter back to the Task scheduler loop pub enum VMHostResponse { @@ -47,84 +55,339 @@ pub enum VMHostResponse { CompleteException(UncaughtException), } -/// A "VM Host" is the interface between the Task scheduler and a virtual machine runtime. -/// Defining the level of abstraction for executing programmes which run in tasks against shared -/// virtual state. -#[async_trait] -pub trait VMHost { - /// Setup for executing a method call in this VM. - async fn start_call_command_verb( +/// A 'host' for running the MOO virtual machine inside a task. +pub struct VmHost { + /// The VM we're running for the current execution. + // TODO: The VM itself holds no mutable state, so having our own copy here is maybe pointless. + // TODO: we will hold a few of these, one for each runtime/language and flip between them + // depending on the verbdef.binary_type() of the verb we're executing. + vm: VM, + /// Where we store current execution state for this host. + vm_exec_state: VMExecState, + /// The maximum stack depth for this task + max_stack_depth: usize, + /// The amount of ticks (opcode executions) allotted to this task + max_ticks: usize, + /// The maximum amount of time allotted to this task + max_time: Duration, + sessions: Arc, + scheduler_control_sender: UnboundedSender<(TaskId, SchedulerControlMsg)>, + run_watch_send: tokio::sync::watch::Sender, + run_watch_recv: tokio::sync::watch::Receiver, +} + +impl VmHost { + pub fn new( + max_stack_depth: usize, + max_ticks: usize, + max_time: Duration, + sessions: Arc, + scheduler_control_sender: UnboundedSender<(TaskId, SchedulerControlMsg)>, + ) -> Self { + let vm = VM::new(); + let exec_state = VMExecState::new(); + let (run_watch_send, run_watch_recv) = tokio::sync::watch::channel(false); + // Created in an initial suspended state. + Self { + vm, + vm_exec_state: exec_state, + max_stack_depth, + max_ticks, + max_time, + sessions, + scheduler_control_sender, + run_watch_send, + run_watch_recv, + } + } +} + +impl VmHost { + /// Setup for executing a method initiated from a command. + pub async fn start_call_command_verb( &mut self, task_id: TaskId, vi: VerbInfo, verb_call: VerbCall, command: ParsedCommand, permissions: Objid, - ); + ) { + let binary = Self::decode_program(vi.verbdef().binary_type(), vi.binary().as_slice()); + let call_request = VerbExecutionRequest { + permissions, + resolved_verb: vi, + call: verb_call, + command: Some(command), + program: binary, + }; + + self.start_execution(task_id, call_request).await + } /// Setup for executing a method call in this VM. - async fn start_call_method_verb( + pub async fn start_call_method_verb( &mut self, task_id: TaskId, perms: Objid, verb_info: VerbInfo, verb_call: VerbCall, - ); + ) { + let binary = Self::decode_program( + verb_info.verbdef().binary_type(), + verb_info.binary().as_slice(), + ); + + let call_request = VerbExecutionRequest { + permissions: perms, + resolved_verb: verb_info.clone(), + call: verb_call, + command: None, + program: binary, + }; + + self.start_execution(task_id, call_request).await + } - /// Setup for dispatching into a fork request. - async fn start_fork(&mut self, task_id: TaskId, fork_request: Fork, suspended: bool); + /// Start execution of a fork request in the hosted VM. + pub async fn start_fork(&mut self, task_id: TaskId, fork_request: Fork, suspended: bool) { + self.vm_exec_state.tick_count = 0; + self.vm + .exec_fork_vector(&mut self.vm_exec_state, fork_request, task_id) + .await; + self.run_watch_send.send(!suspended).unwrap(); + } - /// Signal the need to start execution of a verb request. - async fn start_execution( + /// Start execution of a verb request. + pub async fn start_execution( &mut self, task_id: TaskId, verb_execution_request: VerbExecutionRequest, - ); + ) { + self.vm_exec_state.start_time = Some(SystemTime::now()); + self.vm_exec_state.tick_count = 0; + self.vm + .exec_call_request(&mut self.vm_exec_state, task_id, verb_execution_request) + .await; + self.run_watch_send.send(true).unwrap(); + } - /// Setup for executing a free-standing evaluation of `program`. - async fn start_eval(&mut self, task_id: TaskId, player: Objid, program: ProgramType); + /// Start execution of an eval request. + pub async fn start_eval(&mut self, task_id: TaskId, player: Objid, program: Program) { + self.vm_exec_state.start_time = Some(SystemTime::now()); + self.vm_exec_state.tick_count = 0; + self.vm + .exec_eval_request(&mut self.vm_exec_state, task_id, player, player, program) + .await; + self.run_watch_send.send(true).unwrap(); + } - /// The meat of the VM host: this is invoked repeatedly by the task scheduler loop to drive the - /// VM. The responses from this function are used to determine what the task/scheduler should do - /// next with this VM. - async fn exec_interpreter( + /// Run the hosted VM. + pub async fn exec_interpreter( &mut self, task_id: TaskId, world_state: &mut dyn WorldState, - ) -> VMHostResponse; + ) -> VMHostResponse { + self.run_watch_recv + .wait_for(|running| *running) + .await + .unwrap(); - /// Ask the host to resume what it was doing after suspension. - async fn resume_execution(&mut self, value: Var); + // Check ticks and seconds, and abort the task if we've exceeded the limits. + let time_left = match self.vm_exec_state.start_time { + Some(start_time) => { + let elapsed = start_time.elapsed().expect("Could not get elapsed time"); + if elapsed > self.max_time { + return AbortLimit(AbortLimitReason::Time(elapsed)); + } + Some(self.max_time - elapsed) + } + None => None, + }; + if self.vm_exec_state.tick_count >= self.max_ticks { + return AbortLimit(AbortLimitReason::Ticks(self.vm_exec_state.tick_count)); + } + let mut exec_params = VmExecParams { + scheduler_sender: self.scheduler_control_sender.clone(), + max_stack_depth: self.max_stack_depth, + ticks_left: self.max_ticks - self.vm_exec_state.tick_count, + time_left, + }; + let pre_exec_tick_count = self.vm_exec_state.tick_count; - /// Return true if the VM is currently running. - fn is_running(&self) -> bool; + // Actually invoke the VM, asking it to loop until it's ready to yield back to us. + let mut result = self + .vm + .exec( + &mut exec_params, + &mut self.vm_exec_state, + world_state, + self.sessions.clone(), + self.max_ticks, + ) + .await; - /// Stop a running VM. - async fn stop(&mut self); + let post_exec_tick_count = self.vm_exec_state.tick_count; + trace!( + task_id, + executed_ticks = post_exec_tick_count - pre_exec_tick_count, + ?result, + "Executed ticks", + ); + while self.is_running() { + match result { + ExecutionResult::More => return ContinueOk, + ExecutionResult::ContinueVerb { + permissions, + resolved_verb, + call, + command, + trampoline, + trampoline_arg, + } => { + trace!(task_id, call = ?call, "Task continue, call into verb"); - /// Decodes a binary into opcodes that this kind of VM can execute. - fn decode_program(binary_type: BinaryType, binary_bytes: &[u8]) -> ProgramType; + self.vm_exec_state.top_mut().bf_trampoline_arg = trampoline_arg; + self.vm_exec_state.top_mut().bf_trampoline = trampoline; - /// Attempt to set a variable inside the VM's current top stack frame. - /// The sole use of this is to set the task id variable for forked tasks or resumed tasks. - // TODO: a bit of an abstraction break, might require some better thought. - fn set_variable(&mut self, task_id_var: Name, value: Var); + let program = Self::decode_program( + resolved_verb.verbdef().binary_type(), + resolved_verb.binary().as_slice(), + ); - /// Return the operating user permissions in place. - fn permissions(&self) -> Objid; + let call_request = VerbExecutionRequest { + permissions, + resolved_verb, + call, + command, + program, + }; - /// Return the name of the 'verb' (method) being executed by this VM. - fn verb_name(&self) -> String; + self.vm + .exec_call_request(&mut self.vm_exec_state, task_id, call_request) + .await; + return ContinueOk; + } + ExecutionResult::PerformEval { + permissions, + player, + program, + } => { + self.vm + .exec_eval_request(&mut self.vm_exec_state, 0, permissions, player, program) + .await; + return ContinueOk; + } + ExecutionResult::ContinueBuiltin { + bf_func_num: bf_offset, + arguments: args, + } => { + let mut exec_params = VmExecParams { + max_stack_depth: self.max_stack_depth, + scheduler_sender: self.scheduler_control_sender.clone(), + ticks_left: self.max_ticks - self.vm_exec_state.tick_count, + time_left, + }; + // Ask the VM to execute the builtin function. + // This will push the result onto the stack. + // After this we will loop around and check the result. + result = self + .vm + .call_builtin_function( + &mut self.vm_exec_state, + bf_offset, + &args, + &mut exec_params, + world_state, + self.sessions.clone(), + ) + .await; + continue; + } + ExecutionResult::DispatchFork(fork_request) => { + return DispatchFork(fork_request); + } + ExecutionResult::Suspend(delay) => { + self.run_watch_send.send(false).unwrap(); + return Suspend(delay); + } + ExecutionResult::NeedInput => { + self.run_watch_send.send(false).unwrap(); + return VMHostResponse::SuspendNeedInput; + } + ExecutionResult::Complete(a) => { + return VMHostResponse::CompleteSuccess(a); + } + ExecutionResult::Exception(fr) => { + trace!(task_id, result = ?fr, "Task exception"); - /// Return who is the responsible 'definer' of the verb being executed by this VM. - fn verb_definer(&self) -> Objid; + return match &fr { + FinallyReason::Abort => VMHostResponse::CompleteAbort, + FinallyReason::Uncaught(exception) => { + VMHostResponse::CompleteException(exception.clone()) + } + _ => { + unreachable!( + "Invalid FinallyReason {:?} reached for task {} in scheduler", + fr, task_id + ); + } + }; + } + } + } - /// Return the object id of the object being operated on by this VM. - fn this(&self) -> Objid; + // We're not running and we didn't get a completion signal from the VM - we must have been + // asked to stop by the scheduler. + warn!(task_id, "VM host stopped by task"); + VMHostResponse::CompleteAbort + } - /// Return the current source line number being executed by this VM. - fn line_number(&self) -> usize; + /// Resume what you were doing after suspension. + pub async fn resume_execution(&mut self, value: Var) { + // coming back from suspend, we need a return value to feed back to `bf_suspend` + self.vm_exec_state.top_mut().push(value); + self.vm_exec_state.start_time = Some(SystemTime::now()); + self.vm_exec_state.tick_count = 0; + self.run_watch_send.send(true).unwrap(); + } + pub fn is_running(&self) -> bool { + *self.run_watch_recv.borrow() + } + pub async fn stop(&mut self) { + self.run_watch_send.send(false).unwrap(); + } + pub fn decode_program(binary_type: BinaryType, binary_bytes: &[u8]) -> Program { + match binary_type { + BinaryType::LambdaMoo18X => Program::from_sliceref(SliceRef::from_bytes(binary_bytes)), + _ => panic!("Unsupported binary type {:?}", binary_type), + } + } + pub fn set_variable(&mut self, task_id_var: Name, value: Var) { + self.vm_exec_state + .top_mut() + .set_var_offset(task_id_var, value) + .expect("Could not set forked task id"); + } + pub fn permissions(&self) -> Objid { + self.vm_exec_state.top().permissions + } + pub fn verb_name(&self) -> String { + self.vm_exec_state.top().verb_name.clone() + } + pub fn verb_definer(&self) -> Objid { + self.vm_exec_state.top().verb_definer() + } + pub fn this(&self) -> Objid { + self.vm_exec_state.top().this + } + pub fn line_number(&self) -> usize { + self.vm_exec_state + .top() + .find_line_no(self.vm_exec_state.top().pc) + .unwrap_or(0) + } - /// Return the arguments to the verb being executed by this VM. - fn args(&self) -> Vec; + pub fn args(&self) -> Vec { + self.vm_exec_state.top().args.clone() + } } diff --git a/crates/kernel/src/vm/vm_call.rs b/crates/kernel/src/vm/vm_call.rs index dbf36ecc..3077e823 100644 --- a/crates/kernel/src/vm/vm_call.rs +++ b/crates/kernel/src/vm/vm_call.rs @@ -12,6 +12,7 @@ // this program. If not, see . // +use std::sync::Arc; use tracing::{debug, trace}; use moor_values::model::world_state::WorldState; @@ -23,6 +24,7 @@ use moor_values::var::{v_int, Var}; use crate::builtins::bf_server::BF_SERVER_EVAL_TRAMPOLINE_RESUME; use crate::builtins::{BfCallState, BfRet}; use crate::tasks::command_parse::ParsedCommand; +use crate::tasks::sessions::Session; use crate::tasks::{TaskId, VerbCall}; use crate::vm::activation::Activation; use crate::vm::vm_unwind::FinallyReason; @@ -236,7 +238,9 @@ impl VM { vm_state: &mut VMExecState, bf_func_num: usize, args: &[Var], - exec_args: &mut VmExecParams<'a>, + exec_args: &VmExecParams, + world_state: &'a mut dyn WorldState, + session: Arc, ) -> ExecutionResult { if bf_func_num >= self.builtins.len() { return self.raise_error(vm_state, E_VARNF); @@ -266,8 +270,8 @@ impl VM { let mut bf_args = BfCallState { exec_state: vm_state, name: BUILTIN_DESCRIPTORS[bf_func_num].name.clone(), - world_state: exec_args.world_state, - session: exec_args.session.clone(), + world_state, + session: session.clone(), args, scheduler_sender: exec_args.scheduler_sender.clone(), ticks_left: exec_args.ticks_left, @@ -290,7 +294,9 @@ impl VM { pub(crate) async fn reenter_builtin_function<'a>( &self, vm_state: &mut VMExecState, - exec_args: &mut VmExecParams<'a>, + exec_args: &VmExecParams, + world_state: &'a mut dyn WorldState, + session: Arc, ) -> ExecutionResult { trace!( bf_index = vm_state.top().bf_index, @@ -307,12 +313,12 @@ impl VM { let bf = self.builtins[vm_state.top().bf_index.unwrap()].clone(); let verb_name = vm_state.top().verb_name.clone(); - let sessions = exec_args.session.clone(); + let sessions = session.clone(); let args = vm_state.top().args.clone(); let mut bf_args = BfCallState { exec_state: vm_state, name: verb_name, - world_state: exec_args.world_state, + world_state, session: sessions, args, scheduler_sender: exec_args.scheduler_sender.clone(), diff --git a/crates/kernel/src/vm/vm_execute.rs b/crates/kernel/src/vm/vm_execute.rs index d52ce10a..1659c1ce 100644 --- a/crates/kernel/src/vm/vm_execute.rs +++ b/crates/kernel/src/vm/vm_execute.rs @@ -60,9 +60,7 @@ pub struct Fork { } /// Represents the set of parameters passed to the VM for execution. -pub struct VmExecParams<'a> { - pub world_state: &'a mut dyn WorldState, - pub session: Arc, +pub struct VmExecParams { pub scheduler_sender: UnboundedSender<(TaskId, SchedulerControlMsg)>, pub max_stack_depth: usize, pub ticks_left: usize, @@ -159,8 +157,10 @@ impl VM { /// Main VM opcode execution. The actual meat of the machine. pub async fn exec<'a>( &self, - exec_params: &mut VmExecParams<'a>, + exec_params: &VmExecParams, state: &mut VMExecState, + world_state: &'a mut dyn WorldState, + session: Arc, tick_slice: usize, ) -> ExecutionResult { // Before executing, check stack depth... @@ -175,7 +175,9 @@ impl VM { // executing elsewhere. It will be up to the function to interpret the counter. // Functions that did not set a trampoline are assumed to be complete. if !state.stack.is_empty() && state.top().bf_index.is_some() { - return self.reenter_builtin_function(state, exec_params).await; + return self + .reenter_builtin_function(state, exec_params, world_state, session) + .await; } // Try to consume & execute as many opcodes as we can without returning back to the task @@ -537,20 +539,20 @@ impl VM { let (propname, obj) = (state.pop(), state.pop()); return self - .resolve_property(state, exec_params.world_state, propname, obj) + .resolve_property(state, world_state, propname, obj) .await; } Op::PushGetProp => { let peeked = state.peek(2); let (propname, obj) = (peeked[1].clone(), peeked[0].clone()); return self - .resolve_property(state, exec_params.world_state, propname, obj) + .resolve_property(state, world_state, propname, obj) .await; } Op::PutProp => { let (rhs, propname, obj) = (state.pop(), state.pop(), state.pop()); return self - .set_property(state, exec_params.world_state, propname, obj, rhs) + .set_property(state, world_state, propname, obj, rhs) .await; } Op::Fork { id, fv_offset } => { @@ -581,9 +583,7 @@ impl VM { let Variant::List(args) = args.variant() else { return self.push_error(state, E_TYPE); }; - return self - .prepare_pass_verb(state, exec_params.world_state, &args[..]) - .await; + return self.prepare_pass_verb(state, world_state, &args[..]).await; } Op::CallVerb => { let (args, verb, obj) = (state.pop(), state.pop(), state.pop()); @@ -594,13 +594,7 @@ impl VM { } }; return self - .prepare_call_verb( - state, - exec_params.world_state, - *obj, - verb.as_str(), - &args[..], - ) + .prepare_call_verb(state, world_state, *obj, verb.as_str(), &args[..]) .await; } Op::Return => { @@ -620,7 +614,14 @@ impl VM { return self.push_error(state, E_ARGS); }; return self - .call_builtin_function(state, id.0 as usize, &args[..], exec_params) + .call_builtin_function( + state, + id.0 as usize, + &args[..], + exec_params, + world_state, + session, + ) .await; } Op::PushLabel(label) => {