From 62cea300c49a010aa65bbe7bccf7cfb8c3988a6e Mon Sep 17 00:00:00 2001 From: Ryan Daum Date: Sun, 1 Dec 2024 14:09:23 -0500 Subject: [PATCH] Fix forever-blocking on results for retries on commit conflict The TaskHandle for a task that is re-run due to commit-conflict would sit blocking forever because the scheduler in fact has created a new one. This change propagates out a new task handle up to the daemon to let it know that it needs to listen on that instead, and replace the the task handle it was working with. Found while doing concurrent tx model checking. --- crates/daemon/src/rpc_server.rs | 75 ++++++++++++++++----------- crates/kernel/src/tasks/mod.rs | 21 ++++++-- crates/kernel/src/tasks/scheduler.rs | 31 +++++++---- crates/kernel/src/tasks/suspension.rs | 8 +-- 4 files changed, 87 insertions(+), 48 deletions(-) diff --git a/crates/daemon/src/rpc_server.rs b/crates/daemon/src/rpc_server.rs index 82f483b5..ffc741ee 100644 --- a/crates/daemon/src/rpc_server.rs +++ b/crates/daemon/src/rpc_server.rs @@ -33,7 +33,7 @@ use moor_db::DatabaseFlavour; use moor_kernel::config::Config; use moor_kernel::tasks::sessions::SessionError::DeliveryError; use moor_kernel::tasks::sessions::{Session, SessionError}; -use moor_kernel::tasks::TaskHandle; +use moor_kernel::tasks::{TaskHandle, TaskResult}; use moor_kernel::SchedulerClient; use moor_values::matching::command_parse::preposition_to_string; use moor_values::model::{Named, ObjectRef, PropFlag, ValSet, VerbFlag}; @@ -758,7 +758,7 @@ impl RpcServer { let Ok(session) = self.clone().new_session(client_id, connection.clone()) else { return Err(RpcMessageError::CreateSessionFailed); }; - let task_handle = match scheduler_client.submit_verb_task( + let mut task_handle = match scheduler_client.submit_verb_task( connection, &ObjectRef::Id(handler_object.clone()), Symbol::mk("do_login_command"), @@ -774,29 +774,35 @@ impl RpcServer { return Err(RpcMessageError::InternalError(e.to_string())); } }; - let receiver = task_handle.into_receiver(); - let player = match receiver.recv() { - Ok(Ok(v)) => { - // If v is an objid, we have a successful login and we need to rewrite this - // client id to use the player objid and then return a result to the client. - // with its new player objid and login result. - // If it's not an objid, that's considered an auth failure. - match v.variant() { - Variant::Obj(o) => o.clone(), - _ => { - return Ok(LoginResult(None)); + let player = loop { + let receiver = task_handle.into_receiver(); + match receiver.recv() { + Ok(Ok(TaskResult::Restarted(th))) => { + task_handle = th; + continue; + } + Ok(Ok(TaskResult::Result(v))) => { + // If v is an objid, we have a successful login and we need to rewrite this + // client id to use the player objid and then return a result to the client. + // with its new player objid and login result. + // If it's not an objid, that's considered an auth failure. + match v.variant() { + Variant::Obj(o) => break o.clone(), + _ => { + return Ok(LoginResult(None)); + } } } - } - Ok(Err(e)) => { - error!(error = ?e, "Error waiting for login results"); + Ok(Err(e)) => { + error!(error = ?e, "Error waiting for login results"); - return Err(RpcMessageError::LoginTaskFailed); - } - Err(e) => { - error!(error = ?e, "Error waiting for login results"); + return Err(RpcMessageError::LoginTaskFailed); + } + Err(e) => { + error!(error = ?e, "Error waiting for login results"); - return Err(RpcMessageError::InternalError(e.to_string())); + return Err(RpcMessageError::InternalError(e.to_string())); + } } }; @@ -985,7 +991,7 @@ impl RpcServer { return Err(RpcMessageError::CreateSessionFailed); }; - let task_handle = match scheduler_client.submit_eval_task( + let mut task_handle = match scheduler_client.submit_eval_task( connection, connection, expression, @@ -998,13 +1004,19 @@ impl RpcServer { return Err(RpcMessageError::InternalError(e.to_string())); } }; - match task_handle.into_receiver().recv() { - Ok(Ok(v)) => Ok(DaemonToClientReply::EvalResult(v)), - Ok(Err(e)) => Err(RpcMessageError::TaskError(e)), - Err(e) => { - error!(error = ?e, "Error processing eval"); + loop { + match task_handle.into_receiver().recv() { + Ok(Ok(TaskResult::Restarted(th))) => { + task_handle = th; + continue; + } + Ok(Ok(TaskResult::Result(v))) => break Ok(DaemonToClientReply::EvalResult(v)), + Ok(Err(e)) => break Err(RpcMessageError::TaskError(e)), + Err(e) => { + error!(error = ?e, "Error processing eval"); - Err(RpcMessageError::InternalError(e.to_string())) + break Err(RpcMessageError::InternalError(e.to_string())); + } } } } @@ -1095,7 +1107,12 @@ impl RpcServer { let publish = self.events_publish.lock().unwrap(); for (task_id, client_id, result) in completed { let result = match result { - Ok(v) => ClientEvent::TaskSuccess(task_id, v), + Ok(TaskResult::Result(v)) => ClientEvent::TaskSuccess(task_id, v), + Ok(TaskResult::Restarted(th)) => { + info!(?client_id, ?task_id, "Task restarted"); + th_q.insert(task_id, (client_id, th)); + continue; + } Err(e) => ClientEvent::TaskError(task_id, e), }; debug!(?client_id, ?task_id, ?result, "Task completed"); diff --git a/crates/kernel/src/tasks/mod.rs b/crates/kernel/src/tasks/mod.rs index 4eb281b6..8e2f7ec2 100644 --- a/crates/kernel/src/tasks/mod.rs +++ b/crates/kernel/src/tasks/mod.rs @@ -42,7 +42,17 @@ pub const DEFAULT_BG_SECONDS: u64 = 3; pub const DEFAULT_MAX_STACK_DEPTH: usize = 50; /// Just a handle to a task, with a receiver for the result. -pub struct TaskHandle(TaskId, oneshot::Receiver>); +pub struct TaskHandle( + TaskId, + oneshot::Receiver>, +); + +// Results from a task which are either a value or a notification to the task Q to restart the +// task. +pub enum TaskResult { + Result(Var), + Restarted(TaskHandle), +} impl Debug for TaskHandle { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -58,11 +68,11 @@ impl TaskHandle { } /// Dissolve the handle into a receiver for the result. - pub fn into_receiver(self) -> oneshot::Receiver> { + pub fn into_receiver(self) -> oneshot::Receiver> { self.1 } - pub fn receiver(&self) -> &oneshot::Receiver> { + pub fn receiver(&self) -> &oneshot::Receiver> { &self.1 } } @@ -245,7 +255,7 @@ pub mod scheduler_test_utils { use moor_values::tasks::{CommandError, SchedulerError}; use moor_values::{Error::E_VERBNF, Obj, Var, SYSTEM_OBJECT}; - use super::TaskHandle; + use super::{TaskHandle, TaskResult}; use crate::config::Config; use crate::tasks::scheduler_client::SchedulerClient; use crate::tasks::sessions::Session; @@ -275,7 +285,8 @@ pub mod scheduler_test_utils { Err(TaskAbortedException(Exception { code, .. })) => Ok(code.into()), Err(CommandExecutionError(CommandError::NoCommandMatch)) => Ok(E_VERBNF.into()), Err(err) => Err(err), - Ok(var) => Ok(var), + Ok(TaskResult::Result(var)) => Ok(var), + Ok(TaskResult::Restarted(_)) => panic!("Unexpected task restart"), } } diff --git a/crates/kernel/src/tasks/scheduler.rs b/crates/kernel/src/tasks/scheduler.rs index 50b6ad14..7e6038b1 100644 --- a/crates/kernel/src/tasks/scheduler.rs +++ b/crates/kernel/src/tasks/scheduler.rs @@ -40,8 +40,8 @@ use crate::tasks::task::Task; use crate::tasks::task_scheduler_client::{TaskControlMsg, TaskSchedulerClient}; use crate::tasks::tasks_db::TasksDb; use crate::tasks::{ - ServerOptions, TaskHandle, TaskStart, DEFAULT_BG_SECONDS, DEFAULT_BG_TICKS, DEFAULT_FG_SECONDS, - DEFAULT_FG_TICKS, DEFAULT_MAX_STACK_DEPTH, + ServerOptions, TaskHandle, TaskResult, TaskStart, DEFAULT_BG_SECONDS, DEFAULT_BG_TICKS, + DEFAULT_FG_SECONDS, DEFAULT_FG_TICKS, DEFAULT_MAX_STACK_DEPTH, }; use crate::textdump::{make_textdump, TextdumpWriter}; use crate::vm::Fork; @@ -115,7 +115,7 @@ struct RunningTaskControl { /// The connection-session for this task. session: Arc, /// A mailbox to deliver the result of the task to a waiting party with a subscription, if any. - result_sender: Option>>, + result_sender: Option>>, } /// The internal state of the task queue. @@ -1452,7 +1452,7 @@ impl TaskQ { mut task: Task, resume_val: Var, session: Arc, - result_sender: Option>>, + result_sender: Option>>, control_sender: &Sender<(TaskId, TaskControlMsg)>, database: &dyn Database, builtin_registry: Arc, @@ -1522,7 +1522,8 @@ impl TaskQ { if result_sender.is_closed() { return; } - if result_sender.send(result.clone()).is_err() { + let result = result.map(|v| TaskResult::Result(v.clone())); + if result_sender.send(result).is_err() { error!("Notify to task {} failed", task_id); } } @@ -1548,12 +1549,11 @@ impl TaskQ { .remove(&task.task_id) .expect("Task not found for retry"); - info!("Retrying task {}", task.task_id); - // Grab the "task start" record from the (now dead) task, and submit this again with the same // task_id. let task_start = task.task_start.clone(); - if let Err(e) = self.start_task_thread( + + match self.start_task_thread( task.task_id, task_start, &old_tc.player, @@ -1566,8 +1566,19 @@ impl TaskQ { builtin_registry, config, ) { - error!(?e, "Could not restart task"); - } + Ok(th) => { + // Replacement task handle now exists, we need to send a message to the daemon to + // let it know that, otherwise it will sit hanging waiting on the old one forever. + old_tc + .result_sender + .expect("No result sender for retry") + .send(Ok(TaskResult::Restarted(th))) + .expect("Could not send retry result"); + } + Err(e) => { + error!(error = ?e, "Could not retry task: {:?}", e); + } + }; } #[instrument(skip(self))] diff --git a/crates/kernel/src/tasks/suspension.rs b/crates/kernel/src/tasks/suspension.rs index 011b2ba1..0c2091cf 100644 --- a/crates/kernel/src/tasks/suspension.rs +++ b/crates/kernel/src/tasks/suspension.rs @@ -23,11 +23,11 @@ use bincode::{BorrowDecode, Decode, Encode}; use tracing::{debug, error, info, warn}; use uuid::Uuid; -use moor_values::{Obj, Var}; +use moor_values::Obj; use crate::tasks::sessions::{NoopClientSession, Session, SessionFactory}; use crate::tasks::task::Task; -use crate::tasks::{TaskDescription, TasksDb}; +use crate::tasks::{TaskDescription, TaskResult, TasksDb}; use moor_values::tasks::{SchedulerError, TaskId}; /// State a suspended task sits in inside the `suspended` side of the task queue. @@ -36,7 +36,7 @@ pub struct SuspendedTask { pub wake_condition: WakeCondition, pub task: Task, pub session: Arc, - pub result_sender: Option>>, + pub result_sender: Option>>, } /// Possible conditions in which a suspended task can wake from suspension. @@ -118,7 +118,7 @@ impl SuspensionQ { wake_condition: WakeCondition, task: Task, session: Arc, - result_sender: Option>>, + result_sender: Option>>, ) { let task_id = task.task_id; let sr = SuspendedTask {