Skip to content

Commit

Permalink
fix(gstd): Fix deadlocks in mutex/rw-lock (#2906)
Browse files Browse the repository at this point in the history
  • Loading branch information
DennisInSky authored Jul 6, 2023
1 parent 758c72d commit 7625bbd
Show file tree
Hide file tree
Showing 6 changed files with 293 additions and 14 deletions.
53 changes: 44 additions & 9 deletions examples/binaries/waiter/src/code.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
use crate::{Command, SleepForWaitType, WaitSubcommand};
use crate::{
Command, MxLockContinuation, RwLockContinuation, RwLockType, SleepForWaitType, WaitSubcommand,
};
use futures::future;

use gstd::{errors::Error, exec, format, msg, MessageId};
use gstd::{errors::Error, exec, format, lock, msg, MessageId};

/// Dispatches a [`WaitSubcommand`] to the `exec` wait call of the same flavour.
///
/// `Wait` maps to `exec::wait()`, while `WaitFor` and `WaitUpTo` forward the
/// duration they carry, unchanged, to `exec::wait_for` and `exec::wait_up_to`.
fn process_wait_subcommand(subcommand: WaitSubcommand) {
match subcommand {
WaitSubcommand::Wait => exec::wait(),
WaitSubcommand::WaitFor(duration) => exec::wait_for(duration),
WaitSubcommand::WaitUpTo(duration) => exec::wait_up_to(duration),
}
}
static mut MUTEX: lock::Mutex<()> = lock::Mutex::new(());
static mut RW_LOCK: lock::RwLock<()> = lock::RwLock::new(());

#[gstd::async_main]
async fn main() {
Expand Down Expand Up @@ -79,5 +76,43 @@ async fn main() {
Command::WakeUp(msg_id) => {
exec::wake(msg_id.into()).expect("Failed to wake up the message");
}
Command::MxLock(continuation) => {
let _lock_guard = unsafe { MUTEX.lock().await };
process_mx_lock_continuation(continuation).await;
}
Command::RwLock(lock_type, continuation) => {
match lock_type {
RwLockType::Read => {
let _lock_guard = unsafe { RW_LOCK.read().await };
process_rw_lock_continuation(continuation).await;
}
RwLockType::Write => {
let _lock_guard = unsafe { RW_LOCK.write().await };
process_rw_lock_continuation(continuation).await;
}
};
}
}
}

fn process_wait_subcommand(subcommand: WaitSubcommand) {
match subcommand {
WaitSubcommand::Wait => exec::wait(),
WaitSubcommand::WaitFor(duration) => exec::wait_for(duration),
WaitSubcommand::WaitUpTo(duration) => exec::wait_up_to(duration),
}
}

/// Performs the follow-up action requested together with a mutex lock.
///
/// `main` calls this while the mutex guard is still alive, so a `SleepFor`
/// continuation keeps the mutex locked for the whole sleep. `Nothing` returns
/// immediately.
async fn process_mx_lock_continuation(continuation: MxLockContinuation) {
    match continuation {
        MxLockContinuation::SleepFor(period) => {
            exec::sleep_for(period).await;
        }
        MxLockContinuation::Nothing => (),
    }
}

/// Performs the follow-up action requested together with a read or write lock.
///
/// `main` calls this while the corresponding `RW_LOCK` guard is still alive,
/// so a `SleepFor` continuation keeps the lock held for the whole sleep.
/// `Nothing` returns immediately.
async fn process_rw_lock_continuation(continuation: RwLockContinuation) {
    match continuation {
        RwLockContinuation::SleepFor(period) => {
            exec::sleep_for(period).await;
        }
        RwLockContinuation::Nothing => (),
    }
}
20 changes: 20 additions & 0 deletions examples/binaries/waiter/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,24 @@ pub enum SleepForWaitType {
Any,
}

/// What a message does after it has acquired the program's mutex
/// (payload of `Command::MxLock`).
#[derive(Debug, Encode, Decode)]
pub enum MxLockContinuation {
/// Do nothing more; the handler proceeds straight after locking.
Nothing,
/// Await `exec::sleep_for` with this duration while the lock guard is alive.
/// The pallet tests describe the value as a number of blocks.
SleepFor(u32),
}

/// Selects which side of the program's `RwLock` a `Command::RwLock` acquires.
#[derive(Debug, Encode, Decode)]
pub enum RwLockType {
/// Acquire the lock via `read()`.
Read,
/// Acquire the lock via `write()`.
Write,
}

/// What a message does after it has acquired the program's `RwLock`
/// (second payload field of `Command::RwLock`).
#[derive(Debug, Encode, Decode)]
pub enum RwLockContinuation {
/// Do nothing more; the handler proceeds straight after locking.
Nothing,
/// Await `exec::sleep_for` with this duration while the lock guard is alive.
/// The pallet tests describe the value as a number of blocks.
SleepFor(u32),
}

#[derive(Debug, Encode, Decode)]
pub enum Command {
Wait(WaitSubcommand),
Expand All @@ -65,4 +83,6 @@ pub enum Command {
ReplyAndWait(WaitSubcommand),
SleepFor(Vec<u32>, SleepForWaitType),
WakeUp([u8; 32]),
MxLock(MxLockContinuation),
RwLock(RwLockType, RwLockContinuation),
}
6 changes: 6 additions & 0 deletions gstd/src/lock/access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ impl AccessQueue {
inner.as_mut().and_then(|v| v.pop_front())
}

/// Returns `true` when `message_id` is currently registered in the queue.
///
/// A queue whose inner storage is still `None` contains nothing, so the
/// answer is `false` in that case.
pub fn contains(&self, message_id: &MessageId) -> bool {
    // NOTE(review): soundness relies on no `&mut` to the inner storage being
    // live while this shared reference exists — confirm against the callers.
    match unsafe { &*self.0.get() } {
        Some(queue) => queue.contains(message_id),
        None => false,
    }
}

pub const fn new() -> Self {
AccessQueue(UnsafeCell::new(None))
}
Expand Down
11 changes: 9 additions & 2 deletions gstd/src/lock/mutex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,19 @@ impl<'a, T> Future for MutexLockFuture<'a, T> {
// Polls the lock request made by the current message.
//
// If the mutex is free, the current message becomes its owner and a guard is
// returned. If the mutex is taken, the message is registered in the access
// queue (at most once) and the future reports `Pending`, i.e. the message
// goes back to waiting.
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
    // Read the message id once and reuse it in both branches.
    let current_msg_id = crate::msg::id();
    let lock = unsafe { &mut *self.mutex.locked.get() };
    if lock.is_none() {
        *lock = Some(current_msg_id);
        Poll::Ready(MutexGuard { mutex: self.mutex })
    } else {
        // If the message is already in the access queue, and we come here,
        // it means the message has just been woken up from the waitlist.
        // In that case we do not want to register yet another access attempt
        // and just go back to the waitlist. Enqueueing unconditionally here
        // would leave a duplicate entry behind, which is what used to cause
        // the deadlock.
        if !self.mutex.queue.contains(&current_msg_id) {
            self.mutex.queue.enqueue(current_msg_id);
        }
        Poll::Pending
    }
}
Expand Down
20 changes: 17 additions & 3 deletions gstd/src/lock/rwlock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,12 +309,19 @@ impl<'a, T> Future for RwLockReadFuture<'a, T> {
let readers = &self.lock.readers;
let readers_count = readers.get().saturating_add(1);

let current_msg_id = crate::msg::id();
let lock = unsafe { &mut *self.lock.locked.get() };
if lock.is_none() && readers_count <= READERS_LIMIT {
readers.replace(readers_count);
Poll::Ready(RwLockReadGuard { lock: self.lock })
} else {
self.lock.queue.enqueue(crate::msg::id());
// If the message is already in the access queue, and we come here,
// it means the message has just been woken up from the waitlist.
// In that case we do not want to register yet another access attempt
// and just go back to the waitlist.
if !self.lock.queue.contains(&current_msg_id) {
self.lock.queue.enqueue(current_msg_id);
}
Poll::Pending
}
}
Expand All @@ -324,12 +331,19 @@ impl<'a, T> Future for RwLockWriteFuture<'a, T> {
type Output = RwLockWriteGuard<'a, T>;

// Polls the write-lock request made by the current message.
//
// The write lock is granted only when nobody holds it and there are no active
// readers. Otherwise the message is registered in the access queue (at most
// once) and the future reports `Pending`.
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
    // Read the message id once and reuse it in both branches.
    let current_msg_id = crate::msg::id();
    let lock = unsafe { &mut *self.lock.locked.get() };
    if lock.is_none() && self.lock.readers.get() == 0 {
        *lock = Some(current_msg_id);
        Poll::Ready(RwLockWriteGuard { lock: self.lock })
    } else {
        // If the message is already in the access queue, and we come here,
        // it means the message has just been woken up from the waitlist.
        // In that case we do not want to register yet another access attempt
        // and just go back to the waitlist. Enqueueing unconditionally here
        // would leave a duplicate entry behind, which is what used to cause
        // the deadlock.
        if !self.lock.queue.contains(&current_msg_id) {
            self.lock.queue.enqueue(current_msg_id);
        }
        Poll::Pending
    }
}
Expand Down
197 changes: 197 additions & 0 deletions pallets/gear/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8555,6 +8555,203 @@ fn call_forbidden_function() {
});
}

