socks5 proxy working

This commit is contained in:
Aiden McClelland
2025-08-29 11:19:30 -06:00
parent b3b031ed47
commit 8163db7ac3
11 changed files with 543 additions and 93 deletions

27
core/Cargo.lock generated
View File

@@ -7033,6 +7033,19 @@ dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "socks5-impl"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "214a7c0af583e8f7abbd3394f235b4df7cc65db9b98ef80506cac7c7eefc0c75"
dependencies = [
"async-trait",
"bytes",
"percent-encoding",
"thiserror 2.0.16",
"tokio",
]
[[package]]
name = "solana-nohash-hasher"
version = "0.2.1"
@@ -7366,6 +7379,7 @@ dependencies = [
"signal-hook",
"simple-logging",
"socket2 0.6.0",
"socks5-impl",
"sqlx",
"sscanf",
"ssh-key",
@@ -7375,7 +7389,6 @@ dependencies = [
"thiserror 2.0.16",
"tokio",
"tokio-rustls 0.26.2",
"tokio-socks",
"tokio-stream",
"tokio-tar",
"tokio-tungstenite",
@@ -7840,18 +7853,6 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-socks"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d4770b8024672c1101b3f6733eab95b18007dbe0847a8afe341fcf79e06043f"
dependencies = [
"either",
"futures-util",
"thiserror 1.0.69",
"tokio",
]
[[package]]
name = "tokio-stream"
version = "0.1.17"

View File

@@ -212,6 +212,7 @@ shell-words = "1"
signal-hook = "0.3.17"
simple-logging = "2.0.2"
socket2 = { version = "0.6.0", features = ["all"] }
socks5-impl = { version = "0.7.2", features = ["server"] }
sqlx = { version = "0.8.6", features = [
"runtime-tokio-rustls",
"postgres",
@@ -224,7 +225,6 @@ thiserror = "2.0.12"
textwrap = "0.16.1"
tokio = { version = "1.38.1", features = ["full"] }
tokio-rustls = "0.26.0"
tokio-socks = "0.5.1"
tokio-stream = { version = "0.1.14", features = ["io-util", "sync", "net"] }
tokio-tar = { git = "https://github.com/dr-bonez/tokio-tar.git" }
tokio-tungstenite = { version = "0.26.2", features = ["native-tls", "url"] }

View File

@@ -4,8 +4,8 @@ use std::future::Future;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use chrono::{TimeDelta, Utc};
@@ -18,35 +18,36 @@ use models::{ActionId, PackageId};
use reqwest::{Client, Proxy};
use rpc_toolkit::yajrc::RpcError;
use rpc_toolkit::{CallRemote, Context, Empty};
use tokio::sync::{RwLock, broadcast, oneshot, watch};
use tokio::sync::{broadcast, oneshot, watch, RwLock};
use tokio::time::Instant;
use tracing::instrument;
use super::setup::CURRENT_SECRET;
use crate::DATA_DIR;
use crate::account::AccountInfo;
use crate::auth::Sessions;
use crate::context::config::ServerConfig;
use crate::db::model::Database;
use crate::db::model::package::TaskSeverity;
use crate::db::model::Database;
use crate::disk::OsPartitionInfo;
use crate::init::{InitResult, check_time_is_synchronized};
use crate::init::{check_time_is_synchronized, InitResult};
use crate::install::PKG_ARCHIVE_DIR;
use crate::lxc::LxcManager;
use crate::net::net_controller::{NetController, NetService};
use crate::net::socks::DEFAULT_SOCKS_LISTEN;
use crate::net::utils::{find_eth_iface, find_wifi_iface};
use crate::net::web_server::{UpgradableListener, WebServerAcceptorSetter};
use crate::net::wifi::WpaCli;
use crate::prelude::*;
use crate::progress::{FullProgressTracker, PhaseProgressTrackerHandle};
use crate::rpc_continuations::{Guid, OpenAuthedContinuations, RpcContinuations};
use crate::service::ServiceMap;
use crate::service::action::update_tasks;
use crate::service::effects::callbacks::ServiceCallbacks;
use crate::service::ServiceMap;
use crate::shutdown::Shutdown;
use crate::util::io::delete_file;
use crate::util::lshw::LshwDevice;
use crate::util::sync::{SyncMutex, Watch};
use crate::{DATA_DIR, HOST_IP};
pub struct RpcContextSeed {
is_closed: AtomicBool,
@@ -131,12 +132,7 @@ impl RpcContext {
run_migrations,
}: InitRpcContextPhases,
) -> Result<Self, Error> {
let socks_proxy = config
.socks_listen
.unwrap_or(SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(127, 0, 0, 1),
9050,
)));
let socks_proxy = config.socks_listen.unwrap_or(DEFAULT_SOCKS_LISTEN);
let (shutdown, _) = tokio::sync::broadcast::channel(1);
load_db.start();
@@ -158,7 +154,8 @@ impl RpcContext {
{
(net_ctrl, os_net_service)
} else {
let net_ctrl = Arc::new(NetController::init(db.clone(), &account.hostname).await?);
let net_ctrl =
Arc::new(NetController::init(db.clone(), &account.hostname, socks_proxy).await?);
webserver.try_upgrade(|a| net_ctrl.net_iface.watcher.upgrade_listener(a))?;
let os_net_service = net_ctrl.os_bindings().await?;
(net_ctrl, os_net_service)

View File

@@ -8,7 +8,7 @@ use const_format::formatcp;
use futures::{StreamExt, TryStreamExt};
use itertools::Itertools;
use models::ResultExt;
use rpc_toolkit::{Context, Empty, HandlerArgs, HandlerExt, ParentHandler, from_fn_async};
use rpc_toolkit::{from_fn_async, Context, Empty, HandlerArgs, HandlerExt, ParentHandler};
use serde::{Deserialize, Serialize};
use tokio::process::Command;
use tracing::instrument;
@@ -17,12 +17,13 @@ use ts_rs::TS;
use crate::account::AccountInfo;
use crate::context::config::ServerConfig;
use crate::context::{CliContext, InitContext, RpcContext};
use crate::db::model::Database;
use crate::db::model::public::ServerStatus;
use crate::db::model::Database;
use crate::developer::OS_DEVELOPER_KEY_PATH;
use crate::hostname::Hostname;
use crate::middleware::auth::AuthContext;
use crate::net::net_controller::{NetController, NetService};
use crate::net::socks::DEFAULT_SOCKS_LISTEN;
use crate::net::utils::find_wifi_iface;
use crate::net::web_server::{UpgradableListener, WebServerAcceptorSetter};
use crate::prelude::*;
@@ -33,10 +34,10 @@ use crate::rpc_continuations::{Guid, RpcContinuation};
use crate::s9pk::v2::pack::{CONTAINER_DATADIR, CONTAINER_TOOL};
use crate::ssh::SSH_DIR;
use crate::system::{get_mem_info, sync_kiosk};
use crate::util::io::{IOHook, open_file};
use crate::util::io::{open_file, IOHook};
use crate::util::lshw::lshw;
use crate::util::net::WebSocketExt;
use crate::util::{Invoke, cpupower};
use crate::util::{cpupower, Invoke};
use crate::{Error, MAIN_DATA, PACKAGE_DATA};
pub const SYSTEM_REBUILD_PATH: &str = "/media/startos/config/system-rebuild";
@@ -202,7 +203,14 @@ pub async fn init(
let account = AccountInfo::load(&peek)?;
start_net.start();
let net_ctrl = Arc::new(NetController::init(db.clone(), &account.hostname).await?);
let net_ctrl = Arc::new(
NetController::init(
db.clone(),
&account.hostname,
cfg.socks_listen.unwrap_or(DEFAULT_SOCKS_LISTEN),
)
.await?,
);
webserver.try_upgrade(|a| net_ctrl.net_iface.watcher.upgrade_listener(a))?;
let os_net_service = net_ctrl.os_bindings().await?;
start_net.complete();

View File

@@ -1,11 +1,11 @@
use std::collections::{BTreeMap, BTreeSet};
use std::str::FromStr;
use clap::Parser;
use clap::builder::ValueParserFactory;
use clap::Parser;
use imbl::OrdSet;
use models::{FromStrParser, GatewayId, HostId};
use rpc_toolkit::{Context, Empty, HandlerArgs, HandlerExt, ParentHandler, from_fn_async};
use rpc_toolkit::{from_fn_async, Context, Empty, HandlerArgs, HandlerExt, ParentHandler};
use serde::{Deserialize, Serialize};
use ts_rs::TS;
@@ -16,7 +16,7 @@ use crate::net::gateway::InterfaceFilter;
use crate::net::host::HostApiKind;
use crate::net::vhost::AlpnInfo;
use crate::prelude::*;
use crate::util::serde::{HandlerExtSerde, display_serializable};
use crate::util::serde::{display_serializable, HandlerExtSerde};
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, TS)]
#[ts(export)]
@@ -136,9 +136,9 @@ impl BindInfo {
impl InterfaceFilter for NetInfo {
fn filter(&self, id: &GatewayId, info: &NetworkInterfaceInfo) -> bool {
if info.public() {
dbg!(self.public_enabled.contains(id))
self.public_enabled.contains(id)
} else {
dbg!(!self.private_disabled.contains(id))
!self.private_disabled.contains(id)
}
}
}
@@ -169,8 +169,8 @@ pub struct AddSslOptions {
pub alpn: Option<AlpnInfo>,
}
pub fn binding<C: Context, Kind: HostApiKind>()
-> ParentHandler<C, Kind::Params, Kind::InheritedParams> {
pub fn binding<C: Context, Kind: HostApiKind>(
) -> ParentHandler<C, Kind::Params, Kind::InheritedParams> {
ParentHandler::<C, Kind::Params, Kind::InheritedParams>::new()
.subcommand(
"list",

View File

@@ -9,6 +9,7 @@ pub mod keys;
pub mod mdns;
pub mod net_controller;
pub mod service_interface;
pub mod socks;
pub mod ssl;
pub mod static_server;
pub mod tor;

View File

@@ -3,7 +3,7 @@ use std::net::{Ipv4Addr, SocketAddr};
use std::sync::{Arc, Weak};
use color_eyre::eyre::eyre;
use imbl::{OrdMap, vector};
use imbl::{vector, OrdMap};
use imbl_value::InternedString;
use ipnet::IpNet;
use models::{HostId, OptionExt, PackageId};
@@ -11,9 +11,8 @@ use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tracing::instrument;
use crate::HOST_IP;
use crate::db::model::Database;
use crate::db::model::public::NetworkInterfaceInfo;
use crate::db::model::Database;
use crate::error::ErrorCollection;
use crate::hostname::Hostname;
use crate::net::dns::DnsController;
@@ -24,14 +23,16 @@ use crate::net::gateway::{
};
use crate::net::host::address::HostAddress;
use crate::net::host::binding::{AddSslOptions, BindId, BindOptions};
use crate::net::host::{Host, Hosts, host_for};
use crate::net::host::{host_for, Host, Hosts};
use crate::net::service_interface::{HostnameInfo, IpHostname, OnionHostname};
use crate::net::socks::SocksController;
use crate::net::tor::{OnionAddress, TorController, TorSecretKey};
use crate::net::utils::ipv6_is_local;
use crate::net::vhost::{AlpnInfo, TargetInfo, VHostController};
use crate::prelude::*;
use crate::service::effects::callbacks::ServiceCallbacks;
use crate::util::serde::MaybeUtf8String;
use crate::HOST_IP;
pub struct NetController {
pub(crate) db: TypedPatchDb<Database>,
@@ -40,20 +41,28 @@ pub struct NetController {
pub(crate) net_iface: Arc<NetworkInterfaceController>,
pub(super) dns: DnsController,
pub(super) forward: PortForwardController,
pub(super) socks: SocksController,
pub(super) server_hostnames: Vec<Option<InternedString>>,
pub(crate) callbacks: Arc<ServiceCallbacks>,
}
impl NetController {
pub async fn init(db: TypedPatchDb<Database>, hostname: &Hostname) -> Result<Self, Error> {
pub async fn init(
db: TypedPatchDb<Database>,
hostname: &Hostname,
socks_listen: SocketAddr,
) -> Result<Self, Error> {
let net_iface = Arc::new(NetworkInterfaceController::new(db.clone()));
let tor = TorController::new()?;
let socks = SocksController::new(socks_listen, tor.clone())?;
Ok(Self {
db: db.clone(),
tor: TorController::new().await?,
tor,
vhost: VHostController::new(db.clone(), net_iface.clone()),
dns: DnsController::init(db, &net_iface.watcher).await?,
forward: PortForwardController::new(net_iface.watcher.subscribe()),
net_iface,
socks,
server_hostnames: vec![
// LAN IP
None,

View File

@@ -0,0 +1,169 @@
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::sync::Arc;
use helpers::NonDetachingJoinHandle;
use socks5_impl::protocol::{Address, Reply};
use socks5_impl::server::auth::NoAuth;
use socks5_impl::server::{AuthAdaptor, ClientConnection, Server};
use tokio::net::{TcpListener, TcpStream};
use crate::net::tor::TorController;
use crate::prelude::*;
use crate::util::actor::background::BackgroundJobQueue;
use crate::HOST_IP;
pub const DEFAULT_SOCKS_LISTEN: SocketAddr = SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(HOST_IP[0], HOST_IP[1], HOST_IP[2], HOST_IP[3]),
9050,
));
pub struct SocksController {
_thread: NonDetachingJoinHandle<()>,
}
impl SocksController {
pub fn new(listen: SocketAddr, tor: TorController) -> Result<Self, Error> {
let auth: AuthAdaptor<()> = Arc::new(NoAuth);
let listener = TcpListener::from_std(
mio::net::TcpListener::bind(listen)
.with_kind(ErrorKind::Network)?
.into(),
)
.with_kind(ErrorKind::Network)?;
Ok(Self {
_thread: tokio::spawn(async move {
let (bg, mut runner) = BackgroundJobQueue::new();
runner
.run_while(async {
let server = Server::new(listener, auth);
loop {
match server.accept().await {
Ok((stream, _)) => {
let tor = tor.clone();
bg.add_job(async move {
if let Err(e) = async {
match stream
.authenticate()
.await
.with_kind(ErrorKind::Network)?
.0
.wait_request()
.await
.with_kind(ErrorKind::Network)?
{
ClientConnection::Connect(
reply,
Address::DomainAddress(domain, port),
) if domain.ends_with(".onion") => {
if let Ok(mut target) = tor
.connect_onion(&domain.parse()?, port)
.await
{
let mut sock = reply
.reply(
Reply::Succeeded,
Address::unspecified(),
)
.await
.with_kind(ErrorKind::Network)?;
tokio::io::copy_bidirectional(
&mut sock,
&mut target,
)
.await
.with_kind(ErrorKind::Network)?;
} else {
let mut sock = reply
.reply(
Reply::HostUnreachable,
Address::unspecified(),
)
.await
.with_kind(ErrorKind::Network)?;
sock.shutdown()
.await
.with_kind(ErrorKind::Network)?;
}
}
ClientConnection::Connect(reply, addr) => {
if let Ok(mut target) = match addr {
Address::DomainAddress(domain, port) => {
TcpStream::connect((domain, port)).await
}
Address::SocketAddress(addr) => {
TcpStream::connect(addr).await
}
} {
let mut sock = reply
.reply(
Reply::Succeeded,
Address::unspecified(),
)
.await
.with_kind(ErrorKind::Network)?;
tokio::io::copy_bidirectional(
&mut sock,
&mut target,
)
.await
.with_kind(ErrorKind::Network)?;
} else {
let mut sock = reply
.reply(
Reply::HostUnreachable,
Address::unspecified(),
)
.await
.with_kind(ErrorKind::Network)?;
sock.shutdown()
.await
.with_kind(ErrorKind::Network)?;
}
}
ClientConnection::Bind(bind, _) => {
let mut sock = bind
.reply(
Reply::CommandNotSupported,
Address::unspecified(),
)
.await
.with_kind(ErrorKind::Network)?;
sock.shutdown()
.await
.with_kind(ErrorKind::Network)?;
}
ClientConnection::UdpAssociate(associate, _) => {
let mut sock = associate
.reply(
Reply::CommandNotSupported,
Address::unspecified(),
)
.await
.with_kind(ErrorKind::Network)?;
sock.shutdown()
.await
.with_kind(ErrorKind::Network)?;
}
}
Ok::<_, Error>(())
}
.await
{
tracing::error!("SOCKS5 Stream Error: {e}");
tracing::debug!("{e:?}");
}
});
}
Err(e) => {
tracing::error!("SOCKS5 TCP Accept Error: {e}");
tracing::debug!("{e:?}");
}
}
}
})
.await;
})
.into(),
})
}
}

View File

@@ -3,19 +3,22 @@ use std::collections::{BTreeMap, BTreeSet};
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};
use arti_client::config::onion_service::OnionServiceConfigBuilder;
use arti_client::{TorClient, TorClientConfig};
use arti_client::{DataStream, TorClient, TorClientConfig};
use base64::Engine;
use clap::Parser;
use color_eyre::eyre::eyre;
use futures::StreamExt;
use futures::{FutureExt, StreamExt};
use helpers::NonDetachingJoinHandle;
use imbl_value::InternedString;
use itertools::Itertools;
use rpc_toolkit::{from_fn_async, Context, Empty, HandlerExt, ParentHandler};
use serde::{Deserialize, Serialize};
use tokio::io::AsyncWriteExt;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::sync::Notify;
use tor_cell::relaycell::msg::Connected;
use tor_hscrypto::pk::{HsId, HsIdKeypair};
use tor_hsservice::status::State as ArtiOnionServiceState;
@@ -28,11 +31,19 @@ use ts_rs::TS;
use crate::context::{CliContext, RpcContext};
use crate::prelude::*;
use crate::util::actor::background::BackgroundJobQueue;
use crate::util::future::Until;
use crate::util::io::ReadWriter;
use crate::util::serde::{
deserialize_from_str, display_serializable, serialize_display, Base64, HandlerExtSerde,
WithIoFormat, BASE64,
};
use crate::util::sync::{SyncMutex, SyncRwLock};
use crate::util::sync::{SyncMutex, SyncRwLock, Watch};
const BOOTSTRAP_PROGRESS_TIMEOUT: Duration = Duration::from_secs(300);
const HS_BOOTSTRAP_TIMEOUT: Duration = Duration::from_secs(300);
const RETRY_COOLDOWN: Duration = Duration::from_secs(15);
const HEALTH_CHECK_FAILURE_ALLOWANCE: usize = 5;
const HEALTH_CHECK_COOLDOWN: Duration = Duration::from_secs(120);
#[derive(Debug, Clone, Copy)]
pub struct OnionAddress(pub HsId);
@@ -320,7 +331,7 @@ pub async fn reset(ctx: RpcContext, ResetParams { wipe_state }: ResetParams) ->
pub fn display_services(
params: WithIoFormat<Empty>,
services: BTreeMap<OnionAddress, OnionServiceState>,
services: BTreeMap<OnionAddress, OnionServiceInfo>,
) -> Result<(), Error> {
use prettytable::*;
@@ -329,8 +340,17 @@ pub fn display_services(
}
let mut table = Table::new();
for (service, status) in services {
let row = row![&service.to_string(), &format!("{status:?}")];
table.add_row(row![bc => "ADDRESS", "STATE", "BINDINGS"]);
for (service, info) in services {
let row = row![
&service.to_string(),
&format!("{:?}", info.state),
&info
.bindings
.into_iter()
.map(|(port, addr)| lazy_format!("{port} -> {addr}"))
.join("; ")
];
table.add_row(row);
}
table.print_tty(false)?;
@@ -363,44 +383,233 @@ impl From<ArtiOnionServiceState> for OnionServiceState {
}
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OnionServiceInfo {
pub state: OnionServiceState,
pub bindings: BTreeMap<u16, SocketAddr>,
}
pub async fn list_services(
ctx: RpcContext,
_: Empty,
) -> Result<BTreeMap<OnionAddress, OnionServiceState>, Error> {
) -> Result<BTreeMap<OnionAddress, OnionServiceInfo>, Error> {
ctx.net_controller.tor.list_services().await
}
pub struct TorController {
client: Arc<SyncRwLock<TorClient<TokioRustlsRuntime>>>,
#[derive(Clone)]
pub struct TorController(Arc<TorControllerInner>);
struct TorControllerInner {
client: Watch<(usize, TorClient<TokioRustlsRuntime>)>,
_bootstrapper: NonDetachingJoinHandle<()>,
services: SyncMutex<BTreeMap<OnionAddress, OnionService>>,
reset: Arc<Notify>,
}
impl TorController {
pub async fn new() -> Result<Self, Error> {
pub fn new() -> Result<Self, Error> {
let mut config = TorClientConfig::builder();
config
.storage()
.keystore()
.primary()
.kind(ArtiKeystoreKind::Ephemeral.into());
Ok(Self {
client: Arc::new(SyncRwLock::new(
TorClient::with_runtime(TokioRustlsRuntime::current()?)
.config(config.build().with_kind(ErrorKind::Tor)?)
.create_unbootstrapped_async()
.await?,
)),
services: SyncMutex::new(BTreeMap::new()),
let client = Watch::new((
0,
TorClient::with_runtime(TokioRustlsRuntime::current()?)
.config(config.build().with_kind(ErrorKind::Tor)?)
.create_unbootstrapped()?,
));
let reset = Arc::new(Notify::new());
let bootstrapper_reset = reset.clone();
let bootstrapper_client = client.clone();
let bootstrapper = tokio::spawn(async move {
loop {
if let Err(e) = Until::new()
.with_async_fn(|| bootstrapper_reset.notified().map(Ok))
.run(async {
let (epoch, client): (usize, _) = bootstrapper_client.read();
let mut events = client.bootstrap_events();
let bootstrap_fut =
client.bootstrap().map(|res| res.with_kind(ErrorKind::Tor));
let failure_fut = async {
let mut prev_frac = 0_f32;
let mut prev_inst = Instant::now();
while let Some(event) =
tokio::time::timeout(BOOTSTRAP_PROGRESS_TIMEOUT, events.next())
.await
.with_kind(ErrorKind::Tor)?
{
if event.ready_for_traffic() {
return Ok::<_, Error>(());
}
let frac = event.as_frac();
if frac == prev_frac {
if prev_inst.elapsed() > BOOTSTRAP_PROGRESS_TIMEOUT {
return Err(Error::new(
eyre!(
"Bootstrap has not made progress for {}",
crate::util::serde::Duration::from(
BOOTSTRAP_PROGRESS_TIMEOUT
)
),
ErrorKind::Tor,
));
}
} else {
prev_frac = frac;
prev_inst = Instant::now();
}
}
futures::future::pending().await
};
if let Err::<(), Error>(e) = tokio::select! {
res = bootstrap_fut => res,
res = failure_fut => res,
} {
tracing::error!("Tor Bootstrap Error: {e}");
tracing::debug!("{e:?}");
} else {
bootstrapper_client.send_modify(|_| ());
for _ in 0..HEALTH_CHECK_FAILURE_ALLOWANCE {
if let Err::<(), Error>(e) = async {
loop {
let (bg, mut runner) = BackgroundJobQueue::new();
runner
.run_while(async {
const PING_BUF_LEN: usize = 8;
let key = TorSecretKey::generate();
let onion = key.onion_address();
let (hs, stream) = client
.launch_onion_service_with_hsid(
OnionServiceConfigBuilder::default()
.nickname(
onion
.to_string()
.trim_end_matches(".onion")
.parse::<HsNickname>()
.with_kind(ErrorKind::Tor)?,
)
.build()
.with_kind(ErrorKind::Tor)?,
key.clone().0,
)
.with_kind(ErrorKind::Tor)?;
bg.add_job(async move {
if let Err(e) = async {
let mut stream =
tor_hsservice::handle_rend_requests(
stream,
);
while let Some(req) = stream.next().await {
let mut stream = req
.accept(Connected::new_empty())
.await
.with_kind(ErrorKind::Tor)?;
let mut buf = [0; PING_BUF_LEN];
stream.read_exact(&mut buf).await?;
stream.write_all(&buf).await?;
stream.flush().await?;
stream.shutdown().await?;
}
Ok::<_, Error>(())
}
.await
{
tracing::error!("Tor Health Error: {e}");
tracing::debug!("{e:?}");
}
});
tokio::time::timeout(HS_BOOTSTRAP_TIMEOUT, async {
let mut status = hs.status_events();
while let Some(status) = status.next().await {
if status.state().is_fully_reachable() {
return Ok(());
}
}
Err(Error::new(
eyre!("status event stream ended"),
ErrorKind::Tor,
))
})
.await
.with_kind(ErrorKind::Tor)??;
let mut stream = client
.connect((onion.to_string(), 8080))
.await?;
let mut ping_buf = [0; PING_BUF_LEN];
rand::fill(&mut ping_buf);
stream.write_all(&ping_buf).await?;
stream.flush().await?;
let mut ping_res = [0; PING_BUF_LEN];
stream.read_exact(&mut ping_res).await?;
ensure_code!(
ping_buf == ping_res,
ErrorKind::Tor,
"ping buffer mismatch"
);
stream.shutdown().await?;
Ok::<_, Error>(())
})
.await?;
tokio::time::sleep(HEALTH_CHECK_COOLDOWN).await;
}
}
.await
{
tracing::error!("Tor Client Creation Error: {e}");
tracing::debug!("{e:?}");
}
}
tracing::error!(
"Client failed health check {} times, recycling",
HEALTH_CHECK_FAILURE_ALLOWANCE
);
}
if let Err::<(), Error>(e) = async {
tokio::time::sleep(RETRY_COOLDOWN).await;
bootstrapper_client.send((
epoch.wrapping_add(1),
TorClient::with_runtime(TokioRustlsRuntime::current()?)
.config(config.build().with_kind(ErrorKind::Tor)?)
.create_unbootstrapped()?,
));
Ok(())
}
.await
{
tracing::error!("Tor Client Creation Error: {e}");
tracing::debug!("{e:?}");
}
Ok(())
})
.await
{
tracing::error!("Tor Bootstrapper Error: {e}");
tracing::debug!("{e:?}");
}
}
})
.into();
Ok(Self(Arc::new(TorControllerInner {
client,
_bootstrapper: bootstrapper,
services: SyncMutex::new(BTreeMap::new()),
reset,
})))
}
pub fn service(&self, key: TorSecretKey) -> Result<OnionService, Error> {
self.services.mutate(|s| {
self.0.services.mutate(|s| {
use std::collections::btree_map::Entry;
let addr = key.onion_address();
match s.entry(addr) {
Entry::Occupied(e) => Ok(e.get().clone()),
Entry::Vacant(e) => Ok(e
.insert(OnionService::launch(self.client.clone(), key)?)
.insert(OnionService::launch(self.0.client.clone(), key)?)
.clone()),
}
})
@@ -408,7 +617,7 @@ impl TorController {
pub async fn gc(&self, addr: Option<OnionAddress>) -> Result<(), Error> {
if let Some(addr) = addr {
if let Some(s) = self.services.mutate(|s| {
if let Some(s) = self.0.services.mutate(|s| {
let rm = if let Some(s) = s.get(&addr) {
!s.gc()
} else {
@@ -425,7 +634,7 @@ impl TorController {
Ok(())
}
} else {
for s in self.services.mutate(|s| {
for s in self.0.services.mutate(|s| {
let mut rm = Vec::new();
s.retain(|_, s| {
if s.gc() {
@@ -444,13 +653,51 @@ impl TorController {
}
pub async fn reset(&self, wipe_state: bool) -> Result<(), Error> {
self.0.reset.notify_waiters();
Ok(())
}
pub async fn list_services(&self) -> Result<BTreeMap<OnionAddress, OnionServiceState>, Error> {
pub async fn list_services(&self) -> Result<BTreeMap<OnionAddress, OnionServiceInfo>, Error> {
Ok(self
.0
.services
.peek(|s| s.iter().map(|(a, s)| (a.clone(), s.state())).collect()))
.peek(|s| s.iter().map(|(a, s)| (a.clone(), s.info())).collect()))
}
pub async fn connect_onion(
&self,
addr: &OnionAddress,
port: u16,
) -> Result<Box<dyn ReadWriter + Unpin + Send + Sync + 'static>, Error> {
if let Some(target) = self.0.services.peek(|s| {
s.get(addr).and_then(|s| {
s.0.bindings.peek(|b| {
b.get(&port).and_then(|b| {
b.iter()
.find(|(_, rc)| rc.strong_count() > 0)
.map(|(a, _)| *a)
})
})
})
}) {
Ok(Box::new(
TcpStream::connect(target)
.await
.with_kind(ErrorKind::Network)?,
))
} else {
let mut client = self.0.client.clone();
client
.wait_for(|(_, c)| c.bootstrap_status().ready_for_traffic())
.await;
let stream = client
.read()
.1
.connect((addr.to_string(), port))
.await
.with_kind(ErrorKind::Tor)?;
Ok(Box::new(stream))
}
}
}
@@ -463,7 +710,7 @@ struct OnionServiceData {
}
impl OnionService {
fn launch(
client: Arc<SyncRwLock<TorClient<TokioRustlsRuntime>>>,
mut client: Watch<(usize, TorClient<TokioRustlsRuntime>)>,
key: TorSecretKey,
) -> Result<Self, Error> {
let service = Arc::new(SyncMutex::new(None));
@@ -480,7 +727,12 @@ impl OnionService {
.run_while(async {
loop {
if let Err(e) = async {
let (new_service, stream) = client.peek(|c| {
client.wait_for(|(_,c)| c.bootstrap_status().ready_for_traffic()).await;
let epoch = client.peek(|(e, c)| {
ensure_code!(c.bootstrap_status().ready_for_traffic(), ErrorKind::Tor, "client recycled");
Ok::<_, Error>(*e)
})?;
let (new_service, stream) = client.peek(|(_, c)| {
c.launch_onion_service_with_hsid(
OnionServiceConfigBuilder::default()
.nickname(
@@ -496,19 +748,18 @@ impl OnionService {
)
.with_kind(ErrorKind::Tor)
})?;
let addr = new_service.onion_address().map(|a| safelog::DisplayRedacted::display_unredacted(&a).to_string());
let mut status_stream = new_service.status_events();
bg.add_job(async move {
while let Some(status) = status_stream.next().await {
tracing::debug!("{addr:?} status: {status:?}");
if let Some(err) = status.current_problem() {
tracing::error!("{err:?}");
}
// TODO: health daemon?
}
});
service.replace(Some(new_service));
let mut stream = tor_hsservice::handle_rend_requests(stream);
while let Some(req) = stream.next().await {
while let Some(req) = tokio::select! {
req = stream.next() => req,
_ = client.wait_for(|(e, _)| *e != epoch) => None
} {
bg.add_job({
let bg = bg.clone();
let bindings = bindings.clone();
@@ -555,28 +806,24 @@ impl OnionService {
)
.await
{
tracing::error!("{e}");
tracing::error!("Tor Stream Error: {e}");
tracing::debug!("{e:?}");
}
incoming.flush().await?;
outgoing.flush().await?;
incoming.shutdown().await?;
outgoing.shutdown().await?;
Ok::<_, Error>(())
}
.await
{
tracing::error!("{e}");
tracing::debug!("{e:?}");
tracing::trace!("Tor Stream Error: {e}");
tracing::trace!("{e:?}");
}
});
Ok::<_, Error>(())
}
.await
{
tracing::error!("{e}");
tracing::debug!("{e:?}");
tracing::trace!("Tor Request Error: {e}");
tracing::trace!("{e:?}");
}
}
});
@@ -585,7 +832,7 @@ impl OnionService {
}
.await
{
tracing::error!("{e}");
tracing::error!("Tor Client Error: {e}");
tracing::debug!("{e:?}");
}
}
@@ -639,4 +886,19 @@ impl OnionService {
.peek(|s| s.as_ref().map(|s| s.status().state().into()))
.unwrap_or(OnionServiceState::Bootstrapping)
}
pub fn info(&self) -> OnionServiceInfo {
OnionServiceInfo {
state: self.state(),
bindings: self.0.bindings.peek(|b| {
b.iter()
.filter_map(|(port, b)| {
b.iter()
.find(|(_, rc)| rc.strong_count() > 0)
.map(|(addr, _)| (*port, *addr))
})
.collect()
}),
}
}
}

View File

@@ -11,14 +11,14 @@ use crate::service::effects::prelude::*;
use crate::service::persistent_container::Subcontainer;
use crate::util::Invoke;
#[cfg(feature = "start-container")]
#[cfg(feature = "cli-container")]
mod sync;
#[cfg(not(feature = "start-container"))]
#[cfg(not(feature = "cli-container"))]
mod sync_dummy;
pub use sync::*;
#[cfg(not(feature = "start-container"))]
#[cfg(not(feature = "cli-container"))]
use sync_dummy as sync;
#[derive(Debug, Deserialize, Serialize, Parser, TS)]

View File

@@ -6,8 +6,8 @@ use std::os::unix::prelude::MetadataExt;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::task::{Poll, Waker};
use std::time::Duration;
@@ -22,7 +22,7 @@ use nix::unistd::{Gid, Uid};
use serde::{Deserialize, Serialize};
use tokio::fs::{File, OpenOptions};
use tokio::io::{
AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, DuplexStream, ReadBuf, WriteHalf, duplex,
duplex, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, DuplexStream, ReadBuf, WriteHalf,
};
use tokio::net::TcpStream;
use tokio::sync::{Notify, OwnedMutexGuard};
@@ -1528,6 +1528,9 @@ impl ValueParserFactory for TermSize {
}
}
pub trait ReadWriter: AsyncRead + AsyncWrite {}
impl<T: AsyncRead + AsyncWrite> ReadWriter for T {}
#[instrument(skip_all)]
async fn wait_for_created(stream: &mut EventStream<[u8; 1024]>, path: &Path) -> Result<(), Error> {
let parent = stream