Skip to content

Commit

Permalink
use port instead of pidof to detect tor going down (#2320)
Browse files Browse the repository at this point in the history
* use port instead of pidof to detect tor going down

* fix errors

* healthcheck timeout
  • Loading branch information
dr-bonez authored Jun 23, 2023
1 parent ef416ef commit dd28ad2
Showing 1 changed file with 74 additions and 24 deletions.
98 changes: 74 additions & 24 deletions backend/src/net/tor.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::BTreeMap;
use std::net::SocketAddr;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Weak};
use std::time::Duration;

Expand Down Expand Up @@ -31,6 +32,7 @@ use crate::util::{display_none, Invoke};
use crate::{Error, ErrorKind, ResultExt as _};

pub const SYSTEMD_UNIT: &str = "tor@default";
const STARTING_HEALTH_TIMEOUT: u64 = 120; // 2min

enum ErrorLogSeverity {
Fatal { wipe_state: bool },
Expand Down Expand Up @@ -256,15 +258,10 @@ async fn torctl(
tor_socks: SocketAddr,
recv: &mut mpsc::UnboundedReceiver<TorCommand>,
services: &mut BTreeMap<[u8; 64], BTreeMap<u16, BTreeMap<SocketAddr, Weak<()>>>>,
wipe_state: &AtomicBool,
health_timeout: &mut Duration,
) -> Result<(), Error> {
let bootstrap = async {
tokio::fs::create_dir_all("/var/lib/tor").await?;
Command::new("chown")
.arg("-R")
.arg("debian-tor")
.arg("/var/lib/tor")
.invoke(ErrorKind::Filesystem)
.await?;
if Command::new("systemctl")
.arg("is-active")
.arg("--quiet")
Expand All @@ -278,15 +275,30 @@ async fn torctl(
.arg("tor")
.invoke(ErrorKind::Tor)
.await?;
while Command::new("pidof")
.arg("tor")
.invoke(ErrorKind::Tor)
.await
.is_ok()
{
for _ in 0..30 {
if TcpStream::connect(tor_control).await.is_err() {
break;
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
if TcpStream::connect(tor_control).await.is_ok() {
return Err(Error::new(
eyre!("Tor is failing to shut down"),
ErrorKind::Tor,
));
}
}
if wipe_state.load(std::sync::atomic::Ordering::SeqCst) {
tokio::fs::remove_dir_all("/var/lib/tor").await?;
wipe_state.store(false, std::sync::atomic::Ordering::SeqCst);
}
tokio::fs::create_dir_all("/var/lib/tor").await?;
Command::new("chown")
.arg("-R")
.arg("debian-tor")
.arg("/var/lib/tor")
.invoke(ErrorKind::Filesystem)
.await?;
Command::new("systemctl")
.arg("start")
.arg("tor")
Expand Down Expand Up @@ -393,7 +405,14 @@ async fn torctl(
)))
.unwrap_or_default();
}
_ => (),
TorCommand::GC { .. } => (),
TorCommand::Reset {
wipe_state: new_wipe_state,
context,
} => {
wipe_state.fetch_or(new_wipe_state, std::sync::atomic::Ordering::SeqCst);
return Err(context);
}
}
}
Ok(())
Expand Down Expand Up @@ -546,17 +565,10 @@ async fn torctl(
.unwrap_or_default();
}
TorCommand::Reset {
wipe_state,
wipe_state: new_wipe_state,
context,
} => {
if wipe_state {
Command::new("systemctl")
.arg("stop")
.arg("tor")
.invoke(ErrorKind::Tor)
.await?;
tokio::fs::remove_dir_all("/var/lib/tor").await?;
}
wipe_state.fetch_or(new_wipe_state, std::sync::atomic::Ordering::SeqCst);
return Err(context);
}
}
Expand Down Expand Up @@ -601,10 +613,37 @@ async fn torctl(
}
Err(Error::new(eyre!("Log stream terminated"), ErrorKind::Tor))
};
let health_checker = async {
let mut last_success = Instant::now();
loop {
tokio::time::sleep(Duration::from_secs(30)).await;
if let Err(e) = tokio::time::timeout(
Duration::from_secs(30),
tokio_socks::tcp::Socks5Stream::connect(
tor_socks,
(hck_key.public().get_onion_address().to_string(), 80),
),
)
.await
.map_err(|e| e.to_string())
.and_then(|e| e.map_err(|e| e.to_string()))
{
if last_success.elapsed() > *health_timeout {
let err = Error::new(eyre!("Tor health check failed for longer than current timeout ({health_timeout:?})"), crate::ErrorKind::Tor);
*health_timeout *= 2;
wipe_state.store(true, std::sync::atomic::Ordering::SeqCst);
return Err(err);
}
} else {
last_success = Instant::now();
}
}
};

tokio::select! {
res = handler => res?,
res = log_parser => res?,
res = health_checker => res?,
}

Ok(())
Expand All @@ -620,7 +659,18 @@ impl TorControl {
Self {
_thread: tokio::spawn(async move {
let mut services = BTreeMap::new();
while let Err(e) = torctl(tor_control, tor_socks, &mut recv, &mut services).await {
let wipe_state = AtomicBool::new(false);
let mut health_timeout = Duration::from_secs(STARTING_HEALTH_TIMEOUT);
while let Err(e) = torctl(
tor_control,
tor_socks,
&mut recv,
&mut services,
&wipe_state,
&mut health_timeout,
)
.await
{
tracing::error!("{e}: Restarting tor");
tracing::debug!("{e:?}");
}
Expand Down

0 comments on commit dd28ad2

Please sign in to comment.