Skip to content

Latest commit

 

History

History
429 lines (369 loc) · 15.4 KB

05_处理两个内存泄漏场景.md

File metadata and controls

429 lines (369 loc) · 15.4 KB

在项目中使用了 <env_logger> 和 <自定义lock> 后, 发现了两处的内存问题.

第一处是 <env_logger> 导致的, 当执行 env_logger::init() 之后, valgrind 就会报 still reachable 问题.

zt@zthome:~/Documents/untitled$ cat src/main.rs 
use std::env;

fn main() { // minimal reproduction: initialise env_logger and exit
    env::set_var("RUST_LOG", "info"); // set the filter before the logger is initialised
    env_logger::init(); // the allocations valgrind reports below are made under this call (main.rs:5)
}

zt@zthome:~/Documents/untitled$ valgrind --leak-check=full --show-leak-kinds=all ./target/debug/untitled 
==21470== Memcheck, a memory error detector
==21470== Copyright (C) 2002-2022, and GNU GPL'd, by Julian Seward et al.
==21470== Using Valgrind-3.22.0 and LibVEX; rerun with -h for copyright info
==21470== Command: ./target/debug/untitled
==21470== 
==21470== 
==21470== HEAP SUMMARY:
==21470==     in use at exit: 352 bytes in 3 blocks
==21470==   total heap usage: 17 allocs, 14 frees, 2,946 bytes allocated
==21470== 
==21470== 40 bytes in 1 blocks are still reachable in loss record 1 of 3
==21470==    at 0x4846828: malloc (in /usr/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==21470==    by 0x15BF0A: alloc::alloc::alloc (alloc.rs:100)
==21470==    by 0x15C016: alloc::alloc::Global::alloc_impl (alloc.rs:183)
==21470==    by 0x15BE47: allocate (alloc.rs:243)
==21470==    by 0x15BE47: alloc::alloc::exchange_malloc (alloc.rs:332)
==21470==    by 0x1512E9: new<env_logger::fmt::{impl#5}::build::{closure_env#0}> (boxed.rs:218)
==21470==    by 0x1512E9: env_logger::fmt::Builder::build (mod.rs:237)
==21470==    by 0x14F4BF: env_logger::logger::Builder::build (logger.rs:493)
==21470==    by 0x14F264: env_logger::logger::Builder::try_init (logger.rs:456)
==21470==    by 0x14FF2A: env_logger::logger::try_init_from_env (logger.rs:896)
==21470==    by 0x14FEB9: env_logger::logger::try_init (logger.rs:846)
==21470==    by 0x14FED6: env_logger::logger::init (logger.rs:859)
==21470==    by 0x147373: untitled::main (main.rs:5)
==21470==    by 0x1472FA: core::ops::function::FnOnce::call_once (function.rs:250)
==21470== 
==21470== 128 bytes in 1 blocks are still reachable in loss record 2 of 3
==21470==    at 0x4846828: malloc (in /usr/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==21470==    by 0x3DA8BA: alloc::alloc::alloc (alloc.rs:100)
==21470==    by 0x3DA9C6: alloc::alloc::Global::alloc_impl (alloc.rs:183)
==21470==    by 0x3DBD58: <alloc::alloc::Global as core::alloc::Allocator>::allocate (alloc.rs:243)
==21470==    by 0x38EF5B: alloc::raw_vec::finish_grow (raw_vec.rs:573)
==21470==    by 0x14918B: alloc::raw_vec::RawVec<T,A>::grow_amortized (raw_vec.rs:485)
==21470==    by 0x149496: alloc::raw_vec::RawVec<T,A>::grow_one (raw_vec.rs:364)
==21470==    by 0x16114E: alloc::vec::Vec<T,A>::push (mod.rs:1999)
==21470==    by 0x16203E: env_logger::filter::Builder::insert_directive (mod.rs:133)
==21470==    by 0x162334: env_logger::filter::Builder::parse (mod.rs:170)
==21470==    by 0x14F1F1: env_logger::logger::Builder::parse_filters (logger.rs:380)
==21470==    by 0x14F033: env_logger::logger::Builder::parse_env (logger.rs:158)
==21470== 
==21470== 184 bytes in 1 blocks are still reachable in loss record 3 of 3
==21470==    at 0x4846828: malloc (in /usr/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==21470==    by 0x15BF0A: alloc::alloc::alloc (alloc.rs:100)
==21470==    by 0x15C016: alloc::alloc::Global::alloc_impl (alloc.rs:183)
==21470==    by 0x15BE47: allocate (alloc.rs:243)
==21470==    by 0x15BE47: alloc::alloc::exchange_malloc (alloc.rs:332)
==21470==    by 0x14F2FA: new<env_logger::logger::Logger> (boxed.rs:218)
==21470==    by 0x14F2FA: env_logger::logger::Builder::try_init (logger.rs:459)
==21470==    by 0x14FF2A: env_logger::logger::try_init_from_env (logger.rs:896)
==21470==    by 0x14FEB9: env_logger::logger::try_init (logger.rs:846)
==21470==    by 0x14FED6: env_logger::logger::init (logger.rs:859)
==21470==    by 0x147373: untitled::main (main.rs:5)
==21470==    by 0x1472FA: core::ops::function::FnOnce::call_once (function.rs:250)
==21470==    by 0x1472AD: std::sys_common::backtrace::__rust_begin_short_backtrace (backtrace.rs:155)
==21470==    by 0x147400: std::rt::lang_start::{{closure}} (rt.rs:159)
==21470== 
==21470== LEAK SUMMARY:
==21470==    definitely lost: 0 bytes in 0 blocks
==21470==    indirectly lost: 0 bytes in 0 blocks
==21470==      possibly lost: 0 bytes in 0 blocks
==21470==    still reachable: 352 bytes in 3 blocks
==21470==         suppressed: 0 bytes in 0 blocks
==21470== 
==21470== For lists of detected and suppressed errors, rerun with: -s
==21470== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

第二处是 <自定义锁> 导致的, 因为代码中拉起了一个线程跑 <deadlock_detect>, 而它自己的 Drop trait 实现中仅通知该线程退出, 却没有 join 等待线程结束, 所以导致 valgrind 报出内存泄漏问题.

解决办法:

  1. 禁用 <env_logger> 这个库, 仅使用 log 库, 通过自行封装达到一样的日志格式输出效果.
use std::{
    borrow::Cow,
    env,
    path::{Path, PathBuf},
    str::FromStr,
    sync::Arc,
};
mod lock;
use lazy_static::lazy_static;
use log::{info, LevelFilter, Metadata, Record};

/// Name of this crate, taken from Cargo's `CARGO_PKG_NAME` at compile time.
pub static PROJECT_NAME: &str = env!("CARGO_PKG_NAME");

lazy_static! {
    // Maximum log level, read from the `RUST_LOG` environment variable on first access.
    // A missing variable is treated as "info"; a value that does not parse as a
    // `LevelFilter` also falls back to `LevelFilter::Info`.
    static ref RUST_LOG: LevelFilter = {
        let level_str = env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string());
        LevelFilter::from_str(&level_str).unwrap_or(LevelFilter::Info)
    };
}

/// Shortens a source-file path so that it starts at the project directory.
///
/// The rules, in order:
///
/// 1. If the path's first component is `src` and the path exists relative to the
///    current working directory, `PROJECT_NAME` is prepended
///    (`src/main.rs` -> `<project>/src/main.rs`).
/// 2. If `src` occurs more than twice in the path, the input is returned unchanged.
/// 3. Otherwise the path is cut so that it starts at the directory directly above the
///    last `src` component (`/a/b/proj/src/x.rs` -> `proj/src/x.rs`). When that `src`
///    is already the first component, the whole path is kept.
/// 4. A path without any `src` component is returned unchanged.
///
/// Only the unchanged cases borrow from `file_path_str`; the others allocate.
fn get_path_startswith_project(file_path_str: &str) -> Cow<'_, str> {
    const SRC_DIR: &str = "src";

    let file_path = Path::new(file_path_str);
    let components: Vec<_> = file_path.components().collect();

    // Check the first component before touching the filesystem: `exists()` is a
    // syscall and this function runs once per log record.
    let starts_with_src = components
        .first()
        .map_or(false, |first| first.as_os_str() == SRC_DIR);
    if starts_with_src && file_path.exists() {
        let prefixed = Path::new(PROJECT_NAME).join(file_path);
        return Cow::Owned(prefixed.to_string_lossy().into_owned());
    }

    // Too many `src` components to pick one with confidence: hand back the raw input.
    let src_count = components
        .iter()
        .filter(|part| part.as_os_str() == SRC_DIR)
        .count();
    if src_count > 2 {
        return Cow::Borrowed(file_path_str);
    }

    // Start at the parent of the last `src`; `saturating_sub` keeps the whole path
    // when `src` is the first component and therefore has no parent.
    if let Some(src_pos) = components
        .iter()
        .rposition(|part| part.as_os_str() == SRC_DIR)
    {
        let start_index = src_pos.saturating_sub(1);
        let project_subpath: PathBuf = components[start_index..].iter().collect();
        return Cow::Owned(project_subpath.to_string_lossy().into_owned());
    }

    Cow::Borrowed(file_path_str)
}

struct SimpleLogger;

impl log::Log for SimpleLogger {
    fn enabled(&self, metadata: &Metadata) -> bool {
        metadata.level() <= *RUST_LOG
    }

    fn log(&self, record: &Record) {
        if self.enabled(record.metadata()) {
            println!(
                "[{} {}:L{} {}] {}",
                chrono::Local::now().format("%Y-%m-%d %H:%M:%S.%6f"),
                get_path_startswith_project(record.file().unwrap_or("unknown")), // 切割路径: 从项目名开始到具体的文件名
                record.line().unwrap_or(0),
                record.level(),
                record.args()
            );
        }
    }

    fn flush(&self) {}
}

/// The single logger instance registered with the `log` facade.
static LOGGER: SimpleLogger = SimpleLogger {};

/// Installs `LOGGER` as the global logger and applies the `RUST_LOG` level.
///
/// # Panics
///
/// Panics if `log::set_logger` reports an error, i.e. a global logger has already
/// been installed.
fn logger_init() {
    log::set_logger(&LOGGER).expect("logger_init: a global logger has already been installed");
    log::set_max_level(*RUST_LOG);
}

/// Entry point of the listing: only installs the custom logger.
fn main() {
    logger_init();
}
  2. 修复 <自定义锁> 没有等待线程结束的问题
// E00400 - E00499
use anyhow::{anyhow as aw, Result};
use log::error;
use std::collections::HashMap;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock};
use std::thread::{self, JoinHandle};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Returns the current wall-clock time as a duration since the UNIX epoch.
///
/// # Panics
///
/// Panics if the system clock is set to a point before the UNIX epoch.
pub fn get_timestamp() -> Duration {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is set before the UNIX epoch")
}

/// Hold-time threshold for reporting a lock, in nanoseconds.
/// Currently unused: the only comparison against it is commented out in this file.
#[allow(dead_code)]
static DEADLOCK_TIMEOUT: u128 = 50_000; // 50 microseconds

/// Bookkeeping for one lock acquisition, stored per uuid.
struct LockExtra {
    /// Wall-clock time at which the lock was acquired, in nanoseconds since the UNIX epoch.
    hold: u128,
    /// Time spent waiting for the lock before it was acquired, in nanoseconds.
    wait: u128,
}

/// Reader-writer lock that records every acquisition in `extra_map` and owns a
/// background detector thread that reports on the recorded acquisitions.
pub struct RwLock<T: ?Sized> {
    /// The lock protecting the user data.
    rwlock: Arc<StdRwLock<T>>,
    /// Currently held acquisitions, keyed by the caller-supplied uuid.
    extra_map: Arc<StdRwLock<HashMap<String, LockExtra>>>,
    /// Cleared on drop to ask the detector thread to stop.
    running: Arc<AtomicBool>,
    /// Join handle of the detector thread; taken and joined on drop.
    handle: Option<JoinHandle<()>>,
}

// SAFETY: moving the lock to another thread moves the protected `T` with it, which
// requires `T: Send`. In the code shown here the inner `Arc` is never handed out, so
// no other owner of the data remains on the original thread.
unsafe impl<T: ?Sized + Send> Send for RwLock<T> {}
// SAFETY: a shared `&RwLock<T>` lets several threads hold read guards, i.e. `&T`, at
// the same time (needs `T: Sync`) and lets any thread obtain `&mut T` through a write
// guard (needs `T: Send`). Requiring only `T: Send` here would be unsound.
unsafe impl<T: ?Sized + Send + Sync> Sync for RwLock<T> {}

impl<T: ?Sized> Drop for RwLock<T> {
    /// Stops the detector thread and waits for it to finish, so the thread is not
    /// still running when the lock's owner goes away.
    fn drop(&mut self) {
        self.running.store(false, Ordering::SeqCst);

        if let Some(v) = self.handle.take() {
            // `join` only fails when the detector thread panicked; log and carry on.
            if let Err(e) = v.join() {
                error!("\nE00400: {e:?}");
            }
        }
    }
}

impl<T: ?Sized> RwLock<T> {
    /// Wraps `data` in a new lock and starts its detector thread.
    ///
    /// The thread runs `deadlock_detector` until the returned lock is dropped.
    pub fn new(data: T) -> Self
    where
        T: Sized + Send + Sync + 'static,
    {
        let extra_map = Arc::new(StdRwLock::new(HashMap::new()));
        let running = Arc::new(AtomicBool::new(true));
        let rwlock = Arc::new(StdRwLock::new(data));

        // The detector gets its own handles; the originals move into the struct.
        let detector_map = Arc::clone(&extra_map);
        let detector_running = Arc::clone(&running);
        let handle =
            thread::spawn(move || Self::deadlock_detector(detector_map, detector_running));

        Self {
            rwlock,
            extra_map,
            handle: Some(handle),
            running,
        }
    }

    /// Acquires shared read access and records the acquisition under `uuid`.
    ///
    /// # Errors
    ///
    /// Returns an error if the data lock or the bookkeeping map lock is poisoned.
    pub fn read(&self, uuid: &str) -> Result<RwLockReadGuard<'_, T>> {
        let started = std::time::Instant::now();
        let inner = match self.rwlock.read() {
            Ok(inner) => inner,
            Err(e) => return Err(aw!("\nR00401: {e}")),
        };
        // `wait` is how long the acquisition blocked; `hold` is the wall-clock start.
        let record = LockExtra {
            wait: started.elapsed().as_nanos(),
            hold: get_timestamp().as_nanos(),
        };

        // The temporary write guard on the map is released at the end of this statement.
        self.extra_map
            .write()
            .map_err(|e| aw!("\nW00402: {e}"))?
            .insert(uuid.to_string(), record);

        Ok(RwLockReadGuard {
            guard: Some(inner),
            extra_map: self.extra_map.clone(),
            uuid: uuid.to_string(),
        })
    }

    /// Acquires exclusive write access and records the acquisition under `uuid`.
    ///
    /// # Errors
    ///
    /// Returns an error if the data lock or the bookkeeping map lock is poisoned.
    pub fn write(&self, uuid: &str) -> Result<RwLockWriteGuard<'_, T>> {
        let started = std::time::Instant::now();
        let inner = match self.rwlock.write() {
            Ok(inner) => inner,
            Err(e) => return Err(aw!("\nW00403: {e}")),
        };
        // `wait` is how long the acquisition blocked; `hold` is the wall-clock start.
        let record = LockExtra {
            wait: started.elapsed().as_nanos(),
            hold: get_timestamp().as_nanos(),
        };

        // The temporary write guard on the map is released at the end of this statement.
        self.extra_map
            .write()
            .map_err(|e| aw!("\nE00404: {e}"))?
            .insert(uuid.to_string(), record);

        Ok(RwLockWriteGuard {
            guard: Some(inner),
            extra_map: self.extra_map.clone(),
            uuid: uuid.to_string(),
        })
    }

    fn deadlock_detector(
        extra_map: Arc<StdRwLock<HashMap<String, LockExtra>>>,
        running: Arc<AtomicBool>,
    ) {
        let mut log_freq: HashMap<String, u64> = HashMap::new(); // 抑制锁超时刷日志的频率, 相同uuid 30秒打印一次
        loop {
            if !running.load(Ordering::SeqCst) {
                break;
            }

            std::thread::sleep(Duration::from_micros(10));
            let read = match extra_map.try_read() {
                Ok(v) => v,
                Err(e) => {
                    error!("\nR00405: special: extra.try_read() failed: {e}");
                    continue;
                }
            };

            let now = get_timestamp();
            let mut vec: Vec<_> = read.iter().collect();
            vec.sort_by_key(|&(_, extra)| extra.hold);
            for (uuid, extra) in vec {
                let hold = now.as_nanos() - extra.hold;
                let wait = extra.wait;
                let errmsg = format!("\nE00406: {uuid} 锁超时; wait: {wait}; hold: {hold}; 纳秒");
                // if hold > DEADLOCK_TIMEOUT {
                match log_freq.get(uuid) {
                    Some(v) => {
                        if (now.as_secs() - *v) > 30 {
                            log_freq.insert(uuid.clone(), now.as_secs());
                            error!("{errmsg}");
                        }
                    }
                    None => {
                        log_freq.insert(uuid.clone(), now.as_secs());
                        error!("{errmsg}");
                    }
                }
                // }
            }
        }
    }
}

