From 7625bbd699e7b6ff3f3cdc516c4aee6b11909901 Mon Sep 17 00:00:00 2001 From: Dennis Diatlov Date: Thu, 6 Jul 2023 10:09:00 +0100 Subject: [PATCH] fix(gstd): Fix deadlocks in mutex/rw-lock (#2906) --- examples/binaries/waiter/src/code.rs | 53 +++++-- examples/binaries/waiter/src/lib.rs | 20 +++ gstd/src/lock/access.rs | 6 + gstd/src/lock/mutex.rs | 11 +- gstd/src/lock/rwlock.rs | 20 ++- pallets/gear/src/tests.rs | 197 +++++++++++++++++++++++++++ 6 files changed, 293 insertions(+), 14 deletions(-) diff --git a/examples/binaries/waiter/src/code.rs b/examples/binaries/waiter/src/code.rs index 5b6b384cf51..7ad9ee6b3d6 100644 --- a/examples/binaries/waiter/src/code.rs +++ b/examples/binaries/waiter/src/code.rs @@ -1,15 +1,12 @@ -use crate::{Command, SleepForWaitType, WaitSubcommand}; +use crate::{ + Command, MxLockContinuation, RwLockContinuation, RwLockType, SleepForWaitType, WaitSubcommand, +}; use futures::future; -use gstd::{errors::Error, exec, format, msg, MessageId}; +use gstd::{errors::Error, exec, format, lock, msg, MessageId}; -fn process_wait_subcommand(subcommand: WaitSubcommand) { - match subcommand { - WaitSubcommand::Wait => exec::wait(), - WaitSubcommand::WaitFor(duration) => exec::wait_for(duration), - WaitSubcommand::WaitUpTo(duration) => exec::wait_up_to(duration), - } -} +static mut MUTEX: lock::Mutex<()> = lock::Mutex::new(()); +static mut RW_LOCK: lock::RwLock<()> = lock::RwLock::new(()); #[gstd::async_main] async fn main() { @@ -79,5 +76,43 @@ async fn main() { Command::WakeUp(msg_id) => { exec::wake(msg_id.into()).expect("Failed to wake up the message"); } + Command::MxLock(continuation) => { + let _lock_guard = unsafe { MUTEX.lock().await }; + process_mx_lock_continuation(continuation).await; + } + Command::RwLock(lock_type, continuation) => { + match lock_type { + RwLockType::Read => { + let _lock_guard = unsafe { RW_LOCK.read().await }; + process_rw_lock_continuation(continuation).await; + } + RwLockType::Write => { + let _lock_guard = unsafe { 
RW_LOCK.write().await }; + process_rw_lock_continuation(continuation).await; + } + }; + } + } +} + +fn process_wait_subcommand(subcommand: WaitSubcommand) { + match subcommand { + WaitSubcommand::Wait => exec::wait(), + WaitSubcommand::WaitFor(duration) => exec::wait_for(duration), + WaitSubcommand::WaitUpTo(duration) => exec::wait_up_to(duration), + } +} + +async fn process_mx_lock_continuation(continuation: MxLockContinuation) { + match continuation { + MxLockContinuation::Nothing => {} + MxLockContinuation::SleepFor(duration) => exec::sleep_for(duration).await, + } +} + +async fn process_rw_lock_continuation(continuation: RwLockContinuation) { + match continuation { + RwLockContinuation::Nothing => {} + RwLockContinuation::SleepFor(duration) => exec::sleep_for(duration).await, } } diff --git a/examples/binaries/waiter/src/lib.rs b/examples/binaries/waiter/src/lib.rs index 87d5a0ad596..f895c3d22f7 100644 --- a/examples/binaries/waiter/src/lib.rs +++ b/examples/binaries/waiter/src/lib.rs @@ -55,6 +55,24 @@ pub enum SleepForWaitType { Any, } +#[derive(Debug, Encode, Decode)] +pub enum MxLockContinuation { + Nothing, + SleepFor(u32), +} + +#[derive(Debug, Encode, Decode)] +pub enum RwLockType { + Read, + Write, +} + +#[derive(Debug, Encode, Decode)] +pub enum RwLockContinuation { + Nothing, + SleepFor(u32), +} + #[derive(Debug, Encode, Decode)] pub enum Command { Wait(WaitSubcommand), @@ -65,4 +83,6 @@ pub enum Command { ReplyAndWait(WaitSubcommand), SleepFor(Vec, SleepForWaitType), WakeUp([u8; 32]), + MxLock(MxLockContinuation), + RwLock(RwLockType, RwLockContinuation), } diff --git a/gstd/src/lock/access.rs b/gstd/src/lock/access.rs index b6a7f4d4efc..5454a40b945 100644 --- a/gstd/src/lock/access.rs +++ b/gstd/src/lock/access.rs @@ -38,6 +38,12 @@ impl AccessQueue { inner.as_mut().and_then(|v| v.pop_front()) } + pub fn contains(&self, message_id: &MessageId) -> bool { + let inner = unsafe { &*self.0.get() }; + + inner.as_ref().map_or(false, |v| 
v.contains(message_id)) + } + pub const fn new() -> Self { AccessQueue(UnsafeCell::new(None)) } diff --git a/gstd/src/lock/mutex.rs b/gstd/src/lock/mutex.rs index 57ac8adda0c..e36c85ff22e 100644 --- a/gstd/src/lock/mutex.rs +++ b/gstd/src/lock/mutex.rs @@ -193,12 +193,19 @@ impl<'a, T> Future for MutexLockFuture<'a, T> { // In case of locked mutex and an `.await`, function `poll` checks if the // mutex can be taken, else it waits (goes into *waiting queue*). fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> { + let current_msg_id = crate::msg::id(); let lock = unsafe { &mut *self.mutex.locked.get() }; if lock.is_none() { - *lock = Some(crate::msg::id()); + *lock = Some(current_msg_id); Poll::Ready(MutexGuard { mutex: self.mutex }) } else { - self.mutex.queue.enqueue(crate::msg::id()); + // If the message is already in the access queue, and we come here, + // it means the message has just been woken up from the waitlist. + // In that case we do not want to register yet another access attempt + // and just go back to the waitlist. + if !self.mutex.queue.contains(&current_msg_id) { + self.mutex.queue.enqueue(crate::msg::id()); + } Poll::Pending } } diff --git a/gstd/src/lock/rwlock.rs b/gstd/src/lock/rwlock.rs index fece9c693c9..c7e17a5f946 100644 --- a/gstd/src/lock/rwlock.rs +++ b/gstd/src/lock/rwlock.rs @@ -309,12 +309,19 @@ impl<'a, T> Future for RwLockReadFuture<'a, T> { let readers = &self.lock.readers; let readers_count = readers.get().saturating_add(1); + let current_msg_id = crate::msg::id(); let lock = unsafe { &mut *self.lock.locked.get() }; if lock.is_none() && readers_count <= READERS_LIMIT { readers.replace(readers_count); Poll::Ready(RwLockReadGuard { lock: self.lock }) } else { - self.lock.queue.enqueue(crate::msg::id()); + // If the message is already in the access queue, and we come here, + // it means the message has just been woken up from the waitlist. 
+ // In that case we do not want to register yet another access attempt + // and just go back to the waitlist. + if !self.lock.queue.contains(&current_msg_id) { + self.lock.queue.enqueue(current_msg_id); + } Poll::Pending } } @@ -324,12 +331,19 @@ impl<'a, T> Future for RwLockWriteFuture<'a, T> { type Output = RwLockWriteGuard<'a, T>; fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> { + let current_msg_id = crate::msg::id(); let lock = unsafe { &mut *self.lock.locked.get() }; if lock.is_none() && self.lock.readers.get() == 0 { - *lock = Some(crate::msg::id()); + *lock = Some(current_msg_id); Poll::Ready(RwLockWriteGuard { lock: self.lock }) } else { - self.lock.queue.enqueue(crate::msg::id()); + // If the message is already in the access queue, and we come here, + // it means the message has just been woken up from the waitlist. + // In that case we do not want to register yet another access attempt + // and just go back to the waitlist. + if !self.lock.queue.contains(&current_msg_id) { + self.lock.queue.enqueue(current_msg_id); + } Poll::Pending } } diff --git a/pallets/gear/src/tests.rs b/pallets/gear/src/tests.rs index e5da71fd66a..5b2a1676d9d 100644 --- a/pallets/gear/src/tests.rs +++ b/pallets/gear/src/tests.rs @@ -8555,6 +8555,203 @@ fn call_forbidden_function() { }); } +#[test] +fn waking_message_waiting_for_mx_lock_does_not_lead_to_deadlock() { + use demo_waiter::{Command as WaiterCommand, MxLockContinuation, WASM_BINARY as WAITER_WASM}; + + let execution = || { + System::reset_events(); + + Gear::upload_program( + RuntimeOrigin::signed(USER_1), + WAITER_WASM.to_vec(), + DEFAULT_SALT.to_vec(), + EMPTY_PAYLOAD.to_vec(), + BlockGasLimitOf::::get(), + 0, + ) + .expect("Failed to upload Waiter"); + let waiter_prog_id = get_last_program_id(); + run_to_next_block(None); + + let send_command_to_waiter = |command: WaiterCommand| { + MailboxOf::::clear(); + Gear::send_message( + RuntimeOrigin::signed(USER_1), + waiter_prog_id, + command.encode(), 
BlockGasLimitOf::::get(), + 0, + ) + .unwrap_or_else(|_| panic!("Failed to send command {:?} to Waiter", command)); + let msg_id = get_last_message_id(); + let msg_block_number = System::block_number() + 1; + run_to_next_block(None); + (msg_id, msg_block_number) + }; + + let (lock_owner_msg_id, _lock_owner_msg_block_number) = + send_command_to_waiter(WaiterCommand::MxLock(MxLockContinuation::SleepFor(4))); + + let (lock_rival_1_msg_id, _) = + send_command_to_waiter(WaiterCommand::MxLock(MxLockContinuation::Nothing)); + + send_command_to_waiter(WaiterCommand::WakeUp(lock_rival_1_msg_id.into())); + + let (lock_rival_2_msg_id, _) = + send_command_to_waiter(WaiterCommand::MxLock(MxLockContinuation::Nothing)); + + assert!(WaitlistOf::::contains( + &waiter_prog_id, + &lock_owner_msg_id + )); + assert!(WaitlistOf::::contains( + &waiter_prog_id, + &lock_rival_1_msg_id + )); + assert!(WaitlistOf::::contains( + &waiter_prog_id, + &lock_rival_2_msg_id + )); + + // Run for 1 block, so the lock owner wakes up after sleeping for 4 blocks, + // releases the mutex so the lock rival 1 can acquire and release it for + // the lock rival 2 to acquire it. 
+ run_for_blocks(1, None); + + assert_succeed(lock_owner_msg_id); + assert_succeed(lock_rival_1_msg_id); + assert_succeed(lock_rival_2_msg_id); + }; + + init_logger(); + new_test_ext().execute_with(execution); +} + +#[test] +fn waking_message_waiting_for_rw_lock_does_not_lead_to_deadlock() { + use demo_waiter::{ + Command as WaiterCommand, RwLockContinuation, RwLockType, WASM_BINARY as WAITER_WASM, + }; + + let execution = || { + System::reset_events(); + + Gear::upload_program( + RuntimeOrigin::signed(USER_1), + WAITER_WASM.to_vec(), + DEFAULT_SALT.to_vec(), + EMPTY_PAYLOAD.to_vec(), + BlockGasLimitOf::::get(), + 0, + ) + .expect("Failed to upload Waiter"); + let waiter_prog_id = get_last_program_id(); + run_to_next_block(None); + + let send_command_to_waiter = |command: WaiterCommand| { + MailboxOf::::clear(); + Gear::send_message( + RuntimeOrigin::signed(USER_1), + waiter_prog_id, + command.encode(), + BlockGasLimitOf::::get(), + 0, + ) + .unwrap_or_else(|_| panic!("Failed to send command {:?} to Waiter", command)); + let msg_id = get_last_message_id(); + let msg_block_number = System::block_number() + 1; + run_to_next_block(None); + (msg_id, msg_block_number) + }; + + // For write lock + { + let (lock_owner_msg_id, _lock_owner_msg_block_number) = send_command_to_waiter( + WaiterCommand::RwLock(RwLockType::Read, RwLockContinuation::SleepFor(4)), + ); + + let (lock_rival_1_msg_id, _) = send_command_to_waiter(WaiterCommand::RwLock( + RwLockType::Write, + RwLockContinuation::Nothing, + )); + + send_command_to_waiter(WaiterCommand::WakeUp(lock_rival_1_msg_id.into())); + + let (lock_rival_2_msg_id, _) = send_command_to_waiter(WaiterCommand::RwLock( + RwLockType::Write, + RwLockContinuation::Nothing, + )); + + assert!(WaitlistOf::::contains( + &waiter_prog_id, + &lock_owner_msg_id + )); + assert!(WaitlistOf::::contains( + &waiter_prog_id, + &lock_rival_1_msg_id + )); + assert!(WaitlistOf::::contains( + &waiter_prog_id, + &lock_rival_2_msg_id + )); + + // Run for 1 
block, so the lock owner wakes up after sleeping for 4 blocks, + // releases the mutex so the lock rival 1 can acquire and release it for + // the lock rival 2 to acquire it. + run_for_blocks(1, None); + + assert_succeed(lock_owner_msg_id); + assert_succeed(lock_rival_1_msg_id); + assert_succeed(lock_rival_2_msg_id); + } + + // For read lock + { + let (lock_owner_msg_id, _lock_owner_msg_block_number) = send_command_to_waiter( + WaiterCommand::RwLock(RwLockType::Write, RwLockContinuation::SleepFor(4)), + ); + + let (lock_rival_1_msg_id, _) = send_command_to_waiter(WaiterCommand::RwLock( + RwLockType::Read, + RwLockContinuation::Nothing, + )); + + send_command_to_waiter(WaiterCommand::WakeUp(lock_rival_1_msg_id.into())); + + let (lock_rival_2_msg_id, _) = send_command_to_waiter(WaiterCommand::RwLock( + RwLockType::Write, + RwLockContinuation::Nothing, + )); + + assert!(WaitlistOf::::contains( + &waiter_prog_id, + &lock_owner_msg_id + )); + assert!(WaitlistOf::::contains( + &waiter_prog_id, + &lock_rival_1_msg_id + )); + assert!(WaitlistOf::::contains( + &waiter_prog_id, + &lock_rival_2_msg_id + )); + + // Run for 1 block, so the lock owner wakes up after sleeping for 4 blocks, + // releases the mutex so the lock rival 1 can acquire and release it for + // the lock rival 2 to acquire it. + run_for_blocks(1, None); + + assert_succeed(lock_owner_msg_id); + assert_succeed(lock_rival_1_msg_id); + assert_succeed(lock_rival_2_msg_id); + } + }; + + init_logger(); + new_test_ext().execute_with(execution); +} + #[test] fn async_sleep_for() { use demo_waiter::{