From dd28ad20ef2e89a851e4bae3a635f375dc1d345a Mon Sep 17 00:00:00 2001 From: Aiden McClelland <3732071+dr-bonez@users.noreply.github.com> Date: Fri, 23 Jun 2023 13:06:00 -0600 Subject: [PATCH] use port instead of pidof to detect tor going down (#2320) * use port instead of pidof to detect tor going down * fix errors * healthcheck timeout --- backend/src/net/tor.rs | 98 +++++++++++++++++++++++++++++++----------- 1 file changed, 74 insertions(+), 24 deletions(-) diff --git a/backend/src/net/tor.rs b/backend/src/net/tor.rs index 70125ad29..dcccbfe54 100644 --- a/backend/src/net/tor.rs +++ b/backend/src/net/tor.rs @@ -1,5 +1,6 @@ use std::collections::BTreeMap; use std::net::SocketAddr; +use std::sync::atomic::AtomicBool; use std::sync::{Arc, Weak}; use std::time::Duration; @@ -31,6 +32,7 @@ use crate::util::{display_none, Invoke}; use crate::{Error, ErrorKind, ResultExt as _}; pub const SYSTEMD_UNIT: &str = "tor@default"; +const STARTING_HEALTH_TIMEOUT: u64 = 120; // 2min enum ErrorLogSeverity { Fatal { wipe_state: bool }, @@ -256,15 +258,10 @@ async fn torctl( tor_socks: SocketAddr, recv: &mut mpsc::UnboundedReceiver, services: &mut BTreeMap<[u8; 64], BTreeMap>>>, + wipe_state: &AtomicBool, + health_timeout: &mut Duration, ) -> Result<(), Error> { let bootstrap = async { - tokio::fs::create_dir_all("/var/lib/tor").await?; - Command::new("chown") - .arg("-R") - .arg("debian-tor") - .arg("/var/lib/tor") - .invoke(ErrorKind::Filesystem) - .await?; if Command::new("systemctl") .arg("is-active") .arg("--quiet") @@ -278,15 +275,30 @@ async fn torctl( .arg("tor") .invoke(ErrorKind::Tor) .await?; - while Command::new("pidof") - .arg("tor") - .invoke(ErrorKind::Tor) - .await - .is_ok() - { + for _ in 0..30 { + if TcpStream::connect(tor_control).await.is_err() { + break; + } tokio::time::sleep(Duration::from_secs(1)).await; } + if TcpStream::connect(tor_control).await.is_ok() { + return Err(Error::new( + eyre!("Tor is failing to shut down"), + ErrorKind::Tor, + )); + } } + if wipe_state.load(std::sync::atomic::Ordering::SeqCst) { + tokio::fs::remove_dir_all("/var/lib/tor").await?; + wipe_state.store(false, std::sync::atomic::Ordering::SeqCst); + } + tokio::fs::create_dir_all("/var/lib/tor").await?; + Command::new("chown") + .arg("-R") + .arg("debian-tor") + .arg("/var/lib/tor") + .invoke(ErrorKind::Filesystem) + .await?; Command::new("systemctl") .arg("start") .arg("tor") @@ -393,7 +405,14 @@ async fn torctl( ))) .unwrap_or_default(); } - _ => (), + TorCommand::GC { .. } => (), + TorCommand::Reset { + wipe_state: new_wipe_state, + context, + } => { + wipe_state.fetch_or(new_wipe_state, std::sync::atomic::Ordering::SeqCst); + return Err(context); + } } } Ok(()) @@ -546,17 +565,10 @@ async fn torctl( .unwrap_or_default(); } TorCommand::Reset { - wipe_state, + wipe_state: new_wipe_state, context, } => { - if wipe_state { - Command::new("systemctl") - .arg("stop") - .arg("tor") - .invoke(ErrorKind::Tor) - .await?; - tokio::fs::remove_dir_all("/var/lib/tor").await?; - } + wipe_state.fetch_or(new_wipe_state, std::sync::atomic::Ordering::SeqCst); return Err(context); } } @@ -601,10 +613,37 @@ async fn torctl( } Err(Error::new(eyre!("Log stream terminated"), ErrorKind::Tor)) }; + let health_checker = async { + let mut last_success = Instant::now(); + loop { + tokio::time::sleep(Duration::from_secs(30)).await; + if let Err(e) = tokio::time::timeout( + Duration::from_secs(30), + tokio_socks::tcp::Socks5Stream::connect( + tor_socks, + (hck_key.public().get_onion_address().to_string(), 80), + ), + ) + .await + .map_err(|e| e.to_string()) + .and_then(|e| e.map_err(|e| e.to_string())) + { + if last_success.elapsed() > *health_timeout { + let err = Error::new(eyre!("Tor health check failed for longer than current timeout ({health_timeout:?})"), crate::ErrorKind::Tor); + *health_timeout *= 2; + wipe_state.store(true, std::sync::atomic::Ordering::SeqCst); + return Err(err); + } + } else { + last_success = Instant::now(); + } + } + }; tokio::select! { res = handler => res?, res = log_parser => res?, + res = health_checker => res?, } Ok(()) @@ -620,7 +659,18 @@ impl TorControl { Self { _thread: tokio::spawn(async move { let mut services = BTreeMap::new(); - while let Err(e) = torctl(tor_control, tor_socks, &mut recv, &mut services).await { + let wipe_state = AtomicBool::new(false); + let mut health_timeout = Duration::from_secs(STARTING_HEALTH_TIMEOUT); + while let Err(e) = torctl( + tor_control, + tor_socks, + &mut recv, + &mut services, + &wipe_state, + &mut health_timeout, + ) + .await + { tracing::error!("{e}: Restarting tor"); tracing::debug!("{e:?}"); }