/// Read guard handed out by `RwLock::read`.
///
/// Wraps the standard-library read guard and remembers the uuid under which the
/// acquisition was recorded, so the record can be removed when the guard is dropped.
pub struct RwLockReadGuard<'a, T: ?Sized + 'a> {
    /// Key of this acquisition in `extra_map`.
    uuid: String,
    /// Shared map of currently held acquisitions.
    extra_map: Arc<StdRwLock<HashMap<String, LockExtra>>>,
    /// The wrapped std guard; set to `None` once `drop` has released it.
    guard: Option<std::sync::RwLockReadGuard<'a, T>>,
}

// SAFETY: a read guard only exposes `&T`, so sending or sharing the guard shares `&T`
// across threads; that is sound only when `T: Sync`. `Send` is kept in the bound so
// these impls never cover more types than the original `T: Send` ones did.
// NOTE(review): std's own guard is `!Send`; these impls override that, which presumably
// relies on the platform lock tolerating release from a different thread. Confirm for
// every target.
unsafe impl<'a, T: ?Sized + Send + Sync + 'a> Send for RwLockReadGuard<'a, T> {}
unsafe impl<'a, T: ?Sized + Send + Sync + 'a> Sync for RwLockReadGuard<'a, T> {}

impl<T: ?Sized> Deref for RwLockReadGuard<'_, T> {
    type Target = T;

    /// Borrows the protected value. `guard` is `Some` until `drop` takes it.
    fn deref(&self) -> &T {
        self.guard.as_deref().unwrap()
    }
}

