diff --git a/appmgr/.gitignore b/appmgr/.gitignore index 7d6402aed..d4898965a 100644 --- a/appmgr/.gitignore +++ b/appmgr/.gitignore @@ -3,4 +3,8 @@ .DS_Store .vscode secrets.db -*.s9pk \ No newline at end of file +*.s9pk +*.sqlite3 +.env +.editorconfig +proptest-regressions/* diff --git a/appmgr/Cargo.lock b/appmgr/Cargo.lock index d54d0961c..8cd146e3d 100644 --- a/appmgr/Cargo.lock +++ b/appmgr/Cargo.lock @@ -2313,6 +2313,7 @@ dependencies = [ "time 0.2.27", "tokio 1.10.1", "tokio-native-tls", + "tokio-socks", "url", "wasm-bindgen", "wasm-bindgen-futures", @@ -3260,6 +3261,18 @@ dependencies = [ "webpki", ] +[[package]] +name = "tokio-socks" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51165dfa029d2a65969413a6cc96f354b86b464498702f174a4efa13608fd8c0" +dependencies = [ + "either", + "futures-util", + "thiserror", + "tokio 1.9.0", +] + [[package]] name = "tokio-stream" version = "0.1.7" diff --git a/appmgr/Cargo.toml b/appmgr/Cargo.toml index 39d855dac..3a4e951c6 100644 --- a/appmgr/Cargo.toml +++ b/appmgr/Cargo.toml @@ -81,7 +81,7 @@ proptest = "1.0.0" proptest-derive = "0.3.0" rand = "0.7.3" regex = "1.4.2" -reqwest = { version = "0.11.2", features = ["stream", "json"] } +reqwest = { version = "0.11.2", features = ["stream", "json", "socks"] } reqwest_cookie_store = "0.2.0" rpassword = "5.0.0" rpc-toolkit = { version = "*", path = "../../rpc-toolkit/rpc-toolkit" } diff --git a/appmgr/src/bin/embassyd.rs b/appmgr/src/bin/embassyd.rs index 72b609ac6..89a95552c 100644 --- a/appmgr/src/bin/embassyd.rs +++ b/appmgr/src/bin/embassyd.rs @@ -66,6 +66,8 @@ async fn inner_main(cfg_path: Option<&str>) -> Result<(), Error> { } }); + let tor_health_check_task = + embassy::daemon::tor_health_check::tor_health_check_daemon(&rpc_ctx.net_controller.tor); let ws_ctx = rpc_ctx.clone(); let ws_server = { let builder = Server::bind(&ws_ctx.bind_ws); @@ -136,6 +138,7 @@ async fn inner_main(cfg_path: Option<&str>) -> Result<(), Error> { e.context("Health Check daemon panicked!"), ErrorKind::Unknown )), + futures::FutureExt::map(tor_health_check_task, Ok) )?; Ok(()) } diff --git a/appmgr/src/context/rpc.rs b/appmgr/src/context/rpc.rs index c706ffb63..001c49861 100644 --- a/appmgr/src/context/rpc.rs +++ b/appmgr/src/context/rpc.rs @@ -82,6 +82,8 @@ impl RpcContext { let docker = Docker::connect_with_unix_defaults()?; let net_controller = Arc::new( NetController::init( + ([127, 0, 0, 1], 80).into(), + todo!("Grab Key from Database, Generate if it doesn't exist"), base.tor_control .unwrap_or(SocketAddr::from(([127, 0, 0, 1], 9051))), ) diff --git a/appmgr/src/daemon/mod.rs b/appmgr/src/daemon/mod.rs new file mode 100644 index 000000000..bf47aee65 --- /dev/null +++ b/appmgr/src/daemon/mod.rs @@ -0,0 +1 @@ +pub mod tor_health_check; diff --git a/appmgr/src/daemon/tor_health_check.rs b/appmgr/src/daemon/tor_health_check.rs new file mode 100644 index 000000000..ae6a384b8 --- /dev/null +++ b/appmgr/src/daemon/tor_health_check.rs @@ -0,0 +1,52 @@ +use std::time::Duration; + +use serde_json::json; + +use crate::net::tor::TorController; + +lazy_static::lazy_static! { + static ref PROXY: reqwest::Proxy = reqwest::Proxy::http("socks5h://localhost:9050").expect("PROXY"); + static ref CLIENT: reqwest::Client = reqwest::Client::builder().proxy(PROXY.clone()).build().expect("CLIENT"); +} + +pub async fn tor_health_check_daemon(tor_controller: &TorController) { + loop { + // call out to tor address + let onion = tor_controller.embassyd_onion().await; + let result = CLIENT + .post(format!("http://{}/rpc/v1", onion)) + .body( + json!({ + "jsonrpc": "2.0", + "method": "echo", + "params": [{ "message": "Follow the orange rabbit" }], + }) + .to_string() + .into_bytes(), + ) + .send() + .await; + match result { + // if success, do nothing + Ok(response) => {} + // if failure, disconnect tor control port, and restart tor controller + Err(e) => { + log::error!("Unable to reach self over tor: {}", e); + loop { + match tor_controller.replace().await { + Ok(restarted) => { + if restarted { + log::error!("Tor has been recently restarted, refusing to restart"); + } + break; + } + Err(e) => { + log::error!("Unable to restart tor: {}", e); + } + } + } + } + } + tokio::time::sleep(Duration::from_secs(300)).await; + } +} diff --git a/appmgr/src/lib.rs b/appmgr/src/lib.rs index 379cccdec..5799240e4 100644 --- a/appmgr/src/lib.rs +++ b/appmgr/src/lib.rs @@ -22,6 +22,7 @@ pub mod backup; pub mod config; pub mod context; pub mod control; +pub mod daemon; pub mod db; pub mod dependencies; pub mod developer; diff --git a/appmgr/src/net/mod.rs b/appmgr/src/net/mod.rs index 85279740e..785b78a06 100644 --- a/appmgr/src/net/mod.rs +++ b/appmgr/src/net/mod.rs @@ -6,6 +6,7 @@ use self::interface::{Interface, InterfaceId}; #[cfg(feature = "avahi")] use self::mdns::MdnsController; use self::tor::TorController; +use crate::net::interface::TorConfig; use crate::s9pk::manifest::PackageId; use crate::{Error, ResultExt}; @@ -16,15 +17,19 @@ pub mod tor; pub mod wifi; pub struct NetController { - tor: TorController, + pub tor: TorController, #[cfg(feature = "avahi")] - mdns: MdnsController, + pub mdns: MdnsController, // nginx: NginxController, // TODO } impl NetController { - pub async fn init(tor_control: SocketAddr) -> Result { + pub async fn init( + embassyd_addr: SocketAddr, + embassyd_tor_key: TorSecretKeyV3, + tor_control: SocketAddr, + ) -> Result { Ok(Self { - tor: TorController::init(tor_control).await?, + tor: TorController::init(embassyd_addr, embassyd_tor_key, tor_control).await?, #[cfg(feature = "avahi")] mdns: MdnsController::init(), }) @@ -39,7 +44,15 @@ impl NetController { ip: Ipv4Addr, interfaces: I, ) -> Result<(), Error> { - let (tor_res, _) = tokio::join!(self.tor.add(pkg_id, ip, interfaces.clone()), { + let interfaces_tor = interfaces + .clone() + .into_iter() + .filter_map(|i| match i.1.tor_config.clone() { + None => None, + Some(cfg) => Some((i.0, cfg, i.2)), + }) + .collect::>(); + let (tor_res, _) = tokio::join!(self.tor.add(pkg_id, ip, interfaces_tor), { #[cfg(feature = "avahi")] let mdns_fut = self.mdns.add( pkg_id, diff --git a/appmgr/src/net/tor.rs b/appmgr/src/net/tor.rs index 325eeba55..3294679bc 100644 --- a/appmgr/src/net/tor.rs +++ b/appmgr/src/net/tor.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; use std::net::{Ipv4Addr, SocketAddr}; +use std::time::Duration; use anyhow::anyhow; use futures::future::BoxFuture; @@ -7,11 +8,11 @@ use futures::FutureExt; use tokio::net::TcpStream; use tokio::sync::Mutex; use torut::control::{AsyncEvent, AuthenticatedConn, ConnError}; -use torut::onion::TorSecretKeyV3; +use torut::onion::{OnionAddressV3, TorSecretKey, TorSecretKeyV3}; -use super::interface::{Interface, InterfaceId, TorConfig}; +use super::interface::{InterfaceId, TorConfig}; use crate::s9pk::manifest::PackageId; -use crate::{Error, ResultExt as _}; +use crate::{Error, ErrorKind, ResultExt as _}; fn event_handler(event: AsyncEvent<'static>) -> BoxFuture<'static, Result<(), ConnError>> { async move { Ok(()) }.boxed() @@ -19,16 +20,17 @@ fn event_handler(event: AsyncEvent<'static>) -> BoxFuture<'static, Result<(), Co pub struct TorController(Mutex); impl TorController { - pub async fn init(tor_control: SocketAddr) -> Result { + pub async fn init( + embassyd_addr: SocketAddr, + embassyd_tor_key: TorSecretKeyV3, + tor_control: SocketAddr, + ) -> Result { Ok(TorController(Mutex::new( - TorControllerInner::init(tor_control).await?, + TorControllerInner::init(embassyd_addr, embassyd_tor_key, tor_control).await?, ))) } - pub async fn add< - 'a, - I: IntoIterator + Clone, - >( + pub async fn add + Clone>( &self, pkg_id: &PackageId, ip: Ipv4Addr, @@ -44,6 +46,14 @@ impl TorController { ) -> Result<(), Error> { self.0.lock().await.remove(pkg_id, interfaces).await } + + pub async fn replace(&self) -> Result { + self.0.lock().await.replace().await + } + + pub async fn embassyd_onion(&self) -> OnionAddressV3 { + self.0.lock().await.embassyd_onion() + } } type AuthenticatedConnection = AuthenticatedConn< @@ -58,45 +68,53 @@ struct HiddenServiceConfig { } pub struct TorControllerInner { - connection: AuthenticatedConnection, - services: HashMap<(PackageId, InterfaceId), TorSecretKeyV3>, + embassyd_addr: SocketAddr, + embassyd_tor_key: TorSecretKeyV3, + control_addr: SocketAddr, + connection: Option, + services: HashMap<(PackageId, InterfaceId), (TorSecretKeyV3, TorConfig, Ipv4Addr)>, } impl TorControllerInner { - async fn add<'a, I: IntoIterator>( + async fn add<'a, I: IntoIterator>( &mut self, pkg_id: &PackageId, ip: Ipv4Addr, interfaces: I, ) -> Result<(), Error> { - for (interface_id, interface, key) in interfaces { + for (interface_id, tor_cfg, key) in interfaces { let id = (pkg_id.clone(), interface_id); match self.services.get(&id) { - Some(k) if k != &key => { + Some(k) if k.0 != key => { self.remove(pkg_id, std::iter::once(id.1.clone())).await?; } - Some(_) => return Ok(()), + Some(_) => continue, None => (), } - if let Some(tor_cfg) = &interface.tor_config { - self.connection - .add_onion_v3( - &key, - false, - false, - false, - None, - &mut tor_cfg - .port_mapping - .iter() - .map(|(external, internal)| { - (external.0, SocketAddr::from((ip, internal.0))) - }) - .collect::>() - .iter(), + self.connection + .as_mut() + .ok_or_else(|| { + Error::new( + anyhow!("Missing Tor Control Connection"), + ErrorKind::Unknown, ) - .await?; - } - self.services.insert(id, key); + })? + .add_onion_v3( + &key, + false, + false, + false, + None, + &mut tor_cfg + .port_mapping + .iter() + .map(|(external, internal)| { + (external.0, SocketAddr::from((ip, internal.0))) + }) + .collect::>() + .iter(), + ) + .await?; + self.services.insert(id, (key, tor_cfg, ip)); } Ok(()) } @@ -107,8 +125,15 @@ impl TorControllerInner { interfaces: I, ) -> Result<(), Error> { for interface_id in interfaces { - if let Some(key) = self.services.remove(&(pkg_id.clone(), interface_id)) { + if let Some((key, _cfg, _ip)) = self.services.remove(&(pkg_id.clone(), interface_id)) { self.connection + .as_mut() + .ok_or_else(|| { + Error::new( + anyhow!("Missing Tor Control Connection"), + ErrorKind::Unknown, + ) + })? .del_onion( &key.public() .get_onion_address() @@ -120,7 +145,11 @@ impl TorControllerInner { Ok(()) } - async fn init(tor_control: SocketAddr) -> Result { + async fn init( + embassyd_addr: SocketAddr, + embassyd_tor_key: TorSecretKeyV3, + tor_control: SocketAddr, + ) -> Result { let mut conn = torut::control::UnauthenticatedConn::new( TcpStream::connect(tor_control).await?, // TODO ); @@ -133,10 +162,107 @@ impl TorControllerInner { conn.authenticate(&auth).await?; let mut connection: AuthenticatedConnection = conn.into_authenticated().await; connection.set_async_event_handler(Some(event_handler)); - Ok(TorControllerInner { - connection, + + let mut controller = TorControllerInner { + embassyd_addr, + embassyd_tor_key, + control_addr: tor_control, + connection: Some(connection), services: HashMap::new(), - }) + }; + controller.add_embassyd_onion().await?; + Ok(controller) + } + + async fn add_embassyd_onion(&mut self) -> Result<(), Error> { + self.connection + .as_mut() + .expect("Tor Connection is None") + .add_onion_v3( + &self.embassyd_tor_key, + false, + false, + false, + None, + &mut std::iter::once(&(self.embassyd_addr.port(), self.embassyd_addr)), + ) + .await?; + Ok(()) + } + + async fn replace(&mut self) -> Result { + let connection = self.connection.take(); + let uptime = if let Some(mut c) = connection { + // this should be unreachable because the only time when this should be none is for the duration of tor's + // restart lower down in this method, which is held behind a Mutex + let uptime = c.get_info("uptime").await?.parse::()?; + // we never want to restart the tor daemon if it hasn't been up for at least a half hour + if uptime < 1800 { + return Ok(false); + } + // when connection closes below, tor daemon is restarted + c.take_ownership().await?; + // this should close the connection + drop(c); + Some(uptime) + } else { + None + }; + + // attempt to reconnect to the control socket, not clear how long this should take + let mut new_connection: AuthenticatedConnection; + loop { + match TcpStream::connect(self.control_addr).await { + Ok(stream) => { + let mut new_conn = torut::control::UnauthenticatedConn::new(stream); + let auth = new_conn + .load_protocol_info() + .await? + .make_auth_data()? + .ok_or_else(|| anyhow!("Cookie Auth Not Available")) + .with_kind(crate::ErrorKind::Tor)?; + new_conn.authenticate(&auth).await?; + new_connection = new_conn.into_authenticated().await; + let uptime_new = new_connection.get_info("uptime").await?.parse::()?; + // if the new uptime exceeds the one we got at the beginning, it's the same tor daemon, do not proceed + match uptime { + Some(uptime) if uptime_new < uptime => { + new_connection.set_async_event_handler(Some(event_handler)); + break; + } + _ => (), + } + } + Err(e) => { + log::info!("Failed to reconnect to tor control socket: {}", e); + } + } + tokio::time::sleep(Duration::from_secs(1)).await; + } + // replace the connection object here on the new copy of the tor daemon + self.connection.replace(new_connection); + + // swap empty map for owned old service map + let old_services = std::mem::replace(&mut self.services, HashMap::new()); + + // re add all of the services on the new control socket + for ((package_id, interface_id), (tor_key, tor_cfg, ipv4)) in old_services { + self.add( + &package_id, + ipv4, + std::iter::once((interface_id, tor_cfg, tor_key)), + ) + .await?; + } + + // add embassyd hidden service again + self.add_embassyd_onion().await?; + + Ok(true) + } + + fn embassyd_onion(&self) -> OnionAddressV3 { + self.embassyd_tor_key.public().get_onion_address() } } diff --git a/compat/Cargo.lock b/compat/Cargo.lock index fda677b94..9def56f4c 100644 --- a/compat/Cargo.lock +++ b/compat/Cargo.lock @@ -1,5 +1,7 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. +version = 3 + [[package]] name = "ahash" version = "0.7.4" @@ -675,6 +677,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0" +[[package]] +name = "divrem" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69dde51e8fef5e12c1d65e0929b03d66e4c0c18282bc30ed2ca050ad6f44dd82" + [[package]] name = "dotenv" version = "0.15.0" @@ -734,13 +742,17 @@ dependencies = [ "clap", "cookie_store 0.15.0", "digest 0.9.0", + "divrem", "ed25519-dalek", "emver", + "fd-lock-rs", "futures", "git-version", + "hex", "http", "hyper-ws-listener", "indexmap", + "isocountry", "itertools 0.10.1", "jsonpath_lib", "lazy_static", @@ -752,6 +764,8 @@ dependencies = [ "patch-db", "pin-project", "prettytable-rs", + "proptest", + "proptest-derive", "rand 0.7.3", "regex", "reqwest", @@ -1301,6 +1315,16 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68f2d64f2edebec4ce84ad108148e67e1064789bee435edc5b60ad398714a3a9" +[[package]] +name = "isocountry" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ea1dc4bf0fb4904ba83ffdb98af3d9c325274e92e6e295e4151e86c96363e04" +dependencies = [ + "serde", + "thiserror", +] + [[package]] name = "itertools" version = "0.8.2" @@ -1910,6 +1934,37 @@ dependencies = [ "unicode-xid 0.2.2", ] +[[package]] +name = "proptest" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e0d9cc07f18492d879586c92b485def06bc850da3118075cd45d50e9c95b0e5" +dependencies = [ + "bit-set", + "bitflags", + "byteorder", + "lazy_static", + "num-traits", + "quick-error 2.0.1", + "rand 0.8.4", + "rand_chacha 0.3.1", + "rand_xorshift", + "regex-syntax", + "rusty-fork", + "tempfile", +] + +[[package]] +name = "proptest-derive" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90b46295382dc76166cb7cf2bb4a97952464e4b7ed5a43e6cd34e1fec3349ddc" +dependencies = [ + "proc-macro2 0.4.30", + "quote 0.6.13", + "syn 0.15.44", +] + [[package]] name = "psl-types" version = "2.0.7" @@ -1938,6 +1993,18 @@ dependencies = [ "psl-types", ] +[[package]] +name = "quick-error" +version = "1.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" + +[[package]] +name = "quick-error" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a993555f31e5a609f617c12db6250dedcac1b0a85076912c436e6fc9b2c8e6a3" + [[package]] name = "quote" version = "0.6.13" @@ -2051,6 +2118,15 @@ dependencies = [ "rand_core 0.6.3", ] +[[package]] +name = "rand_xorshift" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d25bf25ec5ae4a3f1b92f929810509a2f53d7dca2f50b794ff57e3face536c8f" +dependencies = [ + "rand_core 0.6.3", +] + [[package]] name = "redox_syscall" version = "0.1.57" @@ -2272,6 +2348,18 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61b3909d758bb75c79f23d4736fac9433868679d3ad2ea7a61e3c25cfda9a088" +[[package]] +name = "rusty-fork" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb3dcc6e454c328bb824492db107ab7c0ae8fcffe4ad210136ef014458c1bc4f" +dependencies = [ + "fnv", + "quick-error 1.2.3", + "tempfile", + "wait-timeout", +] + [[package]] name = "ryu" version = "1.0.5" @@ -3318,6 +3406,15 @@ version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe" +[[package]] +name = "wait-timeout" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f200f5b12eb75f8c1ed65abd4b2db8a6e1b138a20de009dacee265a2498f3f6" +dependencies = [ + "libc", +] + [[package]] name = "want" version = "0.3.0"