diff --git a/crates/daemon/src/rpc_server.rs b/crates/daemon/src/rpc_server.rs index 82f483b5..ffc741ee 100644 --- a/crates/daemon/src/rpc_server.rs +++ b/crates/daemon/src/rpc_server.rs @@ -33,7 +33,7 @@ use moor_db::DatabaseFlavour; use moor_kernel::config::Config; use moor_kernel::tasks::sessions::SessionError::DeliveryError; use moor_kernel::tasks::sessions::{Session, SessionError}; -use moor_kernel::tasks::TaskHandle; +use moor_kernel::tasks::{TaskHandle, TaskResult}; use moor_kernel::SchedulerClient; use moor_values::matching::command_parse::preposition_to_string; use moor_values::model::{Named, ObjectRef, PropFlag, ValSet, VerbFlag}; @@ -758,7 +758,7 @@ impl RpcServer { let Ok(session) = self.clone().new_session(client_id, connection.clone()) else { return Err(RpcMessageError::CreateSessionFailed); }; - let task_handle = match scheduler_client.submit_verb_task( + let mut task_handle = match scheduler_client.submit_verb_task( connection, &ObjectRef::Id(handler_object.clone()), Symbol::mk("do_login_command"), @@ -774,29 +774,35 @@ impl RpcServer { return Err(RpcMessageError::InternalError(e.to_string())); } }; - let receiver = task_handle.into_receiver(); - let player = match receiver.recv() { - Ok(Ok(v)) => { - // If v is an objid, we have a successful login and we need to rewrite this - // client id to use the player objid and then return a result to the client. - // with its new player objid and login result. - // If it's not an objid, that's considered an auth failure. - match v.variant() { - Variant::Obj(o) => o.clone(), - _ => { - return Ok(LoginResult(None)); + let player = loop { + let receiver = task_handle.into_receiver(); + match receiver.recv() { + Ok(Ok(TaskResult::Restarted(th))) => { + task_handle = th; + continue; + } + Ok(Ok(TaskResult::Result(v))) => { + // If v is an objid, we have a successful login and we need to rewrite this + // client id to use the player objid and then return a result to the client. + // with its new player objid and login result. + // If it's not an objid, that's considered an auth failure. + match v.variant() { + Variant::Obj(o) => break o.clone(), + _ => { + return Ok(LoginResult(None)); + } } } - } - Ok(Err(e)) => { - error!(error = ?e, "Error waiting for login results"); + Ok(Err(e)) => { + error!(error = ?e, "Error waiting for login results"); - return Err(RpcMessageError::LoginTaskFailed); - } - Err(e) => { - error!(error = ?e, "Error waiting for login results"); + return Err(RpcMessageError::LoginTaskFailed); + } + Err(e) => { + error!(error = ?e, "Error waiting for login results"); - return Err(RpcMessageError::InternalError(e.to_string())); + return Err(RpcMessageError::InternalError(e.to_string())); + } } }; @@ -985,7 +991,7 @@ impl RpcServer { return Err(RpcMessageError::CreateSessionFailed); }; - let task_handle = match scheduler_client.submit_eval_task( + let mut task_handle = match scheduler_client.submit_eval_task( connection, connection, expression, @@ -998,13 +1004,19 @@ impl RpcServer { return Err(RpcMessageError::InternalError(e.to_string())); } }; - match task_handle.into_receiver().recv() { - Ok(Ok(v)) => Ok(DaemonToClientReply::EvalResult(v)), - Ok(Err(e)) => Err(RpcMessageError::TaskError(e)), - Err(e) => { - error!(error = ?e, "Error processing eval"); + loop { + match task_handle.into_receiver().recv() { + Ok(Ok(TaskResult::Restarted(th))) => { + task_handle = th; + continue; + } + Ok(Ok(TaskResult::Result(v))) => break Ok(DaemonToClientReply::EvalResult(v)), + Ok(Err(e)) => break Err(RpcMessageError::TaskError(e)), + Err(e) => { + error!(error = ?e, "Error processing eval"); - Err(RpcMessageError::InternalError(e.to_string())) + break Err(RpcMessageError::InternalError(e.to_string())); + } } } } @@ -1095,7 +1107,12 @@ impl RpcServer { let publish = self.events_publish.lock().unwrap(); for (task_id, client_id, result) in completed { let result = match result { - Ok(v) => ClientEvent::TaskSuccess(task_id, v), + Ok(TaskResult::Result(v)) => ClientEvent::TaskSuccess(task_id, v), + Ok(TaskResult::Restarted(th)) => { + info!(?client_id, ?task_id, "Task restarted"); + th_q.insert(task_id, (client_id, th)); + continue; + } Err(e) => ClientEvent::TaskError(task_id, e), }; debug!(?client_id, ?task_id, ?result, "Task completed"); diff --git a/crates/kernel/src/tasks/mod.rs b/crates/kernel/src/tasks/mod.rs index 4eb281b6..8e2f7ec2 100644 --- a/crates/kernel/src/tasks/mod.rs +++ b/crates/kernel/src/tasks/mod.rs @@ -42,7 +42,17 @@ pub const DEFAULT_BG_SECONDS: u64 = 3; pub const DEFAULT_MAX_STACK_DEPTH: usize = 50; /// Just a handle to a task, with a receiver for the result. -pub struct TaskHandle(TaskId, oneshot::Receiver>); +pub struct TaskHandle( + TaskId, + oneshot::Receiver>, +); + +// Results from a task which are either a value or a notification to the task Q to restart the +// task. +pub enum TaskResult { + Result(Var), + Restarted(TaskHandle), +} impl Debug for TaskHandle { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -58,11 +68,11 @@ impl TaskHandle { } /// Dissolve the handle into a receiver for the result. - pub fn into_receiver(self) -> oneshot::Receiver> { + pub fn into_receiver(self) -> oneshot::Receiver> { self.1 } - pub fn receiver(&self) -> &oneshot::Receiver> { + pub fn receiver(&self) -> &oneshot::Receiver> { &self.1 } } @@ -245,7 +255,7 @@ pub mod scheduler_test_utils { use moor_values::tasks::{CommandError, SchedulerError}; use moor_values::{Error::E_VERBNF, Obj, Var, SYSTEM_OBJECT}; - use super::TaskHandle; + use super::{TaskHandle, TaskResult}; use crate::config::Config; use crate::tasks::scheduler_client::SchedulerClient; use crate::tasks::sessions::Session; @@ -275,7 +285,8 @@ pub mod scheduler_test_utils { Err(TaskAbortedException(Exception { code, .. })) => Ok(code.into()), Err(CommandExecutionError(CommandError::NoCommandMatch)) => Ok(E_VERBNF.into()), Err(err) => Err(err), - Ok(var) => Ok(var), + Ok(TaskResult::Result(var)) => Ok(var), + Ok(TaskResult::Restarted(_)) => panic!("Unexpected task restart"), } } diff --git a/crates/kernel/src/tasks/scheduler.rs b/crates/kernel/src/tasks/scheduler.rs index 50b6ad14..7e6038b1 100644 --- a/crates/kernel/src/tasks/scheduler.rs +++ b/crates/kernel/src/tasks/scheduler.rs @@ -40,8 +40,8 @@ use crate::tasks::task::Task; use crate::tasks::task_scheduler_client::{TaskControlMsg, TaskSchedulerClient}; use crate::tasks::tasks_db::TasksDb; use crate::tasks::{ - ServerOptions, TaskHandle, TaskStart, DEFAULT_BG_SECONDS, DEFAULT_BG_TICKS, DEFAULT_FG_SECONDS, - DEFAULT_FG_TICKS, DEFAULT_MAX_STACK_DEPTH, + ServerOptions, TaskHandle, TaskResult, TaskStart, DEFAULT_BG_SECONDS, DEFAULT_BG_TICKS, + DEFAULT_FG_SECONDS, DEFAULT_FG_TICKS, DEFAULT_MAX_STACK_DEPTH, }; use crate::textdump::{make_textdump, TextdumpWriter}; use crate::vm::Fork; @@ -115,7 +115,7 @@ struct RunningTaskControl { /// The connection-session for this task. session: Arc, /// A mailbox to deliver the result of the task to a waiting party with a subscription, if any. - result_sender: Option>>, + result_sender: Option>>, } /// The internal state of the task queue. @@ -1452,7 +1452,7 @@ impl TaskQ { mut task: Task, resume_val: Var, session: Arc, - result_sender: Option>>, + result_sender: Option>>, control_sender: &Sender<(TaskId, TaskControlMsg)>, database: &dyn Database, builtin_registry: Arc, @@ -1522,7 +1522,8 @@ impl TaskQ { if result_sender.is_closed() { return; } - if result_sender.send(result.clone()).is_err() { + let result = result.map(|v| TaskResult::Result(v.clone())); + if result_sender.send(result).is_err() { error!("Notify to task {} failed", task_id); } } @@ -1548,12 +1549,11 @@ impl TaskQ { .remove(&task.task_id) .expect("Task not found for retry"); - info!("Retrying task {}", task.task_id); - // Grab the "task start" record from the (now dead) task, and submit this again with the same // task_id. let task_start = task.task_start.clone(); - if let Err(e) = self.start_task_thread( + + match self.start_task_thread( task.task_id, task_start, &old_tc.player, @@ -1566,8 +1566,19 @@ impl TaskQ { builtin_registry, config, ) { - error!(?e, "Could not restart task"); - } + Ok(th) => { + // Replacement task handle now exists, we need to send a message to the daemon to + // let it know that, otherwise it will sit hanging waiting on the old one forever. + old_tc + .result_sender + .expect("No result sender for retry") + .send(Ok(TaskResult::Restarted(th))) + .expect("Could not send retry result"); + } + Err(e) => { + error!(error = ?e, "Could not retry task: {:?}", e); + } + }; } #[instrument(skip(self))] diff --git a/crates/kernel/src/tasks/suspension.rs b/crates/kernel/src/tasks/suspension.rs index 011b2ba1..0c2091cf 100644 --- a/crates/kernel/src/tasks/suspension.rs +++ b/crates/kernel/src/tasks/suspension.rs @@ -23,11 +23,11 @@ use bincode::{BorrowDecode, Decode, Encode}; use tracing::{debug, error, info, warn}; use uuid::Uuid; -use moor_values::{Obj, Var}; +use moor_values::Obj; use crate::tasks::sessions::{NoopClientSession, Session, SessionFactory}; use crate::tasks::task::Task; -use crate::tasks::{TaskDescription, TasksDb}; +use crate::tasks::{TaskDescription, TaskResult, TasksDb}; use moor_values::tasks::{SchedulerError, TaskId}; /// State a suspended task sits in inside the `suspended` side of the task queue. @@ -36,7 +36,7 @@ pub struct SuspendedTask { pub wake_condition: WakeCondition, pub task: Task, pub session: Arc, - pub result_sender: Option>>, + pub result_sender: Option>>, } /// Possible conditions in which a suspended task can wake from suspension. @@ -118,7 +118,7 @@ impl SuspensionQ { wake_condition: WakeCondition, task: Task, session: Arc, - result_sender: Option>>, + result_sender: Option>>, ) { let task_id = task.task_id; let sr = SuspendedTask {