impl<T: ?Sized> Drop for RwLockReadGuard<'_, T> {
    /// Removes this acquisition's record from `extra_map`, then releases the read lock.
    fn drop(&mut self) {
        if let Some(guard) = self.guard.take() {
            match self.extra_map.write() {
                Ok(mut v) => {
                    let _ = v.remove(&self.uuid);
                }
                Err(e) => {
                    // A poisoned map leaves the record in place; the data lock is
                    // still released below.
                    error!("\nW00407: RwLockReadGuard: 获取 extra 写锁出错: {e:?}")
                }
            }
            drop(guard);
        }
    }
}

/// Write guard handed out by `RwLock::write`.
///
/// Wraps the standard-library write guard and remembers the uuid under which the
/// acquisition was recorded, so the record can be removed when the guard is dropped.
pub struct RwLockWriteGuard<'a, T: ?Sized + 'a> {
    /// Key of this acquisition in `extra_map`.
    uuid: String,
    /// Shared map of currently held acquisitions.
    extra_map: Arc<StdRwLock<HashMap<String, LockExtra>>>,
    /// The wrapped std guard; set to `None` once `drop` has released it.
    guard: Option<std::sync::RwLockWriteGuard<'a, T>>,
}

// SAFETY: sending a write guard moves exclusive (`&mut T`) access to another thread,
// which requires `T: Send`.
// NOTE(review): std's own guard is `!Send`; this impl overrides that, which presumably
// relies on the platform lock tolerating release from a different thread. Confirm for
// every target.
unsafe impl<'a, T: ?Sized + Send + 'a> Send for RwLockWriteGuard<'a, T> {}
// SAFETY: a shared `&RwLockWriteGuard` exposes `&T` to several threads through `Deref`,
// which is sound only when `T: Sync`. `Send` is kept in the bound so this impl never
// covers more types than the original `T: Send` one did.
unsafe impl<'a, T: ?Sized + Send + Sync + 'a> Sync for RwLockWriteGuard<'a, T> {}

