Skip to content

Commit

Permalink
Add some basic unit tests for the task logic:
Browse files Browse the repository at this point in the history
  * Run simple program to completion
  * Throw an exception
  * `suspend()` and resume
  * `read()`` and resume
  * fork
  • Loading branch information
rdaum committed Jun 27, 2024
1 parent 28f480b commit f2d051c
Showing 1 changed file with 207 additions and 0 deletions.
207 changes: 207 additions & 0 deletions crates/kernel/src/tasks/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -631,3 +631,210 @@ fn find_verb_for_command(

// TODO: a battery of unit tests here. Which will likely involve setting up a standalone VM running
// a simple program.
#[cfg(test)]
mod tests {
use crate::tasks::sessions::NoopClientSession;
use crate::tasks::task::Task;
use crate::tasks::task_messages::{SchedulerControlMsg, TaskStart};
use crate::tasks::TaskId;
use crossbeam_channel::{unbounded, Receiver, Sender};
use moor_compiler::compile;
use moor_db_wiredtiger::WiredTigerDB;
use moor_values::model::{WorldState, WorldStateSource};
use moor_values::util::BitEnum;
use moor_values::var::Error::E_DIV;
use moor_values::var::{v_int, v_str};
use moor_values::{NOTHING, SYSTEM_OBJECT};
use std::sync::atomic::AtomicBool;
use std::sync::Arc;

/// Build a simple test environment with an Eval task (since that is simplest to setup)
fn setup_test_env(
program: &str,
) -> (
Arc<AtomicBool>,
Task,
WiredTigerDB,
Box<dyn WorldState>,
Sender<(TaskId, SchedulerControlMsg)>,
Receiver<(TaskId, SchedulerControlMsg)>,
) {
let program = compile(program).unwrap();
let task_start = Arc::new(TaskStart::StartEval {
player: SYSTEM_OBJECT,
program,
});
let noop_session = Arc::new(NoopClientSession::new());
let (control_sender, control_receiver) = unbounded();
let kill_switch = Arc::new(AtomicBool::new(false));
let mut task = Task::new(
1,
SYSTEM_OBJECT,
task_start.clone(),
SYSTEM_OBJECT,
false,
noop_session.clone(),
&control_sender,
kill_switch.clone(),
);
let (db, _) = WiredTigerDB::open(None);
let mut tx = db.new_world_state().unwrap();

let sysobj = tx
.create_object(SYSTEM_OBJECT, NOTHING, SYSTEM_OBJECT, BitEnum::all())
.unwrap();
tx.update_property(SYSTEM_OBJECT, sysobj, "name", &v_str("system"))
.unwrap();
tx.update_property(SYSTEM_OBJECT, sysobj, "programmer", &v_int(1))
.unwrap();
tx.update_property(SYSTEM_OBJECT, sysobj, "wizard", &v_int(1))
.unwrap();

task.setup_task_start(&control_sender, tx.as_mut());

(kill_switch, task, db, tx, control_sender, control_receiver)
}

/// Test that we can start a task and run it to completion and it sends the right message with
/// the result back to the scheduler.
#[test]
fn test_simple_run_return() {
let (_kill_switch, task, _db, tx, control_sender, control_receiver) =
setup_test_env("return 1 + 1;");

Task::run_task_loop(task, control_sender, tx);

// Scheduler should have received a TaskSuccess message.
let (task_id, msg) = control_receiver.recv().unwrap();
assert_eq!(task_id, 1);
let SchedulerControlMsg::TaskSuccess(result) = msg else {
panic!("Expected TaskSuccess, got {:?}", msg);
};
assert_eq!(result, v_int(2));
}

/// Trigger a MOO VM exception, and verify it gets sent to scheduler
#[test]
fn test_simple_run_exception() {
let (_kill_switch, task, _db, tx, control_sender, control_receiver) =
setup_test_env("return 1 / 0;");

Task::run_task_loop(task, control_sender, tx);

// Scheduler should have received a TaskException message.
let (task_id, msg) = control_receiver.recv().unwrap();
assert_eq!(task_id, 1);
let SchedulerControlMsg::TaskException(exception) = msg else {
panic!("Expected TaskException, got {:?}", msg);
};
assert_eq!(exception.code, E_DIV);
}

/// Trigger a task-suspend-resume
#[test]
fn test_simple_run_suspend() {
let (_kill_switch, task, db, tx, control_sender, control_receiver) =
setup_test_env("suspend(1); return 123;");

Task::run_task_loop(task, control_sender.clone(), tx);

// Scheduler should have received a TaskSuspend message.
let (task_id, msg) = control_receiver.recv().unwrap();
assert_eq!(task_id, 1);
let SchedulerControlMsg::TaskSuspend(instant, mut resume_task) = msg else {
panic!("Expected TaskSuspend, got {:?}", msg);
};
assert_eq!(resume_task.task_id, 1);
assert!(instant.is_some());

// Now we can simulate resumption...
resume_task.vm_host.resume_execution(v_int(0));

// Now we can simulate resumption...
let tx = db.new_world_state().unwrap();
Task::run_task_loop(resume_task, control_sender, tx);
let (task_id, msg) = control_receiver.recv().unwrap();
assert_eq!(task_id, 1);
let SchedulerControlMsg::TaskSuccess(result) = msg else {
panic!("Expected TaskSuccess, got {:?}", msg);
};
assert_eq!(result, v_int(123));
}

/// Trigger a simulated read()
#[test]
fn test_simple_run_read() {
let (_kill_switch, task, db, tx, control_sender, control_receiver) =
setup_test_env("return read();");

Task::run_task_loop(task, control_sender.clone(), tx);

// Scheduler should have received a TaskRequestInput message, and it should contain the task.
let (task_id, msg) = control_receiver.recv().unwrap();
assert_eq!(task_id, 1);
let SchedulerControlMsg::TaskRequestInput(mut resume_task) = msg else {
panic!("Expected TaskRequestInput, got {:?}", msg);
};
assert_eq!(resume_task.task_id, 1);

// Now we can simulate resumption...
resume_task.vm_host.resume_execution(v_str("hello, world!"));

// And run its task loop again, with a new transaction.
let tx = db.new_world_state().unwrap();
Task::run_task_loop(resume_task, control_sender, tx);

// Scheduler should have received a TaskSuccess message.
let (task_id, msg) = control_receiver.recv().unwrap();
assert_eq!(task_id, 1);
let SchedulerControlMsg::TaskSuccess(result) = msg else {
panic!("Expected TaskSuccess, got {:?}", msg);
};
assert_eq!(result, v_str("hello, world!"));
}

/// Trigger a task-fork
#[test]
fn test_simple_run_fork() {
let (_kill_switch, task, db, mut tx, control_sender, control_receiver) =
setup_test_env("fork (1) return 1 + 1; endfork return 123;");
tx.commit().unwrap();

// Pull a copy of the program out for comparison later.
let task_start = task.task_start.clone();
let TaskStart::StartEval { program, .. } = task_start.as_ref() else {
panic!("Expected StartEval, got {:?}", task.task_start);
};

// This one needs to run in a thread because it's going to block waiting on a reply from
// our fake scheduler.
let jh = std::thread::spawn(move || {
let tx = db.new_world_state().unwrap();
Task::run_task_loop(task, control_sender, tx)
});

// Scheduler should have received a TaskRequestFork message.
let (task_id, msg) = control_receiver.recv().unwrap();
assert_eq!(task_id, 1);
let SchedulerControlMsg::TaskRequestFork(fork_request, reply_channel) = msg else {
panic!("Expected TaskRequestFork, got {:?}", msg);
};
assert_eq!(fork_request.task_id, None);
assert_eq!(fork_request.parent_task_id, 1);
assert_eq!(fork_request.activation.frame.program, *program);

// Reply back with the new task id.
reply_channel.send(2).unwrap();

// Wait for the task to finish.
jh.join().unwrap();

// Scheduler should have received a TaskSuccess message.
let (task_id, msg) = control_receiver.recv().unwrap();
assert_eq!(task_id, 1);
let SchedulerControlMsg::TaskSuccess(result) = msg else {
panic!("Expected TaskSuccess, got {:?}", msg);
};
assert_eq!(result, v_int(123));
}
}

0 comments on commit f2d051c

Please sign in to comment.