Skip to content

Commit

Permalink
Fix forever-blocking on results for retries on commit conflict
Browse files Browse the repository at this point in the history
The TaskHandle for a task that is re-run due to commit-conflict would
sit blocking forever because the scheduler in fact has created a new
one.

This change propagates the new task handle up to the daemon to let
it know that it needs to listen on that one instead, replacing the
task handle it was working with.

Found while doing concurrent tx model checking.
  • Loading branch information
rdaum committed Dec 1, 2024
1 parent 403cc07 commit 62cea30
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 48 deletions.
75 changes: 46 additions & 29 deletions crates/daemon/src/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use moor_db::DatabaseFlavour;
use moor_kernel::config::Config;
use moor_kernel::tasks::sessions::SessionError::DeliveryError;
use moor_kernel::tasks::sessions::{Session, SessionError};
use moor_kernel::tasks::TaskHandle;
use moor_kernel::tasks::{TaskHandle, TaskResult};
use moor_kernel::SchedulerClient;
use moor_values::matching::command_parse::preposition_to_string;
use moor_values::model::{Named, ObjectRef, PropFlag, ValSet, VerbFlag};
Expand Down Expand Up @@ -758,7 +758,7 @@ impl RpcServer {
let Ok(session) = self.clone().new_session(client_id, connection.clone()) else {
return Err(RpcMessageError::CreateSessionFailed);
};
let task_handle = match scheduler_client.submit_verb_task(
let mut task_handle = match scheduler_client.submit_verb_task(
connection,
&ObjectRef::Id(handler_object.clone()),
Symbol::mk("do_login_command"),
Expand All @@ -774,29 +774,35 @@ impl RpcServer {
return Err(RpcMessageError::InternalError(e.to_string()));
}
};
let receiver = task_handle.into_receiver();
let player = match receiver.recv() {
Ok(Ok(v)) => {
// If v is an objid, we have a successful login and we need to rewrite this
// client id to use the player objid and then return a result to the client.
// with its new player objid and login result.
// If it's not an objid, that's considered an auth failure.
match v.variant() {
Variant::Obj(o) => o.clone(),
_ => {
return Ok(LoginResult(None));
let player = loop {
let receiver = task_handle.into_receiver();
match receiver.recv() {
Ok(Ok(TaskResult::Restarted(th))) => {
task_handle = th;
continue;
}
Ok(Ok(TaskResult::Result(v))) => {
// If v is an objid, we have a successful login and we need to rewrite this
// client id to use the player objid and then return a result to the client.
// with its new player objid and login result.
// If it's not an objid, that's considered an auth failure.
match v.variant() {
Variant::Obj(o) => break o.clone(),
_ => {
return Ok(LoginResult(None));
}
}
}
}
Ok(Err(e)) => {
error!(error = ?e, "Error waiting for login results");
Ok(Err(e)) => {
error!(error = ?e, "Error waiting for login results");

return Err(RpcMessageError::LoginTaskFailed);
}
Err(e) => {
error!(error = ?e, "Error waiting for login results");
return Err(RpcMessageError::LoginTaskFailed);
}
Err(e) => {
error!(error = ?e, "Error waiting for login results");

return Err(RpcMessageError::InternalError(e.to_string()));
return Err(RpcMessageError::InternalError(e.to_string()));
}
}
};

Expand Down Expand Up @@ -985,7 +991,7 @@ impl RpcServer {
return Err(RpcMessageError::CreateSessionFailed);
};

let task_handle = match scheduler_client.submit_eval_task(
let mut task_handle = match scheduler_client.submit_eval_task(
connection,
connection,
expression,
Expand All @@ -998,13 +1004,19 @@ impl RpcServer {
return Err(RpcMessageError::InternalError(e.to_string()));
}
};
match task_handle.into_receiver().recv() {
Ok(Ok(v)) => Ok(DaemonToClientReply::EvalResult(v)),
Ok(Err(e)) => Err(RpcMessageError::TaskError(e)),
Err(e) => {
error!(error = ?e, "Error processing eval");
loop {
match task_handle.into_receiver().recv() {
Ok(Ok(TaskResult::Restarted(th))) => {
task_handle = th;
continue;
}
Ok(Ok(TaskResult::Result(v))) => break Ok(DaemonToClientReply::EvalResult(v)),
Ok(Err(e)) => break Err(RpcMessageError::TaskError(e)),
Err(e) => {
error!(error = ?e, "Error processing eval");

Err(RpcMessageError::InternalError(e.to_string()))
break Err(RpcMessageError::InternalError(e.to_string()));
}
}
}
}
Expand Down Expand Up @@ -1095,7 +1107,12 @@ impl RpcServer {
let publish = self.events_publish.lock().unwrap();
for (task_id, client_id, result) in completed {
let result = match result {
Ok(v) => ClientEvent::TaskSuccess(task_id, v),
Ok(TaskResult::Result(v)) => ClientEvent::TaskSuccess(task_id, v),
Ok(TaskResult::Restarted(th)) => {
info!(?client_id, ?task_id, "Task restarted");
th_q.insert(task_id, (client_id, th));
continue;
}
Err(e) => ClientEvent::TaskError(task_id, e),
};
debug!(?client_id, ?task_id, ?result, "Task completed");
Expand Down
21 changes: 16 additions & 5 deletions crates/kernel/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,17 @@ pub const DEFAULT_BG_SECONDS: u64 = 3;
pub const DEFAULT_MAX_STACK_DEPTH: usize = 50;

/// Just a handle to a task, with a receiver for the result.
pub struct TaskHandle(TaskId, oneshot::Receiver<Result<Var, SchedulerError>>);
pub struct TaskHandle(
TaskId,
oneshot::Receiver<Result<TaskResult, SchedulerError>>,
);

/// What a waiter receives on a task's result channel: either the task's value, or a notice that
/// the task was restarted under a new handle.
pub enum TaskResult {
/// The value produced by the task.
Result(Var),
/// The scheduler re-ran the task (e.g. a retry after a commit conflict) and created a new
/// handle for it; the waiter must switch to listening on this handle, since nothing further
/// is sent on the old one.
Restarted(TaskHandle),
}

impl Debug for TaskHandle {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Expand All @@ -58,11 +68,11 @@ impl TaskHandle {
}

/// Dissolve the handle into a receiver for the result.
pub fn into_receiver(self) -> oneshot::Receiver<Result<Var, SchedulerError>> {
pub fn into_receiver(self) -> oneshot::Receiver<Result<TaskResult, SchedulerError>> {
self.1
}

pub fn receiver(&self) -> &oneshot::Receiver<Result<Var, SchedulerError>> {
pub fn receiver(&self) -> &oneshot::Receiver<Result<TaskResult, SchedulerError>> {
&self.1
}
}
Expand Down Expand Up @@ -245,7 +255,7 @@ pub mod scheduler_test_utils {
use moor_values::tasks::{CommandError, SchedulerError};
use moor_values::{Error::E_VERBNF, Obj, Var, SYSTEM_OBJECT};

use super::TaskHandle;
use super::{TaskHandle, TaskResult};
use crate::config::Config;
use crate::tasks::scheduler_client::SchedulerClient;
use crate::tasks::sessions::Session;
Expand Down Expand Up @@ -275,7 +285,8 @@ pub mod scheduler_test_utils {
Err(TaskAbortedException(Exception { code, .. })) => Ok(code.into()),
Err(CommandExecutionError(CommandError::NoCommandMatch)) => Ok(E_VERBNF.into()),
Err(err) => Err(err),
Ok(var) => Ok(var),
Ok(TaskResult::Result(var)) => Ok(var),
Ok(TaskResult::Restarted(_)) => panic!("Unexpected task restart"),
}
}

Expand Down
31 changes: 21 additions & 10 deletions crates/kernel/src/tasks/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ use crate::tasks::task::Task;
use crate::tasks::task_scheduler_client::{TaskControlMsg, TaskSchedulerClient};
use crate::tasks::tasks_db::TasksDb;
use crate::tasks::{
ServerOptions, TaskHandle, TaskStart, DEFAULT_BG_SECONDS, DEFAULT_BG_TICKS, DEFAULT_FG_SECONDS,
DEFAULT_FG_TICKS, DEFAULT_MAX_STACK_DEPTH,
ServerOptions, TaskHandle, TaskResult, TaskStart, DEFAULT_BG_SECONDS, DEFAULT_BG_TICKS,
DEFAULT_FG_SECONDS, DEFAULT_FG_TICKS, DEFAULT_MAX_STACK_DEPTH,
};
use crate::textdump::{make_textdump, TextdumpWriter};
use crate::vm::Fork;
Expand Down Expand Up @@ -115,7 +115,7 @@ struct RunningTaskControl {
/// The connection-session for this task.
session: Arc<dyn Session>,
/// A mailbox to deliver the result of the task to a waiting party with a subscription, if any.
result_sender: Option<oneshot::Sender<Result<Var, SchedulerError>>>,
result_sender: Option<oneshot::Sender<Result<TaskResult, SchedulerError>>>,
}

/// The internal state of the task queue.
Expand Down Expand Up @@ -1452,7 +1452,7 @@ impl TaskQ {
mut task: Task,
resume_val: Var,
session: Arc<dyn Session>,
result_sender: Option<oneshot::Sender<Result<Var, SchedulerError>>>,
result_sender: Option<oneshot::Sender<Result<TaskResult, SchedulerError>>>,
control_sender: &Sender<(TaskId, TaskControlMsg)>,
database: &dyn Database,
builtin_registry: Arc<BuiltinRegistry>,
Expand Down Expand Up @@ -1522,7 +1522,8 @@ impl TaskQ {
if result_sender.is_closed() {
return;
}
if result_sender.send(result.clone()).is_err() {
let result = result.map(|v| TaskResult::Result(v.clone()));
if result_sender.send(result).is_err() {
error!("Notify to task {} failed", task_id);
}
}
Expand All @@ -1548,12 +1549,11 @@ impl TaskQ {
.remove(&task.task_id)
.expect("Task not found for retry");

info!("Retrying task {}", task.task_id);

// Grab the "task start" record from the (now dead) task, and submit this again with the same
// task_id.
let task_start = task.task_start.clone();
if let Err(e) = self.start_task_thread(

match self.start_task_thread(
task.task_id,
task_start,
&old_tc.player,
Expand All @@ -1566,8 +1566,19 @@ impl TaskQ {
builtin_registry,
config,
) {
error!(?e, "Could not restart task");
}
Ok(th) => {
// Replacement task handle now exists, we need to send a message to the daemon to
// let it know that, otherwise it will sit hanging waiting on the old one forever.
old_tc
.result_sender
.expect("No result sender for retry")
.send(Ok(TaskResult::Restarted(th)))
.expect("Could not send retry result");
}
Err(e) => {
error!(error = ?e, "Could not retry task: {:?}", e);
}
};
}

#[instrument(skip(self))]
Expand Down
8 changes: 4 additions & 4 deletions crates/kernel/src/tasks/suspension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ use bincode::{BorrowDecode, Decode, Encode};
use tracing::{debug, error, info, warn};
use uuid::Uuid;

use moor_values::{Obj, Var};
use moor_values::Obj;

use crate::tasks::sessions::{NoopClientSession, Session, SessionFactory};
use crate::tasks::task::Task;
use crate::tasks::{TaskDescription, TasksDb};
use crate::tasks::{TaskDescription, TaskResult, TasksDb};
use moor_values::tasks::{SchedulerError, TaskId};

/// State a suspended task sits in inside the `suspended` side of the task queue.
Expand All @@ -36,7 +36,7 @@ pub struct SuspendedTask {
pub wake_condition: WakeCondition,
pub task: Task,
pub session: Arc<dyn Session>,
pub result_sender: Option<oneshot::Sender<Result<Var, SchedulerError>>>,
pub result_sender: Option<oneshot::Sender<Result<TaskResult, SchedulerError>>>,
}

/// Possible conditions in which a suspended task can wake from suspension.
Expand Down Expand Up @@ -118,7 +118,7 @@ impl SuspensionQ {
wake_condition: WakeCondition,
task: Task,
session: Arc<dyn Session>,
result_sender: Option<oneshot::Sender<Result<Var, SchedulerError>>>,
result_sender: Option<oneshot::Sender<Result<TaskResult, SchedulerError>>>,
) {
let task_id = task.task_id;
let sr = SuspendedTask {
Expand Down

0 comments on commit 62cea30

Please sign in to comment.