Skip to content

Commit

Permalink
Adds a new "model-checker" to do tx consistency check
Browse files Browse the repository at this point in the history
#31

A program which runs a series of concurrent workloads of reading and
appending to lists on a set of random properties, in a number of
concurrent transactions. It then logs them into an "EDN" format file
that the `elle` tool from `jepsen` can read and check for
transactional isolation inconsistencies.

With this we are able to prove whether or not `moor` enforces strict
serializable isolation, at least in this (for now) limited use case.

Using this tool I was able to test the conflict-retry logic under
load, as well as find some bugs in it.

Testing shows that with the fjall-db backend and for this list-append
workload, moor behaves correctly under `serializable` isolation, but does
not conform to elle's definition of `strict-serializable`.
  • Loading branch information
rdaum committed Dec 1, 2024
1 parent 6a5f538 commit 403cc07
Show file tree
Hide file tree
Showing 13 changed files with 1,143 additions and 115 deletions.
121 changes: 110 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ members = [
"crates/telnet-host",
"crates/values",
"crates/web-host",
"crates/model-checker",
]
default-members = [
"crates/values",
Expand Down Expand Up @@ -158,3 +159,6 @@ signal-hook = "0.3"

# For the telnet host
termimad = "0.31"

# For the consistency checker in `model-checker`
edn-format = "3.3.0"
4 changes: 2 additions & 2 deletions crates/daemon/src/connections_fjall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,10 +276,10 @@ impl ConnectionsDB for ConnectionsFjall {
panic!("Conflict retry on new connection");
}
RelationalError::Duplicate(_d) => {
return RpcMessageError::AlreadyConnected;
RpcMessageError::AlreadyConnected
}
RelationalError::NotFound => {
return RpcMessageError::CreateSessionFailed;
RpcMessageError::CreateSessionFailed
}
})
}
Expand Down
4 changes: 2 additions & 2 deletions crates/daemon/src/connections_wt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,10 +309,10 @@ impl ConnectionsDB for ConnectionsWT {
panic!("Conflict retry on new connection");
}
RelationalError::Duplicate(_d) => {
return RpcMessageError::AlreadyConnected;
RpcMessageError::AlreadyConnected
}
RelationalError::NotFound => {
return RpcMessageError::CreateSessionFailed;
RpcMessageError::CreateSessionFailed
}
})
}
Expand Down
12 changes: 5 additions & 7 deletions crates/daemon/src/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,6 @@ impl RpcServer {
t_rpc_server.ping_pong().expect("Unable to play ping-pong");
})?;

// We need to bind a generic publisher to the narrative endpoint, so that subsequent sessions
// are visible...
let rpc_socket = self.zmq_context.socket(zmq::REP)?;
rpc_socket.bind(&rpc_endpoint)?;

Expand Down Expand Up @@ -912,7 +910,7 @@ impl RpcServer {
let task_id = parse_command_task_handle.task_id();
let mut th_q = self.task_handles.lock().unwrap();
th_q.insert(task_id, (client_id, parse_command_task_handle));
Ok(DaemonToClientReply::CommandSubmitted(task_id))
Ok(DaemonToClientReply::TaskSubmitted(task_id))
}

fn respond_input(
Expand Down Expand Up @@ -973,7 +971,7 @@ impl RpcServer {
// let the session run to completion on its own and output back to the client.
// Maybe we should be returning a value from this for the future, but the way clients are
// written right now, there's little point.
Ok(DaemonToClientReply::CommandSubmitted(task_handle.task_id()))
Ok(DaemonToClientReply::TaskSubmitted(task_handle.task_id()))
}

fn eval(
Expand Down Expand Up @@ -1043,7 +1041,7 @@ impl RpcServer {
let task_id = task_handle.task_id();
let mut th_q = self.task_handles.lock().unwrap();
th_q.insert(task_id, (client_id, task_handle));
Ok(DaemonToClientReply::CommandSubmitted(task_id))
Ok(DaemonToClientReply::TaskSubmitted(task_id))
}

fn program_verb(
Expand Down Expand Up @@ -1097,8 +1095,8 @@ impl RpcServer {
let publish = self.events_publish.lock().unwrap();
for (task_id, client_id, result) in completed {
let result = match result {
Ok(v) => ClientEvent::TaskSuccess(v),
Err(e) => ClientEvent::TaskError(e),
Ok(v) => ClientEvent::TaskSuccess(task_id, v),
Err(e) => ClientEvent::TaskError(task_id, e),
};
debug!(?client_id, ?task_id, ?result, "Task completed");
let payload = bincode::encode_to_vec(&result, bincode::config::standard())
Expand Down
3 changes: 3 additions & 0 deletions crates/kernel/src/tasks/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ impl Scheduler {

let program = compile(code.join("\n").as_str(), self.config.compile_options())
.map_err(|e| {
// TODO: just dumping a string here sucks.
VerbProgramFailed(VerbProgramError::CompilationError(vec![format!("{:?}", e)]))
})?;

Expand Down Expand Up @@ -1547,6 +1548,8 @@ impl TaskQ {
.remove(&task.task_id)
.expect("Task not found for retry");

info!("Retrying task {}", task.task_id);

// Grab the "task start" record from the (now dead) task, and submit this again with the same
// task_id.
let task_start = task.task_start.clone();
Expand Down
2 changes: 1 addition & 1 deletion crates/kernel/src/tasks/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ impl Task {

let CommitResult::Success = world_state.commit().expect("Could not attempt commit")
else {
warn!("Conflict during commit before complete, asking scheduler to retry task");
warn!("Conflict during commit before complete, asking scheduler to retry task ({})", self.task_id);
task_scheduler_client.conflict_retry(self);
return None;
};
Expand Down
Loading

0 comments on commit 403cc07

Please sign in to comment.