From 8163db7ac3fdb14aa72a92f6bf64ce5be2ceb73d Mon Sep 17 00:00:00 2001 From: Aiden McClelland Date: Fri, 29 Aug 2025 11:19:30 -0600 Subject: [PATCH] socks5 proxy working --- core/Cargo.lock | 27 +- core/startos/Cargo.toml | 2 +- core/startos/src/context/rpc.rs | 23 +- core/startos/src/init.rs | 18 +- core/startos/src/net/host/binding.rs | 14 +- core/startos/src/net/mod.rs | 1 + core/startos/src/net/net_controller.rs | 21 +- core/startos/src/net/socks.rs | 169 +++++++++ core/startos/src/net/tor.rs | 348 +++++++++++++++--- .../src/service/effects/subcontainer/mod.rs | 6 +- core/startos/src/util/io.rs | 7 +- 11 files changed, 543 insertions(+), 93 deletions(-) create mode 100644 core/startos/src/net/socks.rs diff --git a/core/Cargo.lock b/core/Cargo.lock index 27335bf8c..fc37c5d09 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -7033,6 +7033,19 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "socks5-impl" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "214a7c0af583e8f7abbd3394f235b4df7cc65db9b98ef80506cac7c7eefc0c75" +dependencies = [ + "async-trait", + "bytes", + "percent-encoding", + "thiserror 2.0.16", + "tokio", +] + [[package]] name = "solana-nohash-hasher" version = "0.2.1" @@ -7366,6 +7379,7 @@ dependencies = [ "signal-hook", "simple-logging", "socket2 0.6.0", + "socks5-impl", "sqlx", "sscanf", "ssh-key", @@ -7375,7 +7389,6 @@ dependencies = [ "thiserror 2.0.16", "tokio", "tokio-rustls 0.26.2", - "tokio-socks", "tokio-stream", "tokio-tar", "tokio-tungstenite", @@ -7840,18 +7853,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-socks" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d4770b8024672c1101b3f6733eab95b18007dbe0847a8afe341fcf79e06043f" -dependencies = [ - "either", - "futures-util", - "thiserror 1.0.69", - "tokio", -] - [[package]] name = "tokio-stream" version = "0.1.17" diff --git a/core/startos/Cargo.toml b/core/startos/Cargo.toml index c07729d86..48d60b793 100644 --- a/core/startos/Cargo.toml +++ b/core/startos/Cargo.toml @@ -212,6 +212,7 @@ shell-words = "1" signal-hook = "0.3.17" simple-logging = "2.0.2" socket2 = { version = "0.6.0", features = ["all"] } +socks5-impl = { version = "0.7.2", features = ["server"] } sqlx = { version = "0.8.6", features = [ "runtime-tokio-rustls", "postgres", @@ -224,7 +225,6 @@ thiserror = "2.0.12" textwrap = "0.16.1" tokio = { version = "1.38.1", features = ["full"] } tokio-rustls = "0.26.0" -tokio-socks = "0.5.1" tokio-stream = { version = "0.1.14", features = ["io-util", "sync", "net"] } tokio-tar = { git = "https://github.com/dr-bonez/tokio-tar.git" } tokio-tungstenite = { version = "0.26.2", features = ["native-tls", "url"] } diff --git a/core/startos/src/context/rpc.rs b/core/startos/src/context/rpc.rs index 6bf1d7785..5830b7950 100644 --- a/core/startos/src/context/rpc.rs +++ b/core/startos/src/context/rpc.rs @@ -4,8 +4,8 @@ use std::future::Future; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use std::ops::Deref; use std::path::{Path, PathBuf}; -use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; use std::time::Duration; use chrono::{TimeDelta, Utc}; @@ -18,35 +18,36 @@ use models::{ActionId, PackageId}; use reqwest::{Client, Proxy}; use rpc_toolkit::yajrc::RpcError; use rpc_toolkit::{CallRemote, Context, Empty}; -use tokio::sync::{RwLock, broadcast, oneshot, watch}; +use tokio::sync::{broadcast, oneshot, watch, RwLock}; use tokio::time::Instant; use tracing::instrument; use super::setup::CURRENT_SECRET; -use crate::DATA_DIR; use crate::account::AccountInfo; use crate::auth::Sessions; use crate::context::config::ServerConfig; -use crate::db::model::Database; use crate::db::model::package::TaskSeverity; +use crate::db::model::Database; use crate::disk::OsPartitionInfo; -use crate::init::{InitResult, check_time_is_synchronized}; +use crate::init::{check_time_is_synchronized, InitResult}; use crate::install::PKG_ARCHIVE_DIR; use crate::lxc::LxcManager; use crate::net::net_controller::{NetController, NetService}; +use crate::net::socks::DEFAULT_SOCKS_LISTEN; use crate::net::utils::{find_eth_iface, find_wifi_iface}; use crate::net::web_server::{UpgradableListener, WebServerAcceptorSetter}; use crate::net::wifi::WpaCli; use crate::prelude::*; use crate::progress::{FullProgressTracker, PhaseProgressTrackerHandle}; use crate::rpc_continuations::{Guid, OpenAuthedContinuations, RpcContinuations}; -use crate::service::ServiceMap; use crate::service::action::update_tasks; use crate::service::effects::callbacks::ServiceCallbacks; +use crate::service::ServiceMap; use crate::shutdown::Shutdown; use crate::util::io::delete_file; use crate::util::lshw::LshwDevice; use crate::util::sync::{SyncMutex, Watch}; +use crate::{DATA_DIR, HOST_IP}; pub struct RpcContextSeed { is_closed: AtomicBool, @@ -131,12 +132,7 @@ impl RpcContext { run_migrations, }: InitRpcContextPhases, ) -> Result { - let socks_proxy = config - .socks_listen - .unwrap_or(SocketAddr::V4(SocketAddrV4::new( - Ipv4Addr::new(127, 0, 0, 1), - 9050, - ))); + let socks_proxy = config.socks_listen.unwrap_or(DEFAULT_SOCKS_LISTEN); let (shutdown, _) = tokio::sync::broadcast::channel(1); load_db.start(); @@ -158,7 +154,8 @@ impl RpcContext { { (net_ctrl, os_net_service) } else { - let net_ctrl = Arc::new(NetController::init(db.clone(), &account.hostname).await?); + let net_ctrl = + Arc::new(NetController::init(db.clone(), &account.hostname, socks_proxy).await?); webserver.try_upgrade(|a| net_ctrl.net_iface.watcher.upgrade_listener(a))?; let os_net_service = net_ctrl.os_bindings().await?; (net_ctrl, os_net_service) diff --git a/core/startos/src/init.rs b/core/startos/src/init.rs index e96131639..dbdb1349d 100644 --- a/core/startos/src/init.rs +++ b/core/startos/src/init.rs @@ -8,7 +8,7 @@ use const_format::formatcp; use futures::{StreamExt, TryStreamExt}; use itertools::Itertools; use models::ResultExt; -use rpc_toolkit::{Context, Empty, HandlerArgs, HandlerExt, ParentHandler, from_fn_async}; +use rpc_toolkit::{from_fn_async, Context, Empty, HandlerArgs, HandlerExt, ParentHandler}; use serde::{Deserialize, Serialize}; use tokio::process::Command; use tracing::instrument; @@ -17,12 +17,13 @@ use ts_rs::TS; use crate::account::AccountInfo; use crate::context::config::ServerConfig; use crate::context::{CliContext, InitContext, RpcContext}; -use crate::db::model::Database; use crate::db::model::public::ServerStatus; +use crate::db::model::Database; use crate::developer::OS_DEVELOPER_KEY_PATH; use crate::hostname::Hostname; use crate::middleware::auth::AuthContext; use crate::net::net_controller::{NetController, NetService}; +use crate::net::socks::DEFAULT_SOCKS_LISTEN; use crate::net::utils::find_wifi_iface; use crate::net::web_server::{UpgradableListener, WebServerAcceptorSetter}; use crate::prelude::*; @@ -33,10 +34,10 @@ use crate::rpc_continuations::{Guid, RpcContinuation}; use crate::s9pk::v2::pack::{CONTAINER_DATADIR, CONTAINER_TOOL}; use crate::ssh::SSH_DIR; use crate::system::{get_mem_info, sync_kiosk}; -use crate::util::io::{IOHook, open_file}; +use crate::util::io::{open_file, IOHook}; use crate::util::lshw::lshw; use crate::util::net::WebSocketExt; -use crate::util::{Invoke, cpupower}; +use crate::util::{cpupower, Invoke}; use crate::{Error, MAIN_DATA, PACKAGE_DATA}; pub const SYSTEM_REBUILD_PATH: &str = "/media/startos/config/system-rebuild"; @@ -202,7 +203,14 @@ pub async fn init( let account = AccountInfo::load(&peek)?; start_net.start(); - let net_ctrl = Arc::new(NetController::init(db.clone(), &account.hostname).await?); + let net_ctrl = Arc::new( + NetController::init( + db.clone(), + &account.hostname, + cfg.socks_listen.unwrap_or(DEFAULT_SOCKS_LISTEN), + ) + .await?, + ); webserver.try_upgrade(|a| net_ctrl.net_iface.watcher.upgrade_listener(a))?; let os_net_service = net_ctrl.os_bindings().await?; start_net.complete(); diff --git a/core/startos/src/net/host/binding.rs b/core/startos/src/net/host/binding.rs index a83ea48e1..d37a74dad 100644 --- a/core/startos/src/net/host/binding.rs +++ b/core/startos/src/net/host/binding.rs @@ -1,11 +1,11 @@ use std::collections::{BTreeMap, BTreeSet}; use std::str::FromStr; -use clap::Parser; use clap::builder::ValueParserFactory; +use clap::Parser; use imbl::OrdSet; use models::{FromStrParser, GatewayId, HostId}; -use rpc_toolkit::{Context, Empty, HandlerArgs, HandlerExt, ParentHandler, from_fn_async}; +use rpc_toolkit::{from_fn_async, Context, Empty, HandlerArgs, HandlerExt, ParentHandler}; use serde::{Deserialize, Serialize}; use ts_rs::TS; @@ -16,7 +16,7 @@ use crate::net::gateway::InterfaceFilter; use crate::net::host::HostApiKind; use crate::net::vhost::AlpnInfo; use crate::prelude::*; -use crate::util::serde::{HandlerExtSerde, display_serializable}; +use crate::util::serde::{display_serializable, HandlerExtSerde}; #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, TS)] #[ts(export)] @@ -136,9 +136,9 @@ impl BindInfo { impl InterfaceFilter for NetInfo { fn filter(&self, id: &GatewayId, info: &NetworkInterfaceInfo) -> bool { if info.public() { - dbg!(self.public_enabled.contains(id)) + self.public_enabled.contains(id) } else { - dbg!(!self.private_disabled.contains(id)) + !self.private_disabled.contains(id) } } } @@ -169,8 +169,8 @@ pub struct AddSslOptions { pub alpn: Option, } -pub fn binding() --> ParentHandler { +pub fn binding( +) -> ParentHandler { ParentHandler::::new() .subcommand( "list", diff --git a/core/startos/src/net/mod.rs b/core/startos/src/net/mod.rs index 95a1b01d5..9d1f139a0 100644 --- a/core/startos/src/net/mod.rs +++ b/core/startos/src/net/mod.rs @@ -9,6 +9,7 @@ pub mod keys; pub mod mdns; pub mod net_controller; pub mod service_interface; +pub mod socks; pub mod ssl; pub mod static_server; pub mod tor; diff --git a/core/startos/src/net/net_controller.rs b/core/startos/src/net/net_controller.rs index 2a6173415..6ccf901dd 100644 --- a/core/startos/src/net/net_controller.rs +++ b/core/startos/src/net/net_controller.rs @@ -3,7 +3,7 @@ use std::net::{Ipv4Addr, SocketAddr}; use std::sync::{Arc, Weak}; use color_eyre::eyre::eyre; -use imbl::{OrdMap, vector}; +use imbl::{vector, OrdMap}; use imbl_value::InternedString; use ipnet::IpNet; use models::{HostId, OptionExt, PackageId}; @@ -11,9 +11,8 @@ use tokio::sync::Mutex; use tokio::task::JoinHandle; use tracing::instrument; -use crate::HOST_IP; -use crate::db::model::Database; use crate::db::model::public::NetworkInterfaceInfo; +use crate::db::model::Database; use crate::error::ErrorCollection; use crate::hostname::Hostname; use crate::net::dns::DnsController; @@ -24,14 +23,16 @@ use crate::net::gateway::{ }; use crate::net::host::address::HostAddress; use crate::net::host::binding::{AddSslOptions, BindId, BindOptions}; -use crate::net::host::{Host, Hosts, host_for}; +use crate::net::host::{host_for, Host, Hosts}; use crate::net::service_interface::{HostnameInfo, IpHostname, OnionHostname}; +use crate::net::socks::SocksController; use crate::net::tor::{OnionAddress, TorController, TorSecretKey}; use crate::net::utils::ipv6_is_local; use crate::net::vhost::{AlpnInfo, TargetInfo, VHostController}; use crate::prelude::*; use crate::service::effects::callbacks::ServiceCallbacks; use crate::util::serde::MaybeUtf8String; +use crate::HOST_IP; pub struct NetController { pub(crate) db: TypedPatchDb, @@ -40,20 +41,28 @@ pub struct NetController { pub(crate) net_iface: Arc, pub(super) dns: DnsController, pub(super) forward: PortForwardController, + pub(super) socks: SocksController, pub(super) server_hostnames: Vec>, pub(crate) callbacks: Arc, } impl NetController { - pub async fn init(db: TypedPatchDb, hostname: &Hostname) -> Result { + pub async fn init( + db: TypedPatchDb, + hostname: &Hostname, + socks_listen: SocketAddr, + ) -> Result { let net_iface = Arc::new(NetworkInterfaceController::new(db.clone())); + let tor = TorController::new()?; + let socks = SocksController::new(socks_listen, tor.clone())?; Ok(Self { db: db.clone(), - tor: TorController::new().await?, + tor, vhost: VHostController::new(db.clone(), net_iface.clone()), dns: DnsController::init(db, &net_iface.watcher).await?, forward: PortForwardController::new(net_iface.watcher.subscribe()), net_iface, + socks, server_hostnames: vec![ // LAN IP None, diff --git a/core/startos/src/net/socks.rs b/core/startos/src/net/socks.rs new file mode 100644 index 000000000..871cfc87a --- /dev/null +++ b/core/startos/src/net/socks.rs @@ -0,0 +1,169 @@ +use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; +use std::sync::Arc; + +use helpers::NonDetachingJoinHandle; +use socks5_impl::protocol::{Address, Reply}; +use socks5_impl::server::auth::NoAuth; +use socks5_impl::server::{AuthAdaptor, ClientConnection, Server}; +use tokio::net::{TcpListener, TcpStream}; + +use crate::net::tor::TorController; +use crate::prelude::*; +use crate::util::actor::background::BackgroundJobQueue; +use crate::HOST_IP; + +pub const DEFAULT_SOCKS_LISTEN: SocketAddr = SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(HOST_IP[0], HOST_IP[1], HOST_IP[2], HOST_IP[3]), + 9050, +)); + +pub struct SocksController { + _thread: NonDetachingJoinHandle<()>, +} +impl SocksController { + pub fn new(listen: SocketAddr, tor: TorController) -> Result { + let auth: AuthAdaptor<()> = Arc::new(NoAuth); + let listener = TcpListener::from_std( + mio::net::TcpListener::bind(listen) + .with_kind(ErrorKind::Network)? + .into(), + ) + .with_kind(ErrorKind::Network)?; + Ok(Self { + _thread: tokio::spawn(async move { + let (bg, mut runner) = BackgroundJobQueue::new(); + runner + .run_while(async { + let server = Server::new(listener, auth); + loop { + match server.accept().await { + Ok((stream, _)) => { + let tor = tor.clone(); + bg.add_job(async move { + if let Err(e) = async { + match stream + .authenticate() + .await + .with_kind(ErrorKind::Network)? + .0 + .wait_request() + .await + .with_kind(ErrorKind::Network)? + { + ClientConnection::Connect( + reply, + Address::DomainAddress(domain, port), + ) if domain.ends_with(".onion") => { + if let Ok(mut target) = tor + .connect_onion(&domain.parse()?, port) + .await + { + let mut sock = reply + .reply( + Reply::Succeeded, + Address::unspecified(), + ) + .await + .with_kind(ErrorKind::Network)?; + tokio::io::copy_bidirectional( + &mut sock, + &mut target, + ) + .await + .with_kind(ErrorKind::Network)?; + } else { + let mut sock = reply + .reply( + Reply::HostUnreachable, + Address::unspecified(), + ) + .await + .with_kind(ErrorKind::Network)?; + sock.shutdown() + .await + .with_kind(ErrorKind::Network)?; + } + } + ClientConnection::Connect(reply, addr) => { + if let Ok(mut target) = match addr { + Address::DomainAddress(domain, port) => { + TcpStream::connect((domain, port)).await + } + Address::SocketAddress(addr) => { + TcpStream::connect(addr).await + } + } { + let mut sock = reply + .reply( + Reply::Succeeded, + Address::unspecified(), + ) + .await + .with_kind(ErrorKind::Network)?; + tokio::io::copy_bidirectional( + &mut sock, + &mut target, + ) + .await + .with_kind(ErrorKind::Network)?; + } else { + let mut sock = reply + .reply( + Reply::HostUnreachable, + Address::unspecified(), + ) + .await + .with_kind(ErrorKind::Network)?; + sock.shutdown() + .await + .with_kind(ErrorKind::Network)?; + } + } + ClientConnection::Bind(bind, _) => { + let mut sock = bind + .reply( + Reply::CommandNotSupported, + Address::unspecified(), + ) + .await + .with_kind(ErrorKind::Network)?; + sock.shutdown() + .await + .with_kind(ErrorKind::Network)?; + } + ClientConnection::UdpAssociate(associate, _) => { + let mut sock = associate + .reply( + Reply::CommandNotSupported, + Address::unspecified(), + ) + .await + .with_kind(ErrorKind::Network)?; + sock.shutdown() + .await + .with_kind(ErrorKind::Network)?; + } + } + + Ok::<_, Error>(()) + } + .await + { + tracing::error!("SOCKS5 Stream Error: {e}"); + tracing::debug!("{e:?}"); + } + }); + } + Err(e) => { + tracing::error!("SOCKS5 TCP Accept Error: {e}"); + tracing::debug!("{e:?}"); + } + } + } + }) + .await; + }) + .into(), + }) + } +} diff --git a/core/startos/src/net/tor.rs b/core/startos/src/net/tor.rs index 502bd106b..db5e28b08 100644 --- a/core/startos/src/net/tor.rs +++ b/core/startos/src/net/tor.rs @@ -3,19 +3,22 @@ use std::collections::{BTreeMap, BTreeSet}; use std::net::SocketAddr; use std::str::FromStr; use std::sync::{Arc, Weak}; +use std::time::{Duration, Instant}; use arti_client::config::onion_service::OnionServiceConfigBuilder; -use arti_client::{TorClient, TorClientConfig}; +use arti_client::{DataStream, TorClient, TorClientConfig}; use base64::Engine; use clap::Parser; use color_eyre::eyre::eyre; -use futures::StreamExt; +use futures::{FutureExt, StreamExt}; use helpers::NonDetachingJoinHandle; use imbl_value::InternedString; +use itertools::Itertools; use rpc_toolkit::{from_fn_async, Context, Empty, HandlerExt, ParentHandler}; use serde::{Deserialize, Serialize}; -use tokio::io::AsyncWriteExt; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpStream; +use tokio::sync::Notify; use tor_cell::relaycell::msg::Connected; use tor_hscrypto::pk::{HsId, HsIdKeypair}; use tor_hsservice::status::State as ArtiOnionServiceState; @@ -28,11 +31,19 @@ use ts_rs::TS; use crate::context::{CliContext, RpcContext}; use crate::prelude::*; use crate::util::actor::background::BackgroundJobQueue; +use crate::util::future::Until; +use crate::util::io::ReadWriter; use crate::util::serde::{ deserialize_from_str, display_serializable, serialize_display, Base64, HandlerExtSerde, WithIoFormat, BASE64, }; -use crate::util::sync::{SyncMutex, SyncRwLock}; +use crate::util::sync::{SyncMutex, SyncRwLock, Watch}; + +const BOOTSTRAP_PROGRESS_TIMEOUT: Duration = Duration::from_secs(300); +const HS_BOOTSTRAP_TIMEOUT: Duration = Duration::from_secs(300); +const RETRY_COOLDOWN: Duration = Duration::from_secs(15); +const HEALTH_CHECK_FAILURE_ALLOWANCE: usize = 5; +const HEALTH_CHECK_COOLDOWN: Duration = Duration::from_secs(120); #[derive(Debug, Clone, Copy)] pub struct OnionAddress(pub HsId); @@ -320,7 +331,7 @@ pub async fn reset(ctx: RpcContext, ResetParams { wipe_state }: ResetParams) -> pub fn display_services( params: WithIoFormat, - services: BTreeMap, + services: BTreeMap, ) -> Result<(), Error> { use prettytable::*; @@ -329,8 +340,17 @@ pub fn display_services( } let mut table = Table::new(); - for (service, status) in services { - let row = row![&service.to_string(), &format!("{status:?}")]; + table.add_row(row![bc => "ADDRESS", "STATE", "BINDINGS"]); + for (service, info) in services { + let row = row![ + &service.to_string(), + &format!("{:?}", info.state), + &info + .bindings + .into_iter() + .map(|(port, addr)| lazy_format!("{port} -> {addr}")) + .join("; ") + ]; table.add_row(row); } table.print_tty(false)?; @@ -363,44 +383,233 @@ impl From for OnionServiceState { } } +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct OnionServiceInfo { + pub state: OnionServiceState, + pub bindings: BTreeMap, +} + pub async fn list_services( ctx: RpcContext, _: Empty, -) -> Result, Error> { +) -> Result, Error> { ctx.net_controller.tor.list_services().await } -pub struct TorController { - client: Arc>>, +#[derive(Clone)] +pub struct TorController(Arc); +struct TorControllerInner { + client: Watch<(usize, TorClient)>, + _bootstrapper: NonDetachingJoinHandle<()>, services: SyncMutex>, + reset: Arc, } impl TorController { - pub async fn new() -> Result { + pub fn new() -> Result { let mut config = TorClientConfig::builder(); config .storage() .keystore() .primary() .kind(ArtiKeystoreKind::Ephemeral.into()); - Ok(Self { - client: Arc::new(SyncRwLock::new( - TorClient::with_runtime(TokioRustlsRuntime::current()?) - .config(config.build().with_kind(ErrorKind::Tor)?) - .create_unbootstrapped_async() - .await?, - )), - services: SyncMutex::new(BTreeMap::new()), + let client = Watch::new(( + 0, + TorClient::with_runtime(TokioRustlsRuntime::current()?) + .config(config.build().with_kind(ErrorKind::Tor)?) + .create_unbootstrapped()?, + )); + let reset = Arc::new(Notify::new()); + let bootstrapper_reset = reset.clone(); + let bootstrapper_client = client.clone(); + let bootstrapper = tokio::spawn(async move { + loop { + if let Err(e) = Until::new() + .with_async_fn(|| bootstrapper_reset.notified().map(Ok)) + .run(async { + let (epoch, client): (usize, _) = bootstrapper_client.read(); + let mut events = client.bootstrap_events(); + let bootstrap_fut = + client.bootstrap().map(|res| res.with_kind(ErrorKind::Tor)); + let failure_fut = async { + let mut prev_frac = 0_f32; + let mut prev_inst = Instant::now(); + while let Some(event) = + tokio::time::timeout(BOOTSTRAP_PROGRESS_TIMEOUT, events.next()) + .await + .with_kind(ErrorKind::Tor)? + { + if event.ready_for_traffic() { + return Ok::<_, Error>(()); + } + let frac = event.as_frac(); + if frac == prev_frac { + if prev_inst.elapsed() > BOOTSTRAP_PROGRESS_TIMEOUT { + return Err(Error::new( + eyre!( + "Bootstrap has not made progress for {}", + crate::util::serde::Duration::from( + BOOTSTRAP_PROGRESS_TIMEOUT + ) + ), + ErrorKind::Tor, + )); + } + } else { + prev_frac = frac; + prev_inst = Instant::now(); + } + } + futures::future::pending().await + }; + if let Err::<(), Error>(e) = tokio::select! { + res = bootstrap_fut => res, + res = failure_fut => res, + } { + tracing::error!("Tor Bootstrap Error: {e}"); + tracing::debug!("{e:?}"); + } else { + bootstrapper_client.send_modify(|_| ()); + + for _ in 0..HEALTH_CHECK_FAILURE_ALLOWANCE { + if let Err::<(), Error>(e) = async { + loop { + let (bg, mut runner) = BackgroundJobQueue::new(); + runner + .run_while(async { + const PING_BUF_LEN: usize = 8; + let key = TorSecretKey::generate(); + let onion = key.onion_address(); + let (hs, stream) = client + .launch_onion_service_with_hsid( + OnionServiceConfigBuilder::default() + .nickname( + onion + .to_string() + .trim_end_matches(".onion") + .parse::() + .with_kind(ErrorKind::Tor)?, + ) + .build() + .with_kind(ErrorKind::Tor)?, + key.clone().0, + ) + .with_kind(ErrorKind::Tor)?; + bg.add_job(async move { + if let Err(e) = async { + let mut stream = + tor_hsservice::handle_rend_requests( + stream, + ); + while let Some(req) = stream.next().await { + let mut stream = req + .accept(Connected::new_empty()) + .await + .with_kind(ErrorKind::Tor)?; + let mut buf = [0; PING_BUF_LEN]; + stream.read_exact(&mut buf).await?; + stream.write_all(&buf).await?; + stream.flush().await?; + stream.shutdown().await?; + } + Ok::<_, Error>(()) + } + .await + { + tracing::error!("Tor Health Error: {e}"); + tracing::debug!("{e:?}"); + } + }); + + tokio::time::timeout(HS_BOOTSTRAP_TIMEOUT, async { + let mut status = hs.status_events(); + while let Some(status) = status.next().await { + if status.state().is_fully_reachable() { + return Ok(()); + } + } + Err(Error::new( + eyre!("status event stream ended"), + ErrorKind::Tor, + )) + }) + .await + .with_kind(ErrorKind::Tor)??; + + let mut stream = client + .connect((onion.to_string(), 8080)) + .await?; + let mut ping_buf = [0; PING_BUF_LEN]; + rand::fill(&mut ping_buf); + stream.write_all(&ping_buf).await?; + stream.flush().await?; + let mut ping_res = [0; PING_BUF_LEN]; + stream.read_exact(&mut ping_res).await?; + ensure_code!( + ping_buf == ping_res, + ErrorKind::Tor, + "ping buffer mismatch" + ); + stream.shutdown().await?; + + Ok::<_, Error>(()) + }) + .await?; + tokio::time::sleep(HEALTH_CHECK_COOLDOWN).await; + } + } + .await + { + tracing::error!("Tor Client Creation Error: {e}"); + tracing::debug!("{e:?}"); + } + } + tracing::error!( + "Client failed health check {} times, recycling", + HEALTH_CHECK_FAILURE_ALLOWANCE + ); + } + if let Err::<(), Error>(e) = async { + tokio::time::sleep(RETRY_COOLDOWN).await; + bootstrapper_client.send(( + epoch.wrapping_add(1), + TorClient::with_runtime(TokioRustlsRuntime::current()?) + .config(config.build().with_kind(ErrorKind::Tor)?) + .create_unbootstrapped()?, + )); + Ok(()) + } + .await + { + tracing::error!("Tor Client Creation Error: {e}"); + tracing::debug!("{e:?}"); + } + Ok(()) + }) + .await + { + tracing::error!("Tor Bootstrapper Error: {e}"); + tracing::debug!("{e:?}"); + } + } }) + .into(); + Ok(Self(Arc::new(TorControllerInner { + client, + _bootstrapper: bootstrapper, + services: SyncMutex::new(BTreeMap::new()), + reset, + }))) } pub fn service(&self, key: TorSecretKey) -> Result { - self.services.mutate(|s| { + self.0.services.mutate(|s| { use std::collections::btree_map::Entry; let addr = key.onion_address(); match s.entry(addr) { Entry::Occupied(e) => Ok(e.get().clone()), Entry::Vacant(e) => Ok(e - .insert(OnionService::launch(self.client.clone(), key)?) + .insert(OnionService::launch(self.0.client.clone(), key)?) .clone()), } }) @@ -408,7 +617,7 @@ impl TorController { pub async fn gc(&self, addr: Option) -> Result<(), Error> { if let Some(addr) = addr { - if let Some(s) = self.services.mutate(|s| { + if let Some(s) = self.0.services.mutate(|s| { let rm = if let Some(s) = s.get(&addr) { !s.gc() } else { @@ -425,7 +634,7 @@ impl TorController { Ok(()) } } else { - for s in self.services.mutate(|s| { + for s in self.0.services.mutate(|s| { let mut rm = Vec::new(); s.retain(|_, s| { if s.gc() { @@ -444,13 +653,51 @@ impl TorController { } pub async fn reset(&self, wipe_state: bool) -> Result<(), Error> { + self.0.reset.notify_waiters(); Ok(()) } - pub async fn list_services(&self) -> Result, Error> { + pub async fn list_services(&self) -> Result, Error> { Ok(self + .0 .services - .peek(|s| s.iter().map(|(a, s)| (a.clone(), s.state())).collect())) + .peek(|s| s.iter().map(|(a, s)| (a.clone(), s.info())).collect())) + } + + pub async fn connect_onion( + &self, + addr: &OnionAddress, + port: u16, + ) -> Result, Error> { + if let Some(target) = self.0.services.peek(|s| { + s.get(addr).and_then(|s| { + s.0.bindings.peek(|b| { + b.get(&port).and_then(|b| { + b.iter() + .find(|(_, rc)| rc.strong_count() > 0) + .map(|(a, _)| *a) + }) + }) + }) + }) { + Ok(Box::new( + TcpStream::connect(target) + .await + .with_kind(ErrorKind::Network)?, + )) + } else { + let mut client = self.0.client.clone(); + client + .wait_for(|(_, c)| c.bootstrap_status().ready_for_traffic()) + .await; + let stream = client + .read() + .1 + .connect((addr.to_string(), port)) + .await + .with_kind(ErrorKind::Tor)?; + Ok(Box::new(stream)) + } } } @@ -463,7 +710,7 @@ struct OnionServiceData { } impl OnionService { fn launch( - client: Arc>>, + mut client: Watch<(usize, TorClient)>, key: TorSecretKey, ) -> Result { let service = Arc::new(SyncMutex::new(None)); @@ -480,7 +727,12 @@ impl OnionService { .run_while(async { loop { if let Err(e) = async { - let (new_service, stream) = client.peek(|c| { + client.wait_for(|(_,c)| c.bootstrap_status().ready_for_traffic()).await; + let epoch = client.peek(|(e, c)| { + ensure_code!(c.bootstrap_status().ready_for_traffic(), ErrorKind::Tor, "client recycled"); + Ok::<_, Error>(*e) + })?; + let (new_service, stream) = client.peek(|(_, c)| { c.launch_onion_service_with_hsid( OnionServiceConfigBuilder::default() .nickname( @@ -496,19 +748,18 @@ impl OnionService { ) .with_kind(ErrorKind::Tor) })?; - let addr = new_service.onion_address().map(|a| safelog::DisplayRedacted::display_unredacted(&a).to_string()); let mut status_stream = new_service.status_events(); bg.add_job(async move { while let Some(status) = status_stream.next().await { - tracing::debug!("{addr:?} status: {status:?}"); - if let Some(err) = status.current_problem() { - tracing::error!("{err:?}"); - } + // TODO: health daemon? } }); service.replace(Some(new_service)); let mut stream = tor_hsservice::handle_rend_requests(stream); - while let Some(req) = stream.next().await { + while let Some(req) = tokio::select! { + req = stream.next() => req, + _ = client.wait_for(|(e, _)| *e != epoch) => None + } { bg.add_job({ let bg = bg.clone(); let bindings = bindings.clone(); @@ -555,28 +806,24 @@ impl OnionService { ) .await { - tracing::error!("{e}"); + tracing::error!("Tor Stream Error: {e}"); tracing::debug!("{e:?}"); } - incoming.flush().await?; - outgoing.flush().await?; - incoming.shutdown().await?; - outgoing.shutdown().await?; Ok::<_, Error>(()) } .await { - tracing::error!("{e}"); - tracing::debug!("{e:?}"); + tracing::trace!("Tor Stream Error: {e}"); + tracing::trace!("{e:?}"); } }); Ok::<_, Error>(()) } .await { - tracing::error!("{e}"); - tracing::debug!("{e:?}"); + tracing::trace!("Tor Request Error: {e}"); + tracing::trace!("{e:?}"); } } }); @@ -585,7 +832,7 @@ impl OnionService { } .await { - tracing::error!("{e}"); + tracing::error!("Tor Client Error: {e}"); tracing::debug!("{e:?}"); } } @@ -639,4 +886,19 @@ impl OnionService { .peek(|s| s.as_ref().map(|s| s.status().state().into())) .unwrap_or(OnionServiceState::Bootstrapping) } + + pub fn info(&self) -> OnionServiceInfo { + OnionServiceInfo { + state: self.state(), + bindings: self.0.bindings.peek(|b| { + b.iter() + .filter_map(|(port, b)| { + b.iter() + .find(|(_, rc)| rc.strong_count() > 0) + .map(|(addr, _)| (*port, *addr)) + }) + .collect() + }), + } + } } diff --git a/core/startos/src/service/effects/subcontainer/mod.rs b/core/startos/src/service/effects/subcontainer/mod.rs index 92114e363..395f5fcd1 100644 --- a/core/startos/src/service/effects/subcontainer/mod.rs +++ b/core/startos/src/service/effects/subcontainer/mod.rs @@ -11,14 +11,14 @@ use crate::service::effects::prelude::*; use crate::service::persistent_container::Subcontainer; use crate::util::Invoke; -#[cfg(feature = "start-container")] +#[cfg(feature = "cli-container")] mod sync; -#[cfg(not(feature = "start-container"))] +#[cfg(not(feature = "cli-container"))] mod sync_dummy; pub use sync::*; -#[cfg(not(feature = "start-container"))] +#[cfg(not(feature = "cli-container"))] use sync_dummy as sync; #[derive(Debug, Deserialize, Serialize, Parser, TS)] diff --git a/core/startos/src/util/io.rs b/core/startos/src/util/io.rs index ef7e36801..b8cab713f 100644 --- a/core/startos/src/util/io.rs +++ b/core/startos/src/util/io.rs @@ -6,8 +6,8 @@ use std::os::unix::prelude::MetadataExt; use std::path::{Path, PathBuf}; use std::pin::Pin; use std::str::FromStr; -use std::sync::Arc; use std::sync::atomic::AtomicU64; +use std::sync::Arc; use std::task::{Poll, Waker}; use std::time::Duration; @@ -22,7 +22,7 @@ use nix::unistd::{Gid, Uid}; use serde::{Deserialize, Serialize}; use tokio::fs::{File, OpenOptions}; use tokio::io::{ - AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, DuplexStream, ReadBuf, WriteHalf, duplex, + duplex, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, DuplexStream, ReadBuf, WriteHalf, }; use tokio::net::TcpStream; use tokio::sync::{Notify, OwnedMutexGuard}; @@ -1528,6 +1528,9 @@ impl ValueParserFactory for TermSize { } } +pub trait ReadWriter: AsyncRead + AsyncWrite {} +impl ReadWriter for T {} + #[instrument(skip_all)] async fn wait_for_created(stream: &mut EventStream<[u8; 1024]>, path: &Path) -> Result<(), Error> { let parent = stream