mirror of
https://github.com/Start9Labs/start-os.git
synced 2026-03-26 02:11:53 +00:00
remove tor health daemon (#2101)
This commit is contained in:
@@ -1,6 +1,5 @@
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use color_eyre::eyre::eyre;
|
||||
use embassy::context::{DiagnosticContext, RpcContext};
|
||||
@@ -10,14 +9,11 @@ use embassy::net::mdns::MdnsController;
|
||||
use embassy::net::net_controller::NetController;
|
||||
use embassy::net::net_utils::ResourceFqdn;
|
||||
use embassy::net::static_server::diag_ui_file_router;
|
||||
use embassy::net::tor::tor_health_check;
|
||||
use embassy::shutdown::Shutdown;
|
||||
use embassy::system::launch_metrics_task;
|
||||
use embassy::util::daemon;
|
||||
use embassy::util::logger::EmbassyLogger;
|
||||
use embassy::{Error, ErrorKind, ResultExt};
|
||||
use futures::{FutureExt, TryFutureExt};
|
||||
use reqwest::{Client, Proxy};
|
||||
use tokio::signal::unix::signal;
|
||||
use tracing::instrument;
|
||||
|
||||
@@ -80,44 +76,17 @@ async fn inner_main(cfg_path: Option<PathBuf>) -> Result<Option<Shutdown>, Error
|
||||
.await
|
||||
});
|
||||
|
||||
let tor_health_ctx = rpc_ctx.clone();
|
||||
let tor_client = Client::builder()
|
||||
.proxy(
|
||||
Proxy::http(format!(
|
||||
"socks5h://{}:{}",
|
||||
rpc_ctx.tor_socks.ip(),
|
||||
rpc_ctx.tor_socks.port()
|
||||
))
|
||||
.with_kind(crate::ErrorKind::Network)?,
|
||||
)
|
||||
.build()
|
||||
.with_kind(crate::ErrorKind::Network)?;
|
||||
let tor_health_daemon = daemon(
|
||||
move || {
|
||||
let ctx = tor_health_ctx.clone();
|
||||
let client = tor_client.clone();
|
||||
async move { tor_health_check(&client, &ctx.net_controller.tor).await }
|
||||
},
|
||||
Duration::from_secs(300),
|
||||
rpc_ctx.shutdown.subscribe(),
|
||||
);
|
||||
|
||||
embassy::sound::CHIME.play().await?;
|
||||
|
||||
futures::try_join!(
|
||||
metrics_task
|
||||
.map_err(|e| Error::new(
|
||||
metrics_task
|
||||
.map_err(|e| {
|
||||
Error::new(
|
||||
eyre!("{}", e).wrap_err("Metrics daemon panicked!"),
|
||||
ErrorKind::Unknown
|
||||
))
|
||||
.map_ok(|_| tracing::debug!("Metrics daemon Shutdown")),
|
||||
tor_health_daemon
|
||||
.map_err(|e| Error::new(
|
||||
e.wrap_err("Tor Health daemon panicked!"),
|
||||
ErrorKind::Unknown
|
||||
))
|
||||
.map_ok(|_| tracing::debug!("Tor Health daemon Shutdown")),
|
||||
)?;
|
||||
ErrorKind::Unknown,
|
||||
)
|
||||
})
|
||||
.map_ok(|_| tracing::debug!("Metrics daemon Shutdown"))
|
||||
.await?;
|
||||
|
||||
let mut shutdown = shutdown_recv
|
||||
.recv()
|
||||
|
||||
@@ -1,14 +1,11 @@
|
||||
use std::collections::BTreeMap;
|
||||
use std::net::{Ipv4Addr, SocketAddr};
|
||||
use std::time::Duration;
|
||||
|
||||
use clap::ArgMatches;
|
||||
use color_eyre::eyre::eyre;
|
||||
use futures::future::BoxFuture;
|
||||
use futures::FutureExt;
|
||||
use reqwest::Client;
|
||||
use rpc_toolkit::command;
|
||||
use serde_json::json;
|
||||
use sqlx::{Executor, Postgres};
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::sync::Mutex;
|
||||
@@ -109,10 +106,6 @@ impl TorController {
|
||||
self.0.lock().await.remove(pkg_id, interfaces).await
|
||||
}
|
||||
|
||||
pub async fn replace(&self) -> Result<bool, Error> {
|
||||
self.0.lock().await.replace().await
|
||||
}
|
||||
|
||||
pub async fn embassyd_tor_key(&self) -> TorSecretKeyV3 {
|
||||
self.0.lock().await.embassyd_tor_key.clone()
|
||||
}
|
||||
@@ -266,80 +259,6 @@ impl TorControllerInner {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
async fn replace(&mut self) -> Result<bool, Error> {
|
||||
let connection = self.connection.take();
|
||||
let uptime = if let Some(mut c) = connection {
|
||||
// this should be unreachable because the only time when this should be none is for the duration of tor's
|
||||
// restart lower down in this method, which is held behind a Mutex
|
||||
let uptime = c.get_info("uptime").await?.parse::<u64>()?;
|
||||
// we never want to restart the tor daemon if it hasn't been up for at least a half hour
|
||||
if uptime < 1800 {
|
||||
self.connection = Some(c); // put it back
|
||||
return Ok(false);
|
||||
}
|
||||
// when connection closes below, tor daemon is restarted
|
||||
c.take_ownership().await?;
|
||||
// this should close the connection
|
||||
drop(c);
|
||||
Some(uptime)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// attempt to reconnect to the control socket, not clear how long this should take
|
||||
let mut new_connection: AuthenticatedConnection;
|
||||
loop {
|
||||
match TcpStream::connect(self.control_addr).await {
|
||||
Ok(stream) => {
|
||||
let mut new_conn = torut::control::UnauthenticatedConn::new(stream);
|
||||
let auth = new_conn
|
||||
.load_protocol_info()
|
||||
.await?
|
||||
.make_auth_data()?
|
||||
.ok_or_else(|| eyre!("Cookie Auth Not Available"))
|
||||
.with_kind(crate::ErrorKind::Tor)?;
|
||||
new_conn.authenticate(&auth).await?;
|
||||
new_connection = new_conn.into_authenticated().await;
|
||||
let uptime_new = new_connection.get_info("uptime").await?.parse::<u64>()?;
|
||||
// if the new uptime exceeds the one we got at the beginning, it's the same tor daemon, do not proceed
|
||||
match uptime {
|
||||
Some(uptime) if uptime_new > uptime => (),
|
||||
_ => {
|
||||
new_connection.set_async_event_handler(Some(event_handler));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::info!("Failed to reconnect to tor control socket: {}", e);
|
||||
tracing::info!("Trying again in one second");
|
||||
}
|
||||
}
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
}
|
||||
// replace the connection object here on the new copy of the tor daemon
|
||||
self.connection.replace(new_connection);
|
||||
|
||||
// swap empty map for owned old service map
|
||||
let old_services = std::mem::take(&mut self.services);
|
||||
|
||||
// re add all of the services on the new control socket
|
||||
for ((package_id, interface_id), (tor_key, tor_cfg, ipv4)) in old_services {
|
||||
self.add(
|
||||
&package_id,
|
||||
ipv4,
|
||||
std::iter::once((interface_id, tor_cfg, tor_key)),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
// add embassyd hidden service again
|
||||
self.add_embassyd_onion().await?;
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
fn embassyd_onion(&self) -> OnionAddressV3 {
|
||||
self.embassyd_tor_key.public().get_onion_address()
|
||||
}
|
||||
@@ -359,54 +278,6 @@ impl TorControllerInner {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn tor_health_check(client: &Client, tor_controller: &TorController) {
|
||||
tracing::debug!("Attempting to self-check tor address");
|
||||
let onion_addr = tor_controller.embassyd_onion().await;
|
||||
let result = client
|
||||
.post(format!("http://{}/rpc/v1", onion_addr))
|
||||
.body(
|
||||
json!({
|
||||
"jsonrpc": "2.0",
|
||||
"method": "echo",
|
||||
"params": { "message": "Follow the orange rabbit" },
|
||||
})
|
||||
.to_string()
|
||||
.into_bytes(),
|
||||
)
|
||||
.send()
|
||||
.await;
|
||||
if let Err(e) = result {
|
||||
let mut num_attempt = 1;
|
||||
tracing::error!("Unable to reach self over tor, we will retry now...");
|
||||
tracing::error!("The first TOR error: {}", e);
|
||||
|
||||
loop {
|
||||
tracing::debug!("TOR Reconnecting retry number: {num_attempt}");
|
||||
|
||||
match tor_controller.replace().await {
|
||||
Ok(restarted) => {
|
||||
if restarted {
|
||||
tracing::error!("Tor has been recently restarted, refusing to restart again right now...");
|
||||
}
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("TOR retry error: {}", e);
|
||||
tracing::error!("Unable to restart tor on attempt {num_attempt}...Retrying");
|
||||
|
||||
num_attempt += 1;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
tracing::debug!(
|
||||
"Successfully verified main tor address liveness at {}",
|
||||
onion_addr
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test() {
|
||||
let mut conn = torut::control::UnauthenticatedConn::new(
|
||||
|
||||
Reference in New Issue
Block a user