/// Checks that explicitly waking a message which is queued for the Waiter
/// program's mutex does not leave the remaining contenders stuck.
///
/// Scenario: one message takes the mutex and sleeps while holding it, a
/// second message queues for the mutex and is then woken via `WakeUp`, and a
/// third message queues afterwards. Once the owner finishes, all three
/// messages are expected to succeed.
#[test]
fn waking_message_waiting_for_mx_lock_does_not_lead_to_deadlock() {
use demo_waiter::{Command as WaiterCommand, MxLockContinuation, WASM_BINARY as WAITER_WASM};

let execution = || {
System::reset_events();

Gear::upload_program(
RuntimeOrigin::signed(USER_1),
WAITER_WASM.to_vec(),
DEFAULT_SALT.to_vec(),
EMPTY_PAYLOAD.to_vec(),
BlockGasLimitOf::<Test>::get(),
0,
)
.expect("Failed to upload Waiter");
let waiter_prog_id = get_last_program_id();
run_to_next_block(None);

// Sends `command` to the Waiter program and advances the chain by one
// block. Returns the id of the sent message together with the block
// number following the one in which it was sent.
let send_command_to_waiter = |command: WaiterCommand| {
MailboxOf::<Test>::clear();
Gear::send_message(
RuntimeOrigin::signed(USER_1),
waiter_prog_id,
command.encode(),
BlockGasLimitOf::<Test>::get(),
0,
)
.unwrap_or_else(|_| panic!("Failed to send command {:?} to Waiter", command));
let msg_id = get_last_message_id();
let msg_block_number = System::block_number() + 1;
run_to_next_block(None);
(msg_id, msg_block_number)
};

// The owner takes the mutex and sleeps for 4 blocks while holding it.
let (lock_owner_msg_id, _lock_owner_msg_block_number) =
send_command_to_waiter(WaiterCommand::MxLock(MxLockContinuation::SleepFor(4)));

// Rival 1 finds the mutex taken and has to wait for it.
let (lock_rival_1_msg_id, _) =
send_command_to_waiter(WaiterCommand::MxLock(MxLockContinuation::Nothing));

// Wake rival 1 while the mutex is still held by the owner, so it polls the
// lock again without being able to acquire it.
send_command_to_waiter(WaiterCommand::WakeUp(lock_rival_1_msg_id.into()));

// Rival 2 queues for the mutex after rival 1 has been woken.
let (lock_rival_2_msg_id, _) =
send_command_to_waiter(WaiterCommand::MxLock(MxLockContinuation::Nothing));

// At this point all three messages must still be in the waitlist.
assert!(WaitlistOf::<Test>::contains(
&waiter_prog_id,
&lock_owner_msg_id
));
assert!(WaitlistOf::<Test>::contains(
&waiter_prog_id,
&lock_rival_1_msg_id
));
assert!(WaitlistOf::<Test>::contains(
&waiter_prog_id,
&lock_rival_2_msg_id
));

// Three blocks have passed since the owner was sent (one per command above).
// Run for 1 more block, so the lock owner wakes up after sleeping for 4
// blocks and releases the mutex; the lock rival 1 can then acquire and
// release it for the lock rival 2 to acquire it.
run_for_blocks(1, None);

assert_succeed(lock_owner_msg_id);
assert_succeed(lock_rival_1_msg_id);
assert_succeed(lock_rival_2_msg_id);
};

init_logger();
new_test_ext().execute_with(execution);
}

