From 73d0a5770be72f034e3617081c32c9843b1fe985 Mon Sep 17 00:00:00 2001 From: JIN Jie Date: Thu, 19 Dec 2024 17:27:08 +0800 Subject: [PATCH] fix: Bad use of lock in socket synchronizer blocks agent exit --- .../platform/platform_synchronizer/linux.rs | 223 ++++++++---------- agent/src/platform/synchronizer.rs | 10 +- agent/src/trident.rs | 2 +- 3 files changed, 100 insertions(+), 135 deletions(-) diff --git a/agent/src/platform/platform_synchronizer/linux.rs b/agent/src/platform/platform_synchronizer/linux.rs index dea2d03f16c..3997ac083ee 100644 --- a/agent/src/platform/platform_synchronizer/linux.rs +++ b/agent/src/platform/platform_synchronizer/linux.rs @@ -16,13 +16,13 @@ use std::{ net::{IpAddr, SocketAddr, SocketAddrV4}, - sync::{Arc, Condvar, Mutex, MutexGuard, RwLock as SysRwLock}, + sync::{Arc, Condvar, Mutex, RwLock as SysRwLock}, thread, time::Duration, }; use arc_swap::access::Access; -use log::{error, info, warn}; +use log::{debug, error, info, warn}; use parking_lot::RwLock; use tokio::runtime::Runtime; @@ -191,122 +191,117 @@ impl SocketSynchronizer { let mut last_entries: Vec<GpidEntry> = vec![]; loop { - let running_guard = running.lock().unwrap(); - let sync_interval; - - { - let conf_guard = config.load(); - sync_interval = Duration::from_secs( - conf_guard.os_proc_scan_conf.os_proc_socket_sync_interval as u64, - ); + let conf_guard = config.load(); + let sync_interval = Duration::from_secs( + conf_guard.os_proc_scan_conf.os_proc_socket_sync_interval as u64, + ); + + if sync_interval == Duration::ZERO { + thread::sleep(Duration::from_secs(1)); + continue; + } - if sync_interval == Duration::ZERO { - thread::sleep(Duration::from_secs(1)); - continue; + // wait for config from server + if !conf_guard.os_proc_scan_conf.os_proc_sync_enabled { + if !Self::wait_timeout(&running, &stop_notify, sync_interval) { + return; } + continue; + } - // wait for config from server - if !conf_guard.os_proc_scan_conf.os_proc_sync_enabled { - if
!Self::wait_timeout(running_guard, stop_notify.clone(), sync_interval) { + let (ctrl_ip, ctrl_mac, team_id) = { + let id = agent_id.read(); + (id.ip.to_string(), id.mac.to_string(), id.team_id.clone()) + }; + let mut policy_getter = policy_getter.lock().unwrap(); + let pids = get_socket_pids(); + let sock_entries = match get_all_socket( + &conf_guard.os_proc_scan_conf, + &mut policy_getter, + conf_guard.epc_id, + pids, + ) { + Err(e) => { + error!("fetch socket info fail: {}", e); + if !Self::wait_timeout(&running, &stop_notify, sync_interval) { return; } continue; } - - let (ctrl_ip, ctrl_mac, team_id) = { - let id = agent_id.read(); - (id.ip.to_string(), id.mac.to_string(), id.team_id.clone()) - }; - let mut policy_getter = policy_getter.lock().unwrap(); - let pids = get_socket_pids(); - let sock_entries = match get_all_socket( - &conf_guard.os_proc_scan_conf, - &mut policy_getter, - conf_guard.epc_id, - pids, - ) { - Err(e) => { - error!("fetch socket info fail: {}", e); - if !Self::wait_timeout(running_guard, stop_notify.clone(), sync_interval) { - return; - } - continue; - } - Ok(mut res) => { - // fill toa - let mut lru_toa = lru_toa_info.lock().unwrap(); - for se in res.iter_mut() { - if se.role == Role::Server { - // the client addr - let sa = match se.remote.ip { - IpAddr::V4(v4) => SocketAddr::V4(SocketAddrV4::new( - v4.clone(), - se.remote.port, - )), - _ => continue, - }; - // get real addr by client addr from toa - if let Some(real_addr) = lru_toa.get_mut(&sa) { - se.real_client = Some(SockAddrData { - epc_id: 0, - ip: real_addr.ip(), - port: real_addr.port(), - }); + Ok(mut res) => { + // fill toa + let mut lru_toa = lru_toa_info.lock().unwrap(); + for se in res.iter_mut() { + if se.role == Role::Server { + // the client addr + let sa = match se.remote.ip { + IpAddr::V4(v4) => { + SocketAddr::V4(SocketAddrV4::new(v4.clone(), se.remote.port)) } + _ => continue, + }; + // get real addr by client addr from toa + if let Some(real_addr) = 
lru_toa.get_mut(&sa) { + se.real_client = Some(SockAddrData { + epc_id: 0, + ip: real_addr.ip(), + port: real_addr.port(), + }); } } - res } - }; - - match runtime.block_on( - session.grpc_gpid_sync(GpidSyncRequest { - ctrl_ip: Some(ctrl_ip), - ctrl_mac: Some(ctrl_mac), - team_id: Some(team_id), - agent_id: Some(conf_guard.agent_id as u32), - entries: sock_entries - .into_iter() - .filter_map(|sock| { - if let Ok(e) = sock.try_into() { - Some(e) - } else { - None - } - }) - .collect(), - // TODO compress_algorithm - ..Default::default() - }), - ) { - Err(e) => error!("gpid sync fail: {}", e), - Ok(response) => { - let response: GpidSyncResponse = response.into_inner(); - let mut current_entries = vec![]; - for entry in response.entries.iter() { - let e = GpidEntry::try_from(entry); - if e.is_err() { - warn!("{:?}", e); - continue; + res + } + }; + + match runtime.block_on( + session.grpc_gpid_sync(GpidSyncRequest { + ctrl_ip: Some(ctrl_ip), + ctrl_mac: Some(ctrl_mac), + team_id: Some(team_id), + agent_id: Some(conf_guard.agent_id as u32), + entries: sock_entries + .into_iter() + .filter_map(|sock| { + if let Ok(e) = sock.try_into() { + Some(e) + } else { + None } - current_entries.push(e.unwrap()); + }) + .collect(), + // TODO compress_algorithm + ..Default::default() + }), + ) { + Err(e) => error!("gpid sync fail: {}", e), + Ok(response) => { + let response: GpidSyncResponse = response.into_inner(); + let mut current_entries = vec![]; + for entry in response.entries.iter() { + let e = GpidEntry::try_from(entry); + if e.is_err() { + warn!("{:?}", e); + continue; } + current_entries.push(e.unwrap()); + } - if current_entries != last_entries { - policy_setter.update_gpids(&current_entries); - last_entries = current_entries - } + if current_entries != last_entries { + policy_setter.update_gpids(&current_entries); + last_entries = current_entries } } } - if !Self::wait_timeout(running_guard, stop_notify.clone(), sync_interval) { + if !Self::wait_timeout(&running, &stop_notify,
sync_interval) { return; } } } pub fn stop(&mut self) { + debug!("stopping socket info sync"); let conf_guard = self.config.load(); if !process_info_enabled(conf_guard.agent_type) { return; @@ -322,8 +317,12 @@ impl SocketSynchronizer { info!("socket info sync stop"); } - fn wait_timeout(guard: MutexGuard<bool>, stop_notify: Arc<Condvar>, timeout: Duration) -> bool { - *(stop_notify.wait_timeout(guard, timeout).unwrap().0) + fn wait_timeout(running: &Mutex<bool>, stop_notify: &Condvar, timeout: Duration) -> bool { + let guard = running.lock().unwrap(); + if !*guard { + return true; + } + !*stop_notify.wait_timeout(guard, timeout).unwrap().0 } fn sync_toa( @@ -338,35 +337,3 @@ impl SocketSynchronizer { info!("toa sync queue close"); } } - -mod config { - use public::proto::agent; - pub struct StaticConfig; - impl StaticConfig { - pub fn get_agent_type(&self) -> agent::AgentType { - todo!() - } - - pub fn is_tt_pod(&self) -> bool { - todo!() - } - } -} - -mod sniffer_builder { - use std::time::Duration; - - use crate::handler::{IpInfo, LldpInfo}; - - pub struct Sniffer; - - impl Sniffer { - pub fn get_ip_records(&self) -> (Duration, Vec<IpInfo>) { - (Duration::ZERO, vec![]) - } - - pub fn get_lldp_records(&self) -> Vec<LldpInfo> { - vec![] - } - } -} diff --git a/agent/src/platform/synchronizer.rs b/agent/src/platform/synchronizer.rs index b94ddc73c3a..7ed79f3a1d4 100644 --- a/agent/src/platform/synchronizer.rs +++ b/agent/src/platform/synchronizer.rs @@ -143,6 +143,8 @@ impl Synchronizer { } pub fn stop(&self) { + debug!("PlatformSynchronizer stopping"); + let mut running_lock = self.running.lock().unwrap(); if !*running_lock { let config_guard = self.config.load(); @@ -331,15 +333,11 @@ impl Synchronizer { } } - fn wait_timeout(running: &Arc<Mutex<bool>>, timer: &Arc<Condvar>, interval: Duration) -> bool { + fn wait_timeout(running: &Mutex<bool>, timer: &Condvar, interval: Duration) -> bool { let guard = running.lock().unwrap(); if !*guard { return true; } - let (guard, _) = timer.wait_timeout(guard, interval).unwrap(); - if
!*guard { - return true; - } - false + !*timer.wait_timeout(guard, interval).unwrap().0 } } diff --git a/agent/src/trident.rs b/agent/src/trident.rs index 5f28db6528d..37009f61545 100644 --- a/agent/src/trident.rs +++ b/agent/src/trident.rs @@ -2977,7 +2977,7 @@ impl AgentComponents { if self.running.swap(true, Ordering::Relaxed) { return; } - info!("Staring agent components."); + info!("Starting agent components."); self.stats_collector.start(); #[cfg(any(target_os = "linux", target_os = "android"))]