impl<T: ?Sized> Deref for RwLockWriteGuard<'_, T> {
    type Target = T;

    /// Borrows the protected value. `guard` is `Some` until `drop` takes it.
    fn deref(&self) -> &T {
        self.guard.as_deref().unwrap()
    }
}

impl<T: ?Sized> DerefMut for RwLockWriteGuard<'_, T> {
    /// Mutably borrows the protected value. `guard` is `Some` until `drop` takes it.
    fn deref_mut(&mut self) -> &mut T {
        self.guard.as_deref_mut().unwrap()
    }
}

impl<T: ?Sized> Drop for RwLockWriteGuard<'_, T> {
    /// Removes this acquisition's record from `extra_map`, then releases the write lock.
    fn drop(&mut self) {
        if let Some(guard) = self.guard.take() {
            match self.extra_map.write() {
                Ok(mut v) => {
                    let _ = v.remove(&self.uuid);
                }
                Err(e) => {
                    // A poisoned map leaves the record in place; the data lock is
                    // still released below.
                    error!("\nW00408: RwLockWriteGuard: 获取 extra 写锁出错: {e:?}")
                }
            }
            drop(guard);
        }
    }
}

修复后valgrind的检测结果是: no leaks are possible

zt@zthome:~/Documents/untitled$ valgrind --leak-check=full --show-leak-kinds=all ./target/debug/untitled 
==19347== Memcheck, a memory error detector
==19347== Copyright (C) 2002-2022, and GNU GPL'd, by Julian Seward et al.
==19347== Using Valgrind-3.22.0 and LibVEX; rerun with -h for copyright info
==19347== Command: ./target/debug/untitled
==19347== 
[2024-07-18 01:22:15.802788 untitled/src/main.rs:L97 INFO] good:1
good:2
==19347== 
==19347== HEAP SUMMARY:
==19347==     in use at exit: 0 bytes in 0 blocks
==19347==   total heap usage: 39 allocs, 39 frees, 6,425 bytes allocated
==19347== 
==19347== All heap blocks were freed -- no leaks are possible
==19347== 
==19347== For lists of detected and suppressed errors, rerun with: -s
==19347== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)