// 思路:
// - 重新定义 RwLock(对 std::sync::RwLock 的封装)
// - 定义 extra 结构,记录每个锁请求的信息(uuid -> 获得锁时的时间戳)
// - 每个 RwLock 启动一个线程来跟踪锁状态
// - 当锁持有时间超过 DEADLOCK_TIMEOUT 时,记录死锁信息
// Error codes used by this module fall in the range E001900 - E001999.
use crate::utils;
use anyhow::{anyhow, Result};
use log::error;
use std::collections::HashMap;
use std::ops::{Deref, DerefMut};
use std::sync::{Arc, RwLock as StdRwLock};
pub use std::sync::{RwLockReadGuard, RwLockWriteGuard};
use std::thread;
use std::time::Duration;
/// Hold-time threshold used by the detector thread: a tracked guard whose
/// recorded timestamp is older than this is reported as a suspected deadlock.
static DEADLOCK_TIMEOUT: Duration = Duration::from_secs(1); // 1000 ms = 1 s
/// A wrapper around `std::sync::RwLock` that records, per caller-supplied id,
/// the timestamp at which each guard was handed out, so that a background
/// thread can log guards held for longer than `DEADLOCK_TIMEOUT`.
pub struct RwLock<T: ?Sized + Send + Sync> {
    /// Tracking table: caller-supplied id -> timestamp stored when the guard
    /// was granted. Shared with the detector thread and with every guard wrapper.
    extra: Arc<StdRwLock<HashMap<String, Duration>>>,
    /// The standard-library lock that actually protects the data.
    rwlock: Arc<StdRwLock<T>>,
}
/// Construction, lock acquisition and the background hold-time detector.
impl<T: ?Sized + Send + Sync + 'static> RwLock<T> {
    /// Wraps `data` in a tracked lock and spawns the detector thread that
    /// watches this lock's tracking table.
    ///
    /// The detector thread is detached; it stops on its own shortly after the
    /// returned `RwLock` (and therefore every guard borrowed from it) is gone.
    pub fn new(data: T) -> Self
    where
        T: Sized,
    {
        let extra = Arc::new(StdRwLock::new(HashMap::new()));
        let instance = Self {
            extra: Arc::clone(&extra),
            rwlock: Arc::new(StdRwLock::new(data)),
        };
        thread::spawn(move || Self::deadlock_detector(extra));
        instance
    }

    /// Acquires the lock for shared access and registers the guard under `uuid`.
    ///
    /// `uuid` is the key in the tracking table; a second live guard using the
    /// same id overwrites the first one's entry, so callers should pass a
    /// distinct id per acquisition.
    ///
    /// # Errors
    ///
    /// * `E001900` - the data lock is poisoned.
    /// * `E001908` - the tracking table lock is poisoned.
    pub fn read(&self, uuid: &str) -> Result<RwLockReadGuardWrapper<'_, T>> {
        let read = self.rwlock.read().map_err(|e| anyhow!("\nE001900: {e}"))?;
        // The timestamp is stored only once the lock is actually held, so the
        // detector measures hold time rather than wait time. The temporary
        // write guard on the table is released at the end of this statement.
        self.extra
            .write()
            .map_err(|e| anyhow!("\nE001908: {e}"))?
            .insert(uuid.to_string(), utils::get_current_timestamp());
        Ok(RwLockReadGuardWrapper {
            uuid: uuid.to_string(),
            extra: Arc::clone(&self.extra),
            guard: Some(read),
        })
    }

    /// Acquires the lock for exclusive access and registers the guard under `uuid`.
    ///
    /// `uuid` is the key in the tracking table; callers should pass a distinct
    /// id per acquisition.
    ///
    /// # Errors
    ///
    /// * `E001901` - the data lock is poisoned.
    /// * `E001902` - the tracking table lock is poisoned.
    pub fn write(&self, uuid: &str) -> Result<RwLockWriteGuardWrapper<'_, T>> {
        let write = self.rwlock.write().map_err(|e| anyhow!("\nE001901: {e}"))?;
        // Same bookkeeping as `read`: stamp the grant after the lock is held.
        self.extra
            .write()
            .map_err(|e| anyhow!("\nE001902: {e}"))?
            .insert(uuid.to_string(), utils::get_current_timestamp());
        Ok(RwLockWriteGuardWrapper {
            uuid: uuid.to_string(),
            extra: Arc::clone(&self.extra),
            guard: Some(write),
        })
    }

    /// Body of the detector thread: polls the tracking table and logs every
    /// guard that has been held for longer than `DEADLOCK_TIMEOUT`.
    ///
    /// After a round that reported at least one guard, the thread backs off
    /// for ten seconds before polling again. The function returns once this
    /// thread holds the only remaining reference to the table.
    fn deadlock_detector(extra: Arc<StdRwLock<HashMap<String, Duration>>>) {
        /// Pause between two scans of the tracking table.
        const POLL_INTERVAL: Duration = Duration::from_millis(1);
        /// Pause after a scan that reported at least one overdue guard.
        const REPORT_BACKOFF: Duration = Duration::from_secs(10);

        loop {
            thread::sleep(POLL_INTERVAL);

            // The owning `RwLock` and each live guard hold a strong reference.
            // When ours is the only one left, nobody can ever register a guard
            // again (no one else can clone the Arc), so stop instead of
            // leaking one thread per lock that was ever created.
            if Arc::strong_count(&extra) == 1 {
                return;
            }

            // Snapshot the overdue entries and release the table before any
            // logging or sleeping: holding the read guard during the back-off
            // would block every acquire and release on this lock.
            let mut overdue: Vec<(String, Duration)> = match extra.try_read() {
                Ok(tracked) => {
                    // NOTE(review): assumes `get_current_timestamp` returns the
                    // same kind of value that `read`/`write` stored - confirm in utils.
                    let now = utils::get_current_timestamp();
                    tracked
                        .iter()
                        .filter_map(|(uuid, acquired_at)| {
                            // Saturate so a clock that steps backwards cannot
                            // panic the detector thread.
                            let held = now.saturating_sub(*acquired_at);
                            (held > DEADLOCK_TIMEOUT).then(|| (uuid.clone(), held))
                        })
                        .collect()
                }
                Err(e) => {
                    error!("\nE001904: special: extra.try_read() failed: {e}");
                    continue;
                }
            };

            // Longest-held guard first (same order as sorting by grant time).
            overdue.sort_by(|a, b| b.1.cmp(&a.1));
            for (uuid, held) in &overdue {
                error!(
                    "\nE001905: 发现 min_uuid: {uuid} 锁超时: {} 毫秒",
                    held.as_millis(),
                );
            }

            if !overdue.is_empty() {
                thread::sleep(REPORT_BACKOFF);
            }
        }
    }
}
/// Shared-access guard handed out by `RwLock::read`.
///
/// Besides the standard read guard it carries what is needed to remove its
/// own entry from the tracking table when it is dropped.
pub struct RwLockReadGuardWrapper<'a, T: ?Sized + 'a> {
    /// Key under which this guard is registered in the tracking table.
    uuid: String,
    /// Handle to the tracking table shared with the owning lock.
    extra: Arc<StdRwLock<HashMap<String, Duration>>>,
    /// The underlying read guard; `Drop` takes it out of the `Option` so it
    /// can release it explicitly after the table entry has been removed.
    guard: Option<RwLockReadGuard<'a, T>>,
}
/// Read-only access to the protected value through the wrapped guard.
impl<T: ?Sized> Deref for RwLockReadGuardWrapper<'_, T> {
    type Target = T;

    /// Borrows the guarded value.
    ///
    /// Panics if the inner guard has already been taken out; within this
    /// type that only happens inside `Drop`.
    fn deref(&self) -> &T {
        let held = self.guard.as_ref().unwrap();
        &**held
    }
}
/// Unregisters the guard from the tracking table, then releases the read lock.
impl<T: ?Sized> Drop for RwLockReadGuardWrapper<'_, T> {
    fn drop(&mut self) {
        // Nothing to do if the inner guard is already gone.
        let held = match self.guard.take() {
            Some(held) => held,
            None => return,
        };

        // Remove the table entry while the data lock is still held, so the
        // entry disappears before the lock becomes available to others.
        match self.extra.write() {
            Err(e) => {
                error!("\nE001907: RwLockReadGuardWrapper: 获取 extra 读锁出错: {e:?}")
            }
            Ok(mut tracked) => {
                tracked.remove(&self.uuid);
            }
        }

        // Release the underlying read guard last.
        drop(held);
    }
}
/// Exclusive-access guard handed out by `RwLock::write`.
///
/// Besides the standard write guard it carries what is needed to remove its
/// own entry from the tracking table when it is dropped.
pub struct RwLockWriteGuardWrapper<'a, T: ?Sized + 'a> {
    /// Key under which this guard is registered in the tracking table.
    uuid: String,
    /// Handle to the tracking table shared with the owning lock.
    extra: Arc<StdRwLock<HashMap<String, Duration>>>,
    /// The underlying write guard; `Drop` takes it out of the `Option` so it
    /// can release it explicitly after the table entry has been removed.
    guard: Option<RwLockWriteGuard<'a, T>>,
}
/// Read access to the protected value through the wrapped write guard.
impl<T: ?Sized> Deref for RwLockWriteGuardWrapper<'_, T> {
    type Target = T;

    /// Borrows the guarded value.
    ///
    /// Panics if the inner guard has already been taken out; within this
    /// type that only happens inside `Drop`.
    fn deref(&self) -> &T {
        let held = self.guard.as_ref().unwrap();
        &**held
    }
}
/// Mutable access to the protected value through the wrapped write guard.
impl<T: ?Sized> DerefMut for RwLockWriteGuardWrapper<'_, T> {
    /// Mutably borrows the guarded value.
    ///
    /// Panics if the inner guard has already been taken out; within this
    /// type that only happens inside `Drop`.
    fn deref_mut(&mut self) -> &mut T {
        let held = self.guard.as_mut().unwrap();
        &mut **held
    }
}
/// Unregisters the guard from the tracking table, then releases the write lock.
impl<T: ?Sized> Drop for RwLockWriteGuardWrapper<'_, T> {
    fn drop(&mut self) {
        // Nothing to do if the inner guard is already gone.
        let held = match self.guard.take() {
            Some(held) => held,
            None => return,
        };

        // Remove the table entry while the data lock is still held, so the
        // next writer cannot register before this entry has been cleared.
        match self.extra.write() {
            Err(e) => {
                error!("\nE001906: RwLockWriteGuardWrapper: 获取 extra 写锁出错: {e:?}")
            }
            Ok(mut tracked) => {
                tracked.remove(&self.uuid);
            }
        }

        // Release the underlying write guard last.
        drop(held);
    }
}