fix: Bad use of lock in socket synchronizer blocks agent exit
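Before this change, the sync loop took the `running` mutex at the top of every iteration and kept the guard alive across the whole socket scan and gRPC sync, handing it to `wait_timeout` only at the end; a concurrent `stop()` that needs the same mutex could therefore block for a full iteration before the agent could exit. The loop now leaves the mutex alone while it works and locks it only inside `wait_timeout`, where `Condvar::wait_timeout` releases it while sleeping. A minimal sketch of the pattern (simplified, not the repository's exact code; `do_sync_work` is a hypothetical stand-in for the real loop body):

    use std::sync::{Arc, Condvar, Mutex};
    use std::thread;
    use std::time::Duration;

    // The lock is taken only inside this helper; Condvar::wait_timeout releases it
    // while blocked, so stop() can grab the mutex and notify at any moment.
    // Returns false once the running flag has been cleared.
    fn wait_timeout(running: &Mutex<bool>, stop_notify: &Condvar, timeout: Duration) -> bool {
        let guard = running.lock().unwrap();
        if !*guard {
            return false;
        }
        *stop_notify.wait_timeout(guard, timeout).unwrap().0
    }

    fn do_sync_work() {
        // placeholder for the socket scan + gRPC sync done in the real loop
    }

    fn main() {
        let running = Arc::new(Mutex::new(true));
        let stop_notify = Arc::new(Condvar::new());

        let worker = {
            let (running, stop_notify) = (running.clone(), stop_notify.clone());
            thread::spawn(move || loop {
                do_sync_work(); // no lock held here, unlike the pre-fix loop
                if !wait_timeout(&running, &stop_notify, Duration::from_secs(1)) {
                    return;
                }
            })
        };

        // stop(): clear the flag and wake the worker immediately instead of
        // waiting for the sleep to run out.
        *running.lock().unwrap() = false;
        stop_notify.notify_one();
        worker.join().unwrap();
    }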
rvql committed Dec 19, 2024
1 parent 0b1bbc5 commit 73d0a57
Showing 3 changed files with 100 additions and 135 deletions.
223 changes: 95 additions & 128 deletions agent/src/platform/platform_synchronizer/linux.rs
@@ -16,13 +16,13 @@
 
 use std::{
     net::{IpAddr, SocketAddr, SocketAddrV4},
-    sync::{Arc, Condvar, Mutex, MutexGuard, RwLock as SysRwLock},
+    sync::{Arc, Condvar, Mutex, RwLock as SysRwLock},
     thread,
    time::Duration,
 };
 
 use arc_swap::access::Access;
-use log::{error, info, warn};
+use log::{debug, error, info, warn};
 use parking_lot::RwLock;
 use tokio::runtime::Runtime;
 
@@ -191,122 +191,117 @@ impl SocketSynchronizer {
         let mut last_entries: Vec<GpidEntry> = vec![];
 
         loop {
-            let running_guard = running.lock().unwrap();
-            let sync_interval;
-
-            {
-                let conf_guard = config.load();
-                sync_interval = Duration::from_secs(
+            let conf_guard = config.load();
+            let sync_interval = Duration::from_secs(
                 conf_guard.os_proc_scan_conf.os_proc_socket_sync_interval as u64,
             );
 
             if sync_interval == Duration::ZERO {
                 thread::sleep(Duration::from_secs(1));
                 continue;
             }
 
             // wait for config from server
             if !conf_guard.os_proc_scan_conf.os_proc_sync_enabled {
-                if !Self::wait_timeout(running_guard, stop_notify.clone(), sync_interval) {
+                if !Self::wait_timeout(&running, &stop_notify, sync_interval) {
                     return;
                 }
                 continue;
             }
 
             let (ctrl_ip, ctrl_mac, team_id) = {
                 let id = agent_id.read();
                 (id.ip.to_string(), id.mac.to_string(), id.team_id.clone())
             };
             let mut policy_getter = policy_getter.lock().unwrap();
             let pids = get_socket_pids();
             let sock_entries = match get_all_socket(
                 &conf_guard.os_proc_scan_conf,
                 &mut policy_getter,
                 conf_guard.epc_id,
                 pids,
             ) {
                 Err(e) => {
                     error!("fetch socket info fail: {}", e);
-                    if !Self::wait_timeout(running_guard, stop_notify.clone(), sync_interval) {
+                    if !Self::wait_timeout(&running, &stop_notify, sync_interval) {
                         return;
                     }
                     continue;
                 }
                 Ok(mut res) => {
                     // fill toa
                     let mut lru_toa = lru_toa_info.lock().unwrap();
                     for se in res.iter_mut() {
                         if se.role == Role::Server {
                             // the client addr
                             let sa = match se.remote.ip {
-                                IpAddr::V4(v4) => SocketAddr::V4(SocketAddrV4::new(
-                                    v4.clone(),
-                                    se.remote.port,
-                                )),
+                                IpAddr::V4(v4) => {
+                                    SocketAddr::V4(SocketAddrV4::new(v4.clone(), se.remote.port))
+                                }
                                 _ => continue,
                             };
                             // get real addr by client addr from toa
                             if let Some(real_addr) = lru_toa.get_mut(&sa) {
                                 se.real_client = Some(SockAddrData {
                                     epc_id: 0,
                                     ip: real_addr.ip(),
                                     port: real_addr.port(),
                                 });
                             }
                         }
                     }
                     res
                 }
             };
 
             match runtime.block_on(
                 session.grpc_gpid_sync(GpidSyncRequest {
                     ctrl_ip: Some(ctrl_ip),
                     ctrl_mac: Some(ctrl_mac),
                     team_id: Some(team_id),
                     agent_id: Some(conf_guard.agent_id as u32),
                     entries: sock_entries
                         .into_iter()
                         .filter_map(|sock| {
                             if let Ok(e) = sock.try_into() {
                                 Some(e)
                             } else {
                                 None
                             }
                         })
                         .collect(),
                     // TODO compress_algorithm
                     ..Default::default()
                 }),
             ) {
                 Err(e) => error!("gpid sync fail: {}", e),
                 Ok(response) => {
                     let response: GpidSyncResponse = response.into_inner();
                     let mut current_entries = vec![];
                     for entry in response.entries.iter() {
                         let e = GpidEntry::try_from(entry);
                         if e.is_err() {
                             warn!("{:?}", e);
                             continue;
                         }
                         current_entries.push(e.unwrap());
                     }
 
                     if current_entries != last_entries {
                         policy_setter.update_gpids(&current_entries);
                         last_entries = current_entries
                     }
                 }
             }
 
-            if !Self::wait_timeout(running_guard, stop_notify.clone(), sync_interval) {
+            if !Self::wait_timeout(&running, &stop_notify, sync_interval) {
                 return;
             }
-            }
         }
     }
 
     pub fn stop(&mut self) {
+        debug!("stopping socket info sync");
         let conf_guard = self.config.load();
         if !process_info_enabled(conf_guard.agent_type) {
             return;
@@ -322,8 +317,12 @@
         info!("socket info sync stop");
     }
 
-    fn wait_timeout(guard: MutexGuard<bool>, stop_notify: Arc<Condvar>, timeout: Duration) -> bool {
-        *(stop_notify.wait_timeout(guard, timeout).unwrap().0)
+    fn wait_timeout(running: &Mutex<bool>, stop_notify: &Condvar, timeout: Duration) -> bool {
+        let guard = running.lock().unwrap();
+        if !*guard {
+            return true;
+        }
+        !*stop_notify.wait_timeout(guard, timeout).unwrap().0
     }
 
     fn sync_toa(
@@ -338,35 +337,3 @@ impl SocketSynchronizer {
         info!("toa sync queue close");
     }
 }
-
-mod config {
-    use public::proto::agent;
-    pub struct StaticConfig;
-    impl StaticConfig {
-        pub fn get_agent_type(&self) -> agent::AgentType {
-            todo!()
-        }
-
-        pub fn is_tt_pod(&self) -> bool {
-            todo!()
-        }
-    }
-}
-
-mod sniffer_builder {
-    use std::time::Duration;
-
-    use crate::handler::{IpInfo, LldpInfo};
-
-    pub struct Sniffer;
-
-    impl Sniffer {
-        pub fn get_ip_records(&self) -> (Duration, Vec<IpInfo>) {
-            (Duration::ZERO, vec![])
-        }
-
-        pub fn get_lldp_records(&self) -> Vec<LldpInfo> {
-            vec![]
-        }
-    }
-}
10 changes: 4 additions & 6 deletions agent/src/platform/synchronizer.rs
@@ -143,6 +143,8 @@ impl Synchronizer {
     }
 
     pub fn stop(&self) {
+        debug!("PlatformSynchronizer stopping");
+
         let mut running_lock = self.running.lock().unwrap();
         if !*running_lock {
             let config_guard = self.config.load();
@@ -331,15 +333,11 @@ impl Synchronizer {
         }
     }
 
-    fn wait_timeout(running: &Arc<Mutex<bool>>, timer: &Arc<Condvar>, interval: Duration) -> bool {
+    fn wait_timeout(running: &Mutex<bool>, timer: &Condvar, interval: Duration) -> bool {
        let guard = running.lock().unwrap();
        if !*guard {
            return true;
        }
-        let (guard, _) = timer.wait_timeout(guard, interval).unwrap();
-        if !*guard {
-            return true;
-        }
-        false
+        !*timer.wait_timeout(guard, interval).unwrap().0
    }
}
2 changes: 1 addition & 1 deletion agent/src/trident.rs
@@ -2977,7 +2977,7 @@ impl AgentComponents {
         if self.running.swap(true, Ordering::Relaxed) {
             return;
         }
-        info!("Staring agent components.");
+        info!("Starting agent components.");
         self.stats_collector.start();
 
         #[cfg(any(target_os = "linux", target_os = "android"))]