/// Checks that explicitly waking a message which is queued for the Waiter
/// program's `RwLock` does not leave the remaining contenders stuck.
///
/// Two scenarios are run back to back against the same program: a woken
/// write-lock rival behind a read-lock owner, and a woken read-lock rival
/// behind a write-lock owner. In both, all three messages are expected to
/// succeed once the owner finishes.
#[test]
fn waking_message_waiting_for_rw_lock_does_not_lead_to_deadlock() {
use demo_waiter::{
Command as WaiterCommand, RwLockContinuation, RwLockType, WASM_BINARY as WAITER_WASM,
};

let execution = || {
System::reset_events();

Gear::upload_program(
RuntimeOrigin::signed(USER_1),
WAITER_WASM.to_vec(),
DEFAULT_SALT.to_vec(),
EMPTY_PAYLOAD.to_vec(),
BlockGasLimitOf::<Test>::get(),
0,
)
.expect("Failed to upload Waiter");
let waiter_prog_id = get_last_program_id();
run_to_next_block(None);

// Sends `command` to the Waiter program and advances the chain by one
// block. Returns the id of the sent message together with the block
// number following the one in which it was sent.
let send_command_to_waiter = |command: WaiterCommand| {
MailboxOf::<Test>::clear();
Gear::send_message(
RuntimeOrigin::signed(USER_1),
waiter_prog_id,
command.encode(),
BlockGasLimitOf::<Test>::get(),
0,
)
.unwrap_or_else(|_| panic!("Failed to send command {:?} to Waiter", command));
let msg_id = get_last_message_id();
let msg_block_number = System::block_number() + 1;
run_to_next_block(None);
(msg_id, msg_block_number)
};

// Scenario 1: a woken write-lock rival waits behind a read-lock owner.
{
// The owner takes the read lock and sleeps for 4 blocks while holding it.
let (lock_owner_msg_id, _lock_owner_msg_block_number) = send_command_to_waiter(
WaiterCommand::RwLock(RwLockType::Read, RwLockContinuation::SleepFor(4)),
);

// Rival 1 requests the write lock and has to wait for the reader.
let (lock_rival_1_msg_id, _) = send_command_to_waiter(WaiterCommand::RwLock(
RwLockType::Write,
RwLockContinuation::Nothing,
));

// Wake rival 1 while the lock is still held by the owner.
send_command_to_waiter(WaiterCommand::WakeUp(lock_rival_1_msg_id.into()));

// Rival 2 requests the write lock after rival 1 has been woken.
let (lock_rival_2_msg_id, _) = send_command_to_waiter(WaiterCommand::RwLock(
RwLockType::Write,
RwLockContinuation::Nothing,
));

// At this point all three messages must still be in the waitlist.
assert!(WaitlistOf::<Test>::contains(
&waiter_prog_id,
&lock_owner_msg_id
));
assert!(WaitlistOf::<Test>::contains(
&waiter_prog_id,
&lock_rival_1_msg_id
));
assert!(WaitlistOf::<Test>::contains(
&waiter_prog_id,
&lock_rival_2_msg_id
));

// Three blocks have passed since the owner was sent (one per command above).
// Run for 1 more block, so the lock owner wakes up after sleeping for 4
// blocks and releases the read lock; the lock rival 1 can then acquire and
// release the write lock for the lock rival 2 to acquire it.
run_for_blocks(1, None);

assert_succeed(lock_owner_msg_id);
assert_succeed(lock_rival_1_msg_id);
assert_succeed(lock_rival_2_msg_id);
}

// Scenario 2: a woken read-lock rival waits behind a write-lock owner.
{
// The owner takes the write lock and sleeps for 4 blocks while holding it.
let (lock_owner_msg_id, _lock_owner_msg_block_number) = send_command_to_waiter(
WaiterCommand::RwLock(RwLockType::Write, RwLockContinuation::SleepFor(4)),
);

// Rival 1 requests the read lock and has to wait for the writer.
let (lock_rival_1_msg_id, _) = send_command_to_waiter(WaiterCommand::RwLock(
RwLockType::Read,
RwLockContinuation::Nothing,
));

// Wake rival 1 while the lock is still held by the owner.
send_command_to_waiter(WaiterCommand::WakeUp(lock_rival_1_msg_id.into()));

// Rival 2 requests the write lock after rival 1 has been woken.
let (lock_rival_2_msg_id, _) = send_command_to_waiter(WaiterCommand::RwLock(
RwLockType::Write,
RwLockContinuation::Nothing,
));

// At this point all three messages must still be in the waitlist.
assert!(WaitlistOf::<Test>::contains(
&waiter_prog_id,
&lock_owner_msg_id
));
assert!(WaitlistOf::<Test>::contains(
&waiter_prog_id,
&lock_rival_1_msg_id
));
assert!(WaitlistOf::<Test>::contains(
&waiter_prog_id,
&lock_rival_2_msg_id
));

// Three blocks have passed since the owner was sent (one per command above).
// Run for 1 more block, so the lock owner wakes up after sleeping for 4
// blocks and releases the write lock; the lock rival 1 can then acquire and
// release the read lock for the lock rival 2 to acquire the write lock.
run_for_blocks(1, None);

assert_succeed(lock_owner_msg_id);
assert_succeed(lock_rival_1_msg_id);
assert_succeed(lock_rival_2_msg_id);
}
};

init_logger();
new_test_ext().execute_with(execution);
}

#[test]
fn async_sleep_for() {
use demo_waiter::{
Expand Down

0 comments on commit 7625bbd

Please sign in to comment.