From 68141112b7d08182e14a00f8515902743f965c92 Mon Sep 17 00:00:00 2001 From: Aiden McClelland Date: Tue, 17 Feb 2026 12:52:24 -0700 Subject: [PATCH] feat: per-service and default outbound gateway routing Add set-outbound-gateway RPC for packages and set-default-outbound RPC for the server, with policy routing enforcement via ip rules. Fix connmark restore to skip packets with existing fwmarks, add bridge subnet routes to per-interface tables, and fix squashfs path in update-image-local.sh. --- container-runtime/update-image-local.sh | 2 +- core/src/db/model/package.rs | 1 - core/src/lib.rs | 8 + core/src/net/gateway.rs | 251 ++++++++++++++++++++++-- core/src/net/net_controller.rs | 159 +++++++++++++-- core/src/service/mod.rs | 1 + 6 files changed, 387 insertions(+), 35 deletions(-) diff --git a/container-runtime/update-image-local.sh b/container-runtime/update-image-local.sh index 20dc7a9ef..14df9e325 100755 --- a/container-runtime/update-image-local.sh +++ b/container-runtime/update-image-local.sh @@ -16,6 +16,6 @@ case $ARCH in esac docker run --rm $USE_TTY --platform=$DOCKER_PLATFORM -eARCH --privileged -v "$(pwd):/root/start-os" start9/build-env /root/start-os/container-runtime/update-image.sh -if [ "$(ls -nd "rootfs.${ARCH}.squashfs" | awk '{ print $3 }')" != "$UID" ]; then +if [ "$(ls -nd "container-runtime/rootfs.${ARCH}.squashfs" | awk '{ print $3 }')" != "$UID" ]; then docker run --rm $USE_TTY -v "$(pwd):/root/start-os" start9/build-env chown -R $UID:$UID /root/start-os/container-runtime fi \ No newline at end of file diff --git a/core/src/db/model/package.rs b/core/src/db/model/package.rs index 63db0b8ac..928d1d934 100644 --- a/core/src/db/model/package.rs +++ b/core/src/db/model/package.rs @@ -381,7 +381,6 @@ pub struct PackageDataEntry { pub hosts: Hosts, #[ts(type = "string[]")] pub store_exposed_dependents: Vec, - #[serde(default)] #[ts(type = "string | null")] pub outbound_gateway: Option, } diff --git a/core/src/lib.rs b/core/src/lib.rs index 7fc1853e3..9aa4d58ad 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -555,4 +555,12 @@ pub fn package() -> ParentHandler { "host", net::host::host_api::().with_about("about.manage-network-hosts-package"), ) + .subcommand( + "set-outbound-gateway", + from_fn_async(net::gateway::set_outbound_gateway) + .with_metadata("sync_db", Value::Bool(true)) + .no_display() + .with_about("about.set-outbound-gateway-package") + .with_call_remote::(), + ) } diff --git a/core/src/net/gateway.rs b/core/src/net/gateway.rs index 73d092080..571a7068b 100644 --- a/core/src/net/gateway.rs +++ b/core/src/net/gateway.rs @@ -8,7 +8,7 @@ use std::time::Duration; use clap::Parser; use futures::{FutureExt, Stream, StreamExt, TryStreamExt}; use imbl::{OrdMap, OrdSet}; -use imbl_value::InternedString; +use imbl_value::{InternedString, Value}; use ipnet::IpNet; use itertools::Itertools; use nix::net::if_::if_nametoindex; @@ -28,13 +28,12 @@ use zbus::zvariant::{ }; use zbus::{Connection, proxy}; -use crate::GatewayId; use crate::context::{CliContext, RpcContext}; use crate::db::model::Database; use crate::db::model::public::{IpInfo, NetworkInterfaceInfo, NetworkInterfaceType}; use crate::net::forward::START9_BRIDGE_IFACE; -use crate::net::host::all_hosts; use crate::net::gateway::device::DeviceProxy; +use crate::net::host::all_hosts; use crate::net::web_server::{Accept, AcceptStream, MetadataVisitor, TcpMetadata}; use crate::prelude::*; use crate::util::Invoke; @@ -43,6 +42,7 @@ use crate::util::future::{NonDetachingJoinHandle, Until}; use crate::util::io::open_file; use crate::util::serde::{HandlerExtSerde, display_serializable}; use crate::util::sync::Watch; +use crate::{GatewayId, PackageId}; pub fn gateway_api() -> ParentHandler { ParentHandler::new() @@ -118,6 +118,14 @@ pub fn gateway_api() -> ParentHandler { .with_about("about.check-port-reachability") .with_call_remote::(), ) + .subcommand( + "set-default-outbound", + from_fn_async(set_default_outbound) + .with_metadata("sync_db", Value::Bool(true)) + .no_display() + .with_about("about.set-default-outbound-gateway") + .with_call_remote::(), + ) } async fn list_interfaces( @@ -180,23 +188,16 @@ async fn check_port( CheckPortParams { port, gateway }: CheckPortParams, ) -> Result { let db = ctx.db.peek().await; - let base_url = db - .as_public() - .as_server_info() - .as_ifconfig_url() - .de()?; + let base_url = db.as_public().as_server_info().as_ifconfig_url().de()?; let gateways = db .as_public() .as_server_info() .as_network() .as_gateways() .de()?; - let gw_info = gateways.get(&gateway).ok_or_else(|| { - Error::new( - eyre!("unknown gateway: {gateway}"), - ErrorKind::NotFound, - ) - })?; + let gw_info = gateways + .get(&gateway) + .ok_or_else(|| Error::new(eyre!("unknown gateway: {gateway}"), ErrorKind::NotFound))?; let ip_info = gw_info.ip_info.as_ref().ok_or_else(|| { Error::new( eyre!("gateway {gateway} has no IP info"), @@ -223,6 +224,79 @@ async fn check_port( Ok(res) } +#[derive(Debug, Clone, Deserialize, Serialize, Parser, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export)] +struct SetDefaultOutboundParams { + #[arg(help = "help.arg.gateway-id")] + gateway: Option, +} + +async fn set_default_outbound( + ctx: RpcContext, + SetDefaultOutboundParams { gateway }: SetDefaultOutboundParams, +) -> Result<(), Error> { + if let Some(ref gw) = gateway { + let ip_info = ctx.net_controller.net_iface.watcher.ip_info(); + let info = ip_info + .get(gw) + .ok_or_else(|| Error::new(eyre!("unknown gateway: {gw}"), ErrorKind::NotFound))?; + ensure_code!( + info.ip_info.is_some(), + ErrorKind::InvalidRequest, + "gateway {gw} is not connected" + ); + } + ctx.db + .mutate(|db| { + db.as_public_mut() + .as_server_info_mut() + .as_network_mut() + .as_default_outbound_mut() + .ser(&gateway) + }) + .await + .result +} + +#[derive(Debug, Clone, Deserialize, Serialize, Parser, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export)] +pub struct SetOutboundGatewayParams { + #[arg(help = "help.arg.package-id")] + package: PackageId, + #[arg(help = "help.arg.gateway-id")] + gateway: Option, +} + +pub async fn set_outbound_gateway( + ctx: RpcContext, + SetOutboundGatewayParams { package, gateway }: SetOutboundGatewayParams, +) -> Result<(), Error> { + if let Some(ref gw) = gateway { + let ip_info = ctx.net_controller.net_iface.watcher.ip_info(); + let info = ip_info + .get(gw) + .ok_or_else(|| Error::new(eyre!("unknown gateway: {gw}"), ErrorKind::NotFound))?; + ensure_code!( + info.ip_info.is_some(), + ErrorKind::InvalidRequest, + "gateway {gw} is not connected" + ); + } + ctx.db + .mutate(|db| { + db.as_public_mut() + .as_package_data_mut() + .as_idx_mut(&package) + .or_not_found(&package)? + .as_outbound_gateway_mut() + .ser(&gateway) + }) + .await + .result +} + #[proxy( interface = "org.freedesktop.NetworkManager", default_service = "org.freedesktop.NetworkManager", @@ -714,6 +788,7 @@ async fn watch_ip( .arg("rule").arg("del") .arg("fwmark").arg(&table_str) .arg("lookup").arg(&table_str) + .arg("priority").arg("50") .invoke(ErrorKind::Network) .await .log_err(); @@ -823,6 +898,15 @@ async fn watch_ip( .await .log_err(); } + // Add bridge subnet so per-service outbound routing + // doesn't break local container traffic + Command::new("ip") + .arg("route").arg("add").arg("10.0.3.0/24") + .arg("dev").arg(START9_BRIDGE_IFACE) + .arg("table").arg(&table_str) + .invoke(ErrorKind::Network) + .await + .log_err(); { let mut cmd = Command::new("ip"); cmd.arg("route").arg("add").arg("default"); @@ -843,10 +927,14 @@ async fn watch_ip( // PREROUTING (forwarded packets) and OUTPUT (locally-generated replies). // Both are needed: PREROUTING handles DNAT-forwarded traffic, // OUTPUT handles replies from locally-bound listeners (e.g. vhost). + // The `-m mark --mark 0` condition ensures we only restore + // when the packet has no existing fwmark, preserving marks + // set by WireGuard on encapsulation packets. for chain in ["PREROUTING", "OUTPUT"] { if !Command::new("iptables") .arg("-t").arg("mangle") .arg("-C").arg(chain) + .arg("-m").arg("mark").arg("--mark").arg("0") .arg("-j").arg("CONNMARK") .arg("--restore-mark") .status().await @@ -855,6 +943,7 @@ async fn watch_ip( Command::new("iptables") .arg("-t").arg("mangle") .arg("-I").arg(chain).arg("1") + .arg("-m").arg("mark").arg("--mark").arg("0") .arg("-j").arg("CONNMARK") .arg("--restore-mark") .invoke(ErrorKind::Network) @@ -904,6 +993,7 @@ async fn watch_ip( .arg("rule").arg("add") .arg("fwmark").arg(&table_str) .arg("lookup").arg(&table_str) + .arg("priority").arg("50") .invoke(ErrorKind::Network) .await .log_err(); @@ -1101,7 +1191,8 @@ impl NetworkInterfaceController { .as_network_mut() .as_gateways_mut() .ser(info)?; - let hostname = crate::hostname::Hostname(db.as_public().as_server_info().as_hostname().de()?); + let hostname = + crate::hostname::Hostname(db.as_public().as_server_info().as_hostname().de()?); let ports = db.as_private().as_available_ports().de()?; for host in all_hosts(db) { host?.update_addresses(&hostname, info, &ports)?; @@ -1161,6 +1252,106 @@ impl NetworkInterfaceController { Ok(()) } + + async fn apply_default_outbound( + default_outbound: &Option, + ip_info: &OrdMap, + ) { + // Clean up all our policy routing rules (loop because multiple may exist + // at the same priority, and `ip rule del` only removes one at a time) + for prio in ["74", "75"] { + loop { + let ok = Command::new("ip") + .arg("rule").arg("del") + .arg("priority").arg(prio) + .status() + .await + .map_or(false, |s| s.success()); + if !ok { + break; + } + } + } + + // Nothing more to do if no default outbound is selected + let Some(gw_id) = default_outbound else { + return; + }; + let Some(info) = ip_info.get(gw_id) else { + tracing::warn!("default outbound gateway {gw_id} not found in ip_info"); + return; + }; + if info.ip_info.is_none() { + tracing::warn!("default outbound gateway {gw_id} is not connected"); + return; + } + + let table_id = match if_nametoindex(gw_id.as_str()) { + Ok(idx) => 1000 + idx, + Err(e) => { + tracing::error!("failed to get ifindex for {gw_id}: {e}"); + return; + } + }; + let table_str = table_id.to_string(); + + // Exempt ALL active WireGuard interfaces' encapsulation packets. + // Our priority-75 catch-all would otherwise swallow their encap traffic + // before NM's fwmark rules at priority 31610 can route it correctly. + for (iface_id, iface_info) in ip_info { + let Some(ref ip) = iface_info.ip_info else { + continue; + }; + if ip.device_type != Some(NetworkInterfaceType::Wireguard) { + continue; + } + match Command::new("wg") + .arg("show").arg(iface_id.as_str()).arg("fwmark") + .invoke(ErrorKind::Network) + .await + { + Ok(output) => { + let fwmark_hex = String::from_utf8_lossy(&output).trim().to_owned(); + if fwmark_hex.is_empty() || fwmark_hex == "off" { + continue; + } + let fwmark = match u32::from_str_radix( + fwmark_hex.strip_prefix("0x").unwrap_or(&fwmark_hex), + 16, + ) { + Ok(v) => v, + Err(e) => { + tracing::error!( + "failed to parse WireGuard fwmark '{fwmark_hex}' for {iface_id}: {e}" + ); + continue; + } + }; + Command::new("ip") + .arg("rule").arg("add") + .arg("fwmark").arg(fwmark.to_string()) + .arg("lookup").arg("main") + .arg("priority").arg("74") + .invoke(ErrorKind::Network) + .await + .log_err(); + } + Err(e) => { + tracing::error!("failed to read WireGuard fwmark for {iface_id}: {e}"); + } + } + } + + // Route all other traffic through the gateway's per-interface table + Command::new("ip") + .arg("rule").arg("add") + .arg("table").arg(&table_str) + .arg("priority").arg("75") + .invoke(ErrorKind::Network) + .await + .log_err(); + } + pub fn new(db: TypedPatchDb) -> Self { let (seeded_send, seeded) = oneshot::channel(); let watcher = NetworkInterfaceWatcher::new( @@ -1209,10 +1400,28 @@ impl NetworkInterfaceController { _sync: tokio::spawn(async move { let res: Result<(), Error> = async { let mut ip_info = seeded.await.ok(); + let mut outbound_sub = db + .subscribe( + "/public/serverInfo/network/defaultOutbound" + .parse::>() + .unwrap(), + ) + .await; loop { if let Err(e) = async { - if let Some(ip_info) = ip_info { - Self::sync(&db, &ip_info).boxed().await?; + if let Some(ref ip_info) = ip_info { + Self::sync(&db, ip_info).boxed().await?; + } + if let Some(ref ip_info) = ip_info { + let default_outbound: Option = db + .peek() + .await + .as_public() + .as_server_info() + .as_network() + .as_default_outbound() + .de()?; + Self::apply_default_outbound(&default_outbound, ip_info).await; } Ok::<_, Error>(()) @@ -1226,8 +1435,12 @@ impl NetworkInterfaceController { tracing::debug!("{e:?}"); } - let _ = ip_info_watch.changed().await; - ip_info = Some(ip_info_watch.read()); + tokio::select! { + _ = ip_info_watch.changed() => { + ip_info = Some(ip_info_watch.read()); + } + _ = outbound_sub.recv() => {} + } } } .await; diff --git a/core/src/net/net_controller.rs b/core/src/net/net_controller.rs index c2624c5e4..def83a49e 100644 --- a/core/src/net/net_controller.rs +++ b/core/src/net/net_controller.rs @@ -4,6 +4,8 @@ use std::sync::{Arc, Weak}; use color_eyre::eyre::eyre; use imbl_value::InternedString; +use nix::net::if_::if_nametoindex; +use tokio::process::Command; use tokio::sync::Mutex; use tokio::task::JoinHandle; use tokio_rustls::rustls::ClientConfig as TlsClientConfig; @@ -26,6 +28,7 @@ use crate::net::socks::SocksController; use crate::net::vhost::{AlpnInfo, DynVHostTarget, ProxyTarget, VHostController}; use crate::prelude::*; use crate::service::effects::callbacks::ServiceCallbacks; +use crate::util::Invoke; use crate::util::serde::MaybeUtf8String; use crate::util::sync::Watch; use crate::{GatewayId, HOST_IP, HostId, OptionExt, PackageId}; @@ -466,36 +469,150 @@ impl NetService { let synced = Watch::new(0u64); let synced_writer = synced.clone(); + let ip = data.ip; let data = Arc::new(Mutex::new(data)); let thread_data = data.clone(); - let sync_task = tokio::spawn(async move { if let Some(ref id) = pkg_id { let ptr: JsonPointer = format!("/public/packageData/{}/hosts", id) .parse() .unwrap(); let mut watch = db.watch(ptr).await.typed::(); + + // Outbound gateway enforcement + let service_ip = ip.to_string(); + // Purge any stale rules from a previous instance loop { - if let Err(e) = watch.changed().await { - tracing::error!("DB watch disconnected for {id}: {e}"); + if !Command::new("ip") + .arg("rule").arg("del") + .arg("from").arg(&service_ip) + .arg("priority").arg("100") + .status() + .await + .map_or(false, |s| s.success()) + { break; } - if let Err(e) = async { - let hosts = watch.peek()?.de()?; - let mut data = thread_data.lock().await; - let ctrl = data.net_controller()?; - for (host_id, host) in hosts.0 { - data.update(&*ctrl, host_id, host).await?; + } + let mut outbound_sub = db + .subscribe( + format!("/public/packageData/{}/outboundGateway", id) + .parse::>() + .unwrap(), + ) + .await; + let ctrl_for_ip = thread_data.lock().await.net_controller().ok(); + let mut ip_info_watch = ctrl_for_ip + .as_ref() + .map(|c| c.net_iface.watcher.subscribe()); + if let Some(ref mut w) = ip_info_watch { + w.mark_seen(); + } + drop(ctrl_for_ip); + let mut current_outbound_table: Option = None; + + loop { + let (hosts_changed, outbound_changed) = tokio::select! { + res = watch.changed() => { + if let Err(e) = res { + tracing::error!("DB watch disconnected for {id}: {e}"); + break; + } + (true, false) + } + _ = outbound_sub.recv() => (false, true), + _ = async { + if let Some(ref mut w) = ip_info_watch { + w.changed().await; + } else { + std::future::pending::<()>().await; + } + } => (false, true), + }; + + // Handle host updates + if hosts_changed { + if let Err(e) = async { + let hosts = watch.peek()?.de()?; + let mut data = thread_data.lock().await; + let ctrl = data.net_controller()?; + for (host_id, host) in hosts.0 { + data.update(&*ctrl, host_id, host).await?; + } + Ok::<_, Error>(()) + } + .await + { + tracing::error!("Failed to update network info for {id}: {e}"); + tracing::debug!("{e:?}"); } - Ok::<_, Error>(()) } - .await - { - tracing::error!("Failed to update network info for {id}: {e}"); - tracing::debug!("{e:?}"); + + // Handle outbound gateway changes + if outbound_changed { + if let Err(e) = async { + // Remove old rule if any + if let Some(old_table) = current_outbound_table.take() { + let old_table_str = old_table.to_string(); + let _ = Command::new("ip") + .arg("rule").arg("del") + .arg("from").arg(&service_ip) + .arg("lookup").arg(&old_table_str) + .arg("priority").arg("100") + .status() + .await; + } + // Read current outbound gateway from DB + let outbound_gw: Option = db + .peek() + .await + .as_public() + .as_package_data() + .as_idx(id) + .map(|p| p.as_outbound_gateway().de().ok()) + .flatten() + .flatten(); + if let Some(gw_id) = outbound_gw { + // Look up table ID for this gateway + if let Some(table_id) = if_nametoindex(gw_id.as_str()) + .map(|idx| 1000 + idx) + .log_err() + { + let table_str = table_id.to_string(); + Command::new("ip") + .arg("rule").arg("add") + .arg("from").arg(&service_ip) + .arg("lookup").arg(&table_str) + .arg("priority").arg("100") + .invoke(ErrorKind::Network) + .await + .log_err(); + current_outbound_table = Some(table_id); + } + } + Ok::<_, Error>(()) + } + .await + { + tracing::error!("Failed to update outbound gateway for {id}: {e}"); + tracing::debug!("{e:?}"); + } } + synced_writer.send_modify(|v| *v += 1); } + + // Cleanup outbound rule on task exit + if let Some(table_id) = current_outbound_table { + let table_str = table_id.to_string(); + let _ = Command::new("ip") + .arg("rule").arg("del") + .arg("from").arg(&service_ip) + .arg("lookup").arg(&table_str) + .arg("priority").arg("100") + .status() + .await; + } } else { let ptr: JsonPointer = "/public/serverInfo/network/host".parse().unwrap(); let mut watch = db.watch(ptr).await.typed::(); @@ -642,6 +759,20 @@ impl NetService { let mut w = self.synced.clone(); w.wait_for(|v| *v > current).await; self.sync_task.abort(); + // Clean up any outbound gateway ip rules for this service + let service_ip = self.data.lock().await.ip.to_string(); + loop { + if !Command::new("ip") + .arg("rule").arg("del") + .arg("from").arg(&service_ip) + .arg("priority").arg("100") + .status() + .await + .map_or(false, |s| s.success()) + { + break; + } + } self.shutdown = true; Ok(()) } diff --git a/core/src/service/mod.rs b/core/src/service/mod.rs index cc977cdfb..a2a8b58e0 100644 --- a/core/src/service/mod.rs +++ b/core/src/service/mod.rs @@ -1198,6 +1198,7 @@ pub async fn cli_attach( let prompt = e.to_string(); let options: Vec = from_value(e.info)?; let choice = choose(&prompt, &options).await?; + println!(); params["subcontainer"] = to_value(&choice.id)?; context .call_remote::(&method, params.clone())