From fc142cfde8ca4320ac17fbc4973244a876087180 Mon Sep 17 00:00:00 2001 From: Aiden McClelland <3732071+dr-bonez@users.noreply.github.com> Date: Thu, 8 Jun 2023 22:41:47 +0000 Subject: [PATCH] reset tor (#2296) * reset tor * Update tor.rs * timeout connect * handle stuck bootstrapping --- backend/Cargo.lock | 1 + backend/Cargo.toml | 1 + backend/src/context/rpc.rs | 1 + backend/src/init.rs | 24 -- backend/src/logs.rs | 12 +- backend/src/net/net_controller.rs | 11 +- backend/src/net/tor.rs | 645 ++++++++++++++++++++++++------ system-images/compat/Cargo.lock | 1 + 8 files changed, 544 insertions(+), 152 deletions(-) diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 93b8198d0..78ec8ec5c 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -1442,6 +1442,7 @@ dependencies = [ "thiserror", "tokio", "tokio-rustls", + "tokio-socks", "tokio-stream", "tokio-tar", "tokio-tungstenite", diff --git a/backend/Cargo.toml b/backend/Cargo.toml index a829bd7f9..8406337d8 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -152,6 +152,7 @@ tokio-stream = { version = "0.1.11", features = ["io-util", "sync", "net"] } tokio-tar = { git = "https://github.com/dr-bonez/tokio-tar.git" } tokio-tungstenite = { version = "0.17.1", features = ["native-tls"] } tokio-rustls = "0.23.4" +tokio-socks = "0.5.1" tokio-util = { version = "0.7.3", features = ["io"] } torut = "0.2.1" tracing = "0.1.35" diff --git a/backend/src/context/rpc.rs b/backend/src/context/rpc.rs index 9acb1cd4e..cf2d59875 100644 --- a/backend/src/context/rpc.rs +++ b/backend/src/context/rpc.rs @@ -197,6 +197,7 @@ impl RpcContext { NetController::init( base.tor_control .unwrap_or(SocketAddr::from(([127, 0, 0, 1], 9051))), + tor_proxy, base.dns_bind .as_ref() .map(|v| v.as_slice()) diff --git a/backend/src/init.rs b/backend/src/init.rs index 9847e10c8..38192f993 100644 --- a/backend/src/init.rs +++ b/backend/src/init.rs @@ -40,10 +40,6 @@ pub async fn check_time_is_synchronized() -> Result { == "NTPSynchronized=yes") } -pub async fn check_tor_is_ready(tor_control: SocketAddr) -> bool { - tokio::net::TcpStream::connect(tor_control).await.is_ok() -} - pub struct InitReceipts { pub server_version: LockReceipt, pub version_range: LockReceipt, @@ -403,26 +399,6 @@ pub async fn init(cfg: &RpcContextConfig) -> Result { tracing::info!("Syncronized system clock"); } - Command::new("systemctl") - .arg("start") - .arg("tor") - .invoke(crate::ErrorKind::Tor) - .await?; - - let mut warn_tor_not_ready = true; - for _ in 0..60 { - if check_tor_is_ready(cfg.tor_control.unwrap_or(([127, 0, 0, 1], 9051).into())).await { - warn_tor_not_ready = false; - break; - } - tokio::time::sleep(Duration::from_secs(1)).await; - } - if warn_tor_not_ready { - tracing::warn!("Timed out waiting for tor to start"); - } else { - tracing::info!("Tor is started"); - } - receipts .ip_info .set(&mut handle, crate::net::dhcp::init_ips().await?) diff --git a/backend/src/logs.rs b/backend/src/logs.rs index af3a6898d..30d73f4d7 100644 --- a/backend/src/logs.rs +++ b/backend/src/logs.rs @@ -33,7 +33,7 @@ use crate::util::serde::Reversible; use crate::{Error, ErrorKind}; #[pin_project::pin_project] -struct LogStream { +pub struct LogStream { _child: Child, #[pin] entries: BoxStream<'static, Result>, @@ -141,14 +141,14 @@ impl std::fmt::Display for LogEntry { } #[derive(Serialize, Deserialize, Debug)] -struct JournalctlEntry { +pub struct JournalctlEntry { #[serde(rename = "__REALTIME_TIMESTAMP")] - timestamp: String, + pub timestamp: String, #[serde(rename = "MESSAGE")] #[serde(deserialize_with = "deserialize_string_or_utf8_array")] - message: String, + pub message: String, #[serde(rename = "__CURSOR")] - cursor: String, + pub cursor: String, } impl JournalctlEntry { fn log_entry(self) -> Result<(String, LogEntry), Error> { @@ -344,7 +344,7 @@ pub async fn cli_logs_generic_follow( Ok(()) } -async fn journalctl( +pub async fn journalctl( id: LogSource, limit: usize, cursor: Option<&str>, diff --git a/backend/src/net/net_controller.rs b/backend/src/net/net_controller.rs index 2bccd99f8..c04707d37 100644 --- a/backend/src/net/net_controller.rs +++ b/backend/src/net/net_controller.rs @@ -34,6 +34,7 @@ impl NetController { #[instrument(skip_all)] pub async fn init( tor_control: SocketAddr, + tor_socks: SocketAddr, dns_bind: &[SocketAddr], ssl: SslManager, hostname: &Hostname, @@ -41,7 +42,7 @@ impl NetController { ) -> Result { let ssl = Arc::new(ssl); let mut res = Self { - tor: TorController::init(tor_control).await?, + tor: TorController::new(tor_control, tor_socks), #[cfg(feature = "avahi")] mdns: MdnsController::init().await?, vhost: VHostController::new(ssl.clone()), @@ -114,7 +115,7 @@ impl NetController { // Tor (http) self.os_bindings.push( self.tor - .add(&key.tor_key(), 80, ([127, 0, 0, 1], 80).into()) + .add(key.tor_key(), 80, ([127, 0, 0, 1], 80).into()) .await?, ); @@ -132,7 +133,7 @@ impl NetController { ); self.os_bindings.push( self.tor - .add(&key.tor_key(), 443, ([127, 0, 0, 1], 443).into()) + .add(key.tor_key(), 443, ([127, 0, 0, 1], 443).into()) .await?, ); @@ -164,13 +165,13 @@ impl NetController { target: SocketAddr, ) -> Result>, Error> { let mut rcs = Vec::with_capacity(1); - rcs.push(self.tor.add(&key.tor_key(), external, target).await?); + rcs.push(self.tor.add(key.tor_key(), external, target).await?); Ok(rcs) } async fn remove_tor(&self, key: &Key, external: u16, rcs: Vec>) -> Result<(), Error> { drop(rcs); - self.tor.gc(&key.tor_key(), external).await + self.tor.gc(Some(key.tor_key()), Some(external)).await } async fn add_lan( diff --git a/backend/src/net/tor.rs b/backend/src/net/tor.rs index 2a7e57c2f..5aaddb755 100644 --- a/backend/src/net/tor.rs +++ b/backend/src/net/tor.rs @@ -1,32 +1,81 @@ use std::collections::BTreeMap; use std::net::SocketAddr; use std::sync::{Arc, Weak}; +use std::time::Duration; use clap::ArgMatches; use color_eyre::eyre::eyre; use futures::future::BoxFuture; -use futures::FutureExt; +use futures::{FutureExt, TryStreamExt}; +use helpers::NonDetachingJoinHandle; +use itertools::Itertools; +use lazy_static::lazy_static; +use regex::Regex; use rpc_toolkit::command; +use rpc_toolkit::yajrc::RpcError; use tokio::net::TcpStream; -use tokio::sync::Mutex; +use tokio::process::Command; +use tokio::sync::{mpsc, oneshot}; +use tokio::time::Instant; use torut::control::{AsyncEvent, AuthenticatedConn, ConnError}; use torut::onion::{OnionAddressV3, TorSecretKeyV3}; use tracing::instrument; -use crate::context::RpcContext; +use crate::context::{CliContext, RpcContext}; +use crate::logs::{ + cli_logs_generic_follow, cli_logs_generic_nofollow, fetch_logs, follow_logs, journalctl, + LogFollowResponse, LogResponse, LogSource, +}; use crate::util::serde::{display_serializable, IoFormat}; +use crate::util::{display_none, Invoke}; use crate::{Error, ErrorKind, ResultExt as _}; +pub const SYSTEMD_UNIT: &str = "tor@default"; + +enum ErrorLogSeverity { + Fatal { wipe_state: bool }, + Unknown { wipe_state: bool }, +} + +lazy_static! { + static ref LOG_REGEXES: Vec<(Regex, ErrorLogSeverity)> = vec![( + Regex::new("Most likely this means the Tor network is overloaded").unwrap(), + ErrorLogSeverity::Unknown { wipe_state: true } + ),( + Regex::new("This could indicate a route manipulation attack, network overload, bad local network connectivity, or a bug\\.").unwrap(), + ErrorLogSeverity::Unknown { wipe_state: true } + ),( + Regex::new("died due to an invalid selected path").unwrap(), + ErrorLogSeverity::Fatal { wipe_state: false } + ),( + Regex::new("Tor has not observed any network activity for the past").unwrap(), + ErrorLogSeverity::Unknown { wipe_state: false } + )]; + static ref PROGRESS_REGEX: Regex = Regex::new("PROGRESS=([0-9]+)").unwrap(); +} + #[test] fn random_key() { println!("x'{}'", hex::encode(rand::random::<[u8; 32]>())); } -#[command(subcommands(list_services))] +#[command(subcommands(list_services, logs, reset))] pub fn tor() -> Result<(), Error> { Ok(()) } +#[command(display(display_none))] +pub async fn reset( + #[context] ctx: RpcContext, + #[arg(rename = "wipe-state", short = 'w', long = "wipe-state")] wipe_state: bool, + #[arg] reason: String, +) -> Result<(), Error> { + ctx.net_controller + .tor + .reset(wipe_state, Error::new(eyre!("{reason}"), ErrorKind::Tor)) + .await +} + fn display_services(services: Vec, matches: &ArgMatches) { use prettytable::*; @@ -52,133 +101,209 @@ pub async fn list_services( ctx.net_controller.tor.list_services().await } +#[command( + custom_cli(cli_logs(async, context(CliContext))), + subcommands(self(logs_nofollow(async)), logs_follow), + display(display_none) +)] +pub async fn logs( + #[arg(short = 'l', long = "limit")] limit: Option, + #[arg(short = 'c', long = "cursor")] cursor: Option, + #[arg(short = 'B', long = "before", default)] before: bool, + #[arg(short = 'f', long = "follow", default)] follow: bool, +) -> Result<(Option, Option, bool, bool), Error> { + Ok((limit, cursor, before, follow)) +} +pub async fn cli_logs( + ctx: CliContext, + (limit, cursor, before, follow): (Option, Option, bool, bool), +) -> Result<(), RpcError> { + if follow { + if cursor.is_some() { + return Err(RpcError::from(Error::new( + eyre!("The argument '--cursor ' cannot be used with '--follow'"), + crate::ErrorKind::InvalidRequest, + ))); + } + if before { + return Err(RpcError::from(Error::new( + eyre!("The argument '--before' cannot be used with '--follow'"), + crate::ErrorKind::InvalidRequest, + ))); + } + cli_logs_generic_follow(ctx, "net.tor.logs.follow", None, limit).await + } else { + cli_logs_generic_nofollow(ctx, "net.tor.logs", None, limit, cursor, before).await + } +} +pub async fn logs_nofollow( + _ctx: (), + (limit, cursor, before, _): (Option, Option, bool, bool), +) -> Result { + fetch_logs(LogSource::Service(SYSTEMD_UNIT), limit, cursor, before).await +} + +#[command(rpc_only, rename = "follow", display(display_none))] +pub async fn logs_follow( + #[context] ctx: RpcContext, + #[parent_data] (limit, _, _, _): (Option, Option, bool, bool), +) -> Result { + follow_logs(ctx, LogSource::Service(SYSTEMD_UNIT), limit).await +} + fn event_handler(_event: AsyncEvent<'static>) -> BoxFuture<'static, Result<(), ConnError>> { async move { Ok(()) }.boxed() } -pub struct TorController(Mutex); +pub struct TorController(TorControl); impl TorController { - pub async fn init(tor_control: SocketAddr) -> Result { - Ok(TorController(Mutex::new( - TorControllerInner::init(tor_control).await?, - ))) + pub fn new(tor_control: SocketAddr, tor_socks: SocketAddr) -> Self { + TorController(TorControl::new(tor_control, tor_socks)) } pub async fn add( &self, - key: &TorSecretKeyV3, + key: TorSecretKeyV3, external: u16, target: SocketAddr, ) -> Result, Error> { - self.0.lock().await.add(key, external, target).await + let (reply, res) = oneshot::channel(); + self.0 + .send + .send(TorCommand::AddOnion { + key, + external, + target, + reply, + }) + .ok() + .ok_or_else(|| Error::new(eyre!("TorControl died"), ErrorKind::Tor))?; + res.await + .ok() + .ok_or_else(|| Error::new(eyre!("TorControl died"), ErrorKind::Tor)) } - pub async fn gc(&self, key: &TorSecretKeyV3, external: u16) -> Result<(), Error> { - self.0.lock().await.gc(key, external).await + pub async fn gc( + &self, + key: Option, + external: Option, + ) -> Result<(), Error> { + self.0 + .send + .send(TorCommand::GC { key, external }) + .ok() + .ok_or_else(|| Error::new(eyre!("TorControl died"), ErrorKind::Tor)) + } + + pub async fn reset(&self, wipe_state: bool, context: Error) -> Result<(), Error> { + self.0 + .send + .send(TorCommand::Reset { + wipe_state, + context, + }) + .ok() + .ok_or_else(|| Error::new(eyre!("TorControl died"), ErrorKind::Tor)) } pub async fn list_services(&self) -> Result, Error> { - self.0.lock().await.list_services().await + let (reply, res) = oneshot::channel(); + self.0 + .send + .send(TorCommand::GetInfo { + query: "onions/current".into(), + reply, + }) + .ok() + .ok_or_else(|| Error::new(eyre!("TorControl died"), ErrorKind::Tor))?; + res.await + .ok() + .ok_or_else(|| Error::new(eyre!("TorControl died"), ErrorKind::Tor))?? + .lines() + .map(|l| l.trim()) + .filter(|l| !l.is_empty()) + .map(|l| l.parse().with_kind(ErrorKind::Tor)) + .collect() } } type AuthenticatedConnection = AuthenticatedConn< TcpStream, - fn(AsyncEvent<'static>) -> BoxFuture<'static, Result<(), ConnError>>, + Box) -> BoxFuture<'static, Result<(), ConnError>> + Send + Sync>, >; -pub struct TorControllerInner { - control_addr: SocketAddr, - connection: AuthenticatedConnection, - services: BTreeMap>>>, -} -impl TorControllerInner { - #[instrument(skip_all)] - async fn add( - &mut self, - key: &TorSecretKeyV3, +enum TorCommand { + AddOnion { + key: TorSecretKeyV3, external: u16, target: SocketAddr, - ) -> Result, Error> { - let mut rm_res = Ok(()); - let onion_base = key - .public() - .get_onion_address() - .get_address_without_dot_onion(); - let mut service = if let Some(service) = self.services.remove(&onion_base) { - rm_res = self.connection.del_onion(&onion_base).await; - service - } else { - BTreeMap::new() - }; - let mut binding = service.remove(&external).unwrap_or_default(); - let rc = if let Some(rc) = Weak::upgrade(&binding.remove(&target).unwrap_or_default()) { - rc - } else { - Arc::new(()) - }; - binding.insert(target, Arc::downgrade(&rc)); - service.insert(external, binding); - let bindings = service - .iter() - .flat_map(|(ext, int)| { - int.iter() - .find(|(_, rc)| rc.strong_count() > 0) - .map(|(addr, _)| (*ext, SocketAddr::from(*addr))) - }) - .collect::>(); - self.services.insert(onion_base, service); - rm_res?; - self.connection - .add_onion_v3(key, false, false, false, None, &mut bindings.iter()) + reply: oneshot::Sender>, + }, + GC { + key: Option, + external: Option, + }, + GetInfo { + query: String, + reply: oneshot::Sender>, + }, + Reset { + wipe_state: bool, + context: Error, + }, +} + +#[instrument(skip_all)] +async fn torctl( + tor_control: SocketAddr, + tor_socks: SocketAddr, + recv: &mut mpsc::UnboundedReceiver, + services: &mut BTreeMap<[u8; 64], BTreeMap>>>, +) -> Result<(), Error> { + let bootstrap = async { + tokio::fs::create_dir_all("/var/lib/tor").await?; + Command::new("chown") + .arg("-R") + .arg("debian-tor") + .arg("/var/lib/tor") + .invoke(ErrorKind::Filesystem) .await?; - Ok(rc) - } - - #[instrument(skip_all)] - async fn gc(&mut self, key: &TorSecretKeyV3, external: u16) -> Result<(), Error> { - let onion_base = key - .public() - .get_onion_address() - .get_address_without_dot_onion(); - if let Some(mut service) = self.services.remove(&onion_base) { - if let Some(mut binding) = service.remove(&external) { - binding = binding - .into_iter() - .filter(|(_, rc)| rc.strong_count() > 0) - .collect(); - if !binding.is_empty() { - service.insert(external, binding); - } - } - let rm_res = self.connection.del_onion(&onion_base).await; - if !service.is_empty() { - let bindings = service - .iter() - .flat_map(|(ext, int)| { - int.iter() - .find(|(_, rc)| rc.strong_count() > 0) - .map(|(addr, _)| (*ext, SocketAddr::from(*addr))) - }) - .collect::>(); - self.services.insert(onion_base, service); - rm_res?; - self.connection - .add_onion_v3(&key, false, false, false, None, &mut bindings.iter()) - .await?; - } else { - rm_res?; - } + if Command::new("systemctl") + .arg("is-active") + .arg("--quiet") + .arg("tor") + .invoke(ErrorKind::Tor) + .await + .is_ok() + { + Command::new("systemctl") + .arg("stop") + .arg("tor") + .invoke(ErrorKind::Tor) + .await?; } + Command::new("systemctl") + .arg("start") + .arg("tor") + .invoke(ErrorKind::Tor) + .await?; - Ok(()) - } + let logs = journalctl(LogSource::Service(SYSTEMD_UNIT), 0, None, false, true).await?; - #[instrument(skip_all)] - async fn init(tor_control: SocketAddr) -> Result { - let mut conn = torut::control::UnauthenticatedConn::new( - TcpStream::connect(tor_control).await?, // TODO - ); + let mut tcp_stream = None; + for _ in 0..60 { + if let Ok(conn) = TcpStream::connect(tor_control).await { + tcp_stream = Some(conn); + break; + } + tokio::time::sleep(Duration::from_secs(1)).await; + } + let tcp_stream = tcp_stream.ok_or_else(|| { + Error::new(eyre!("Timed out waiting for tor to start"), ErrorKind::Tor) + })?; + tracing::info!("Tor is started"); + + let mut conn = torut::control::UnauthenticatedConn::new(tcp_stream); let auth = conn .load_protocol_info() .await? @@ -187,25 +312,311 @@ impl TorControllerInner { .with_kind(crate::ErrorKind::Tor)?; conn.authenticate(&auth).await?; let mut connection: AuthenticatedConnection = conn.into_authenticated().await; - connection.set_async_event_handler(Some(event_handler)); + connection.set_async_event_handler(Some(Box::new(|event| event_handler(event)))); - Ok(Self { - control_addr: tor_control, - connection, - services: BTreeMap::new(), - }) + let mut bootstrapped = false; + let mut last_increment = (String::new(), Instant::now()); + for _ in 0..300 { + match connection.get_info("status/bootstrap-phase").await { + Ok(a) => { + if a.contains("TAG=done") { + bootstrapped = true; + break; + } + if let Some(p) = PROGRESS_REGEX.captures(&a) { + if let Some(p) = p.get(1) { + if p.as_str() != &*last_increment.0 { + last_increment = (p.as_str().into(), Instant::now()); + } + } + } + } + Err(e) => { + let e = Error::from(e); + tracing::error!("{}", e); + tracing::debug!("{:?}", e); + } + } + if last_increment.1.elapsed() > Duration::from_secs(30) { + return Err(Error::new( + eyre!("Tor stuck bootstrapping at {}%", last_increment.0), + ErrorKind::Tor, + )); + } + tokio::time::sleep(Duration::from_secs(1)).await; + } + if !bootstrapped { + return Err(Error::new( + eyre!("Timed out waiting for tor to bootstrap"), + ErrorKind::Tor, + )); + } + Ok((connection, logs)) + }; + let pre_handler = async { + while let Some(command) = recv.recv().await { + match command { + TorCommand::AddOnion { + key, + external, + target, + reply, + } => { + let mut service = if let Some(service) = services.remove(&key.as_bytes()) { + service + } else { + BTreeMap::new() + }; + let mut binding = service.remove(&external).unwrap_or_default(); + let rc = if let Some(rc) = + Weak::upgrade(&binding.remove(&target).unwrap_or_default()) + { + rc + } else { + Arc::new(()) + }; + binding.insert(target, Arc::downgrade(&rc)); + service.insert(external, binding); + services.insert(key.as_bytes(), service); + reply.send(rc).unwrap_or_default(); + } + TorCommand::GetInfo { reply, .. } => { + reply + .send(Err(Error::new( + eyre!("Tor has not finished bootstrapping..."), + ErrorKind::Tor, + ))) + .unwrap_or_default(); + } + _ => (), + } + } + Ok(()) + }; + + let (mut connection, mut logs) = tokio::select! { + res = bootstrap => res?, + res = pre_handler => return res, + }; + + let hck_key = TorSecretKeyV3::generate(); + connection + .add_onion_v3( + &hck_key, + false, + false, + false, + None, + &mut [(80, SocketAddr::from(([127, 0, 0, 1], 80)))].iter(), + ) + .await?; + + for (key, service) in &*services { + let key = TorSecretKeyV3::from(*key); + let bindings = service + .iter() + .flat_map(|(ext, int)| { + int.iter() + .find(|(_, rc)| rc.strong_count() > 0) + .map(|(addr, _)| (*ext, SocketAddr::from(*addr))) + }) + .collect::>(); + connection + .add_onion_v3(&key, false, false, false, None, &mut bindings.iter()) + .await?; } - #[instrument(skip_all)] - async fn list_services(&mut self) -> Result, Error> { - self.connection - .get_info("onions/current") - .await? - .lines() - .map(|l| l.trim()) - .filter(|l| !l.is_empty()) - .map(|l| l.parse().with_kind(ErrorKind::Tor)) - .collect() + let handler = async { + while let Some(command) = recv.recv().await { + match command { + TorCommand::AddOnion { + key, + external, + target, + reply, + } => { + let mut rm_res = Ok(()); + let onion_base = key + .public() + .get_onion_address() + .get_address_without_dot_onion(); + let mut service = if let Some(service) = services.remove(&key.as_bytes()) { + rm_res = connection.del_onion(&onion_base).await; + service + } else { + BTreeMap::new() + }; + let mut binding = service.remove(&external).unwrap_or_default(); + let rc = if let Some(rc) = + Weak::upgrade(&binding.remove(&target).unwrap_or_default()) + { + rc + } else { + Arc::new(()) + }; + binding.insert(target, Arc::downgrade(&rc)); + service.insert(external, binding); + let bindings = service + .iter() + .flat_map(|(ext, int)| { + int.iter() + .find(|(_, rc)| rc.strong_count() > 0) + .map(|(addr, _)| (*ext, SocketAddr::from(*addr))) + }) + .collect::>(); + services.insert(key.as_bytes(), service); + reply.send(rc).unwrap_or_default(); + rm_res?; + connection + .add_onion_v3(&key, false, false, false, None, &mut bindings.iter()) + .await?; + } + TorCommand::GC { key, external } => { + for key in if key.is_some() { + itertools::Either::Left(key.into_iter().map(|k| k.as_bytes())) + } else { + itertools::Either::Right(services.keys().cloned().collect_vec().into_iter()) + } { + let key = TorSecretKeyV3::from(key); + let onion_base = key + .public() + .get_onion_address() + .get_address_without_dot_onion(); + if let Some(mut service) = services.remove(&key.as_bytes()) { + for external in if external.is_some() { + itertools::Either::Left(external.into_iter()) + } else { + itertools::Either::Right( + service.keys().copied().collect_vec().into_iter(), + ) + } { + if let Some(mut binding) = service.remove(&external) { + binding = binding + .into_iter() + .filter(|(_, rc)| rc.strong_count() > 0) + .collect(); + if !binding.is_empty() { + service.insert(external, binding); + } + } + } + let rm_res = connection.del_onion(&onion_base).await; + if !service.is_empty() { + let bindings = service + .iter() + .flat_map(|(ext, int)| { + int.iter() + .find(|(_, rc)| rc.strong_count() > 0) + .map(|(addr, _)| (*ext, SocketAddr::from(*addr))) + }) + .collect::>(); + services.insert(key.as_bytes(), service); + rm_res?; + connection + .add_onion_v3( + &key, + false, + false, + false, + None, + &mut bindings.iter(), + ) + .await?; + } else { + rm_res?; + } + } + } + } + TorCommand::GetInfo { query, reply } => { + reply + .send(connection.get_info(&query).await.with_kind(ErrorKind::Tor)) + .unwrap_or_default(); + } + TorCommand::Reset { + wipe_state, + context, + } => { + if wipe_state { + Command::new("systemctl") + .arg("stop") + .arg("tor") + .invoke(ErrorKind::Tor) + .await?; + tokio::fs::remove_dir_all("/var/lib/tor").await?; + } + return Err(context); + } + } + } + Ok(()) + }; + let log_parser = async { + while let Some(log) = logs.try_next().await? { + for (regex, severity) in &*LOG_REGEXES { + if regex.is_match(&log.message) { + let (check, wipe_state) = match severity { + ErrorLogSeverity::Fatal { wipe_state } => (false, *wipe_state), + ErrorLogSeverity::Unknown { wipe_state } => (true, *wipe_state), + }; + if !check + || tokio::time::timeout( + Duration::from_secs(30), + tokio_socks::tcp::Socks5Stream::connect( + tor_socks, + (hck_key.public().get_onion_address().to_string(), 80), + ), + ) + .await + .map_err(|e| tracing::warn!("Tor is confirmed to be down: {e}")) + .and_then(|a| { + a.map_err(|e| tracing::warn!("Tor is confirmed to be down: {e}")) + }) + .is_err() + { + if wipe_state { + Command::new("systemctl") + .arg("stop") + .arg("tor") + .invoke(ErrorKind::Tor) + .await?; + tokio::fs::remove_dir_all("/var/lib/tor").await?; + } + return Err(Error::new(eyre!("{}", log.message), ErrorKind::Tor)); + } + } + } + } + Err(Error::new(eyre!("Log stream terminated"), ErrorKind::Tor)) + }; + + tokio::select! { + res = handler => res?, + res = log_parser => res?, + } + + Ok(()) +} + +struct TorControl { + _thread: NonDetachingJoinHandle<()>, + send: mpsc::UnboundedSender, +} +impl TorControl { + pub fn new(tor_control: SocketAddr, tor_socks: SocketAddr) -> Self { + let (send, mut recv) = mpsc::unbounded_channel(); + Self { + _thread: tokio::spawn(async move { + let mut services = BTreeMap::new(); + while let Err(e) = torctl(tor_control, tor_socks, &mut recv, &mut services).await { + tracing::error!("{e}: Restarting tor"); + tracing::debug!("{e:?}"); + } + tracing::info!("TorControl is shut down.") + }) + .into(), + send, + } } } diff --git a/system-images/compat/Cargo.lock b/system-images/compat/Cargo.lock index 924abcd5e..2ad4711bc 100644 --- a/system-images/compat/Cargo.lock +++ b/system-images/compat/Cargo.lock @@ -1257,6 +1257,7 @@ dependencies = [ "thiserror", "tokio", "tokio-rustls", + "tokio-socks", "tokio-stream", "tokio-tar", "tokio-tungstenite",