feat: per-service and default outbound gateway routing

Add set-outbound-gateway RPC for packages and set-default-outbound RPC
for the server, with policy routing enforcement via ip rules. Fix
connmark restore to skip packets with existing fwmarks, add bridge
subnet routes to per-interface tables, and fix squashfs path in
update-image-local.sh.
This commit is contained in:
Aiden McClelland
2026-02-17 12:52:24 -07:00
parent ccafb599a6
commit 68141112b7
6 changed files with 387 additions and 35 deletions

View File

@@ -381,7 +381,6 @@ pub struct PackageDataEntry {
pub hosts: Hosts,
#[ts(type = "string[]")]
pub store_exposed_dependents: Vec<JsonPointer>,
#[serde(default)]
#[ts(type = "string | null")]
pub outbound_gateway: Option<GatewayId>,
}

View File

@@ -555,4 +555,12 @@ pub fn package<C: Context>() -> ParentHandler<C> {
"host",
net::host::host_api::<C>().with_about("about.manage-network-hosts-package"),
)
.subcommand(
"set-outbound-gateway",
from_fn_async(net::gateway::set_outbound_gateway)
.with_metadata("sync_db", Value::Bool(true))
.no_display()
.with_about("about.set-outbound-gateway-package")
.with_call_remote::<CliContext>(),
)
}

View File

@@ -8,7 +8,7 @@ use std::time::Duration;
use clap::Parser;
use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
use imbl::{OrdMap, OrdSet};
use imbl_value::InternedString;
use imbl_value::{InternedString, Value};
use ipnet::IpNet;
use itertools::Itertools;
use nix::net::if_::if_nametoindex;
@@ -28,13 +28,12 @@ use zbus::zvariant::{
};
use zbus::{Connection, proxy};
use crate::GatewayId;
use crate::context::{CliContext, RpcContext};
use crate::db::model::Database;
use crate::db::model::public::{IpInfo, NetworkInterfaceInfo, NetworkInterfaceType};
use crate::net::forward::START9_BRIDGE_IFACE;
use crate::net::host::all_hosts;
use crate::net::gateway::device::DeviceProxy;
use crate::net::host::all_hosts;
use crate::net::web_server::{Accept, AcceptStream, MetadataVisitor, TcpMetadata};
use crate::prelude::*;
use crate::util::Invoke;
@@ -43,6 +42,7 @@ use crate::util::future::{NonDetachingJoinHandle, Until};
use crate::util::io::open_file;
use crate::util::serde::{HandlerExtSerde, display_serializable};
use crate::util::sync::Watch;
use crate::{GatewayId, PackageId};
pub fn gateway_api<C: Context>() -> ParentHandler<C> {
ParentHandler::new()
@@ -118,6 +118,14 @@ pub fn gateway_api<C: Context>() -> ParentHandler<C> {
.with_about("about.check-port-reachability")
.with_call_remote::<CliContext>(),
)
.subcommand(
"set-default-outbound",
from_fn_async(set_default_outbound)
.with_metadata("sync_db", Value::Bool(true))
.no_display()
.with_about("about.set-default-outbound-gateway")
.with_call_remote::<CliContext>(),
)
}
async fn list_interfaces(
@@ -180,23 +188,16 @@ async fn check_port(
CheckPortParams { port, gateway }: CheckPortParams,
) -> Result<CheckPortRes, Error> {
let db = ctx.db.peek().await;
let base_url = db
.as_public()
.as_server_info()
.as_ifconfig_url()
.de()?;
let base_url = db.as_public().as_server_info().as_ifconfig_url().de()?;
let gateways = db
.as_public()
.as_server_info()
.as_network()
.as_gateways()
.de()?;
let gw_info = gateways.get(&gateway).ok_or_else(|| {
Error::new(
eyre!("unknown gateway: {gateway}"),
ErrorKind::NotFound,
)
})?;
let gw_info = gateways
.get(&gateway)
.ok_or_else(|| Error::new(eyre!("unknown gateway: {gateway}"), ErrorKind::NotFound))?;
let ip_info = gw_info.ip_info.as_ref().ok_or_else(|| {
Error::new(
eyre!("gateway {gateway} has no IP info"),
@@ -223,6 +224,79 @@ async fn check_port(
Ok(res)
}
#[derive(Debug, Clone, Deserialize, Serialize, Parser, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export)]
struct SetDefaultOutboundParams {
#[arg(help = "help.arg.gateway-id")]
gateway: Option<GatewayId>,
}
async fn set_default_outbound(
ctx: RpcContext,
SetDefaultOutboundParams { gateway }: SetDefaultOutboundParams,
) -> Result<(), Error> {
if let Some(ref gw) = gateway {
let ip_info = ctx.net_controller.net_iface.watcher.ip_info();
let info = ip_info
.get(gw)
.ok_or_else(|| Error::new(eyre!("unknown gateway: {gw}"), ErrorKind::NotFound))?;
ensure_code!(
info.ip_info.is_some(),
ErrorKind::InvalidRequest,
"gateway {gw} is not connected"
);
}
ctx.db
.mutate(|db| {
db.as_public_mut()
.as_server_info_mut()
.as_network_mut()
.as_default_outbound_mut()
.ser(&gateway)
})
.await
.result
}
#[derive(Debug, Clone, Deserialize, Serialize, Parser, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export)]
pub struct SetOutboundGatewayParams {
#[arg(help = "help.arg.package-id")]
package: PackageId,
#[arg(help = "help.arg.gateway-id")]
gateway: Option<GatewayId>,
}
pub async fn set_outbound_gateway(
ctx: RpcContext,
SetOutboundGatewayParams { package, gateway }: SetOutboundGatewayParams,
) -> Result<(), Error> {
if let Some(ref gw) = gateway {
let ip_info = ctx.net_controller.net_iface.watcher.ip_info();
let info = ip_info
.get(gw)
.ok_or_else(|| Error::new(eyre!("unknown gateway: {gw}"), ErrorKind::NotFound))?;
ensure_code!(
info.ip_info.is_some(),
ErrorKind::InvalidRequest,
"gateway {gw} is not connected"
);
}
ctx.db
.mutate(|db| {
db.as_public_mut()
.as_package_data_mut()
.as_idx_mut(&package)
.or_not_found(&package)?
.as_outbound_gateway_mut()
.ser(&gateway)
})
.await
.result
}
#[proxy(
interface = "org.freedesktop.NetworkManager",
default_service = "org.freedesktop.NetworkManager",
@@ -714,6 +788,7 @@ async fn watch_ip(
.arg("rule").arg("del")
.arg("fwmark").arg(&table_str)
.arg("lookup").arg(&table_str)
.arg("priority").arg("50")
.invoke(ErrorKind::Network)
.await
.log_err();
@@ -823,6 +898,15 @@ async fn watch_ip(
.await
.log_err();
}
// Add bridge subnet so per-service outbound routing
// doesn't break local container traffic
Command::new("ip")
.arg("route").arg("add").arg("10.0.3.0/24")
.arg("dev").arg(START9_BRIDGE_IFACE)
.arg("table").arg(&table_str)
.invoke(ErrorKind::Network)
.await
.log_err();
{
let mut cmd = Command::new("ip");
cmd.arg("route").arg("add").arg("default");
@@ -843,10 +927,14 @@ async fn watch_ip(
// PREROUTING (forwarded packets) and OUTPUT (locally-generated replies).
// Both are needed: PREROUTING handles DNAT-forwarded traffic,
// OUTPUT handles replies from locally-bound listeners (e.g. vhost).
// The `-m mark --mark 0` condition ensures we only restore
// when the packet has no existing fwmark, preserving marks
// set by WireGuard on encapsulation packets.
for chain in ["PREROUTING", "OUTPUT"] {
if !Command::new("iptables")
.arg("-t").arg("mangle")
.arg("-C").arg(chain)
.arg("-m").arg("mark").arg("--mark").arg("0")
.arg("-j").arg("CONNMARK")
.arg("--restore-mark")
.status().await
@@ -855,6 +943,7 @@ async fn watch_ip(
Command::new("iptables")
.arg("-t").arg("mangle")
.arg("-I").arg(chain).arg("1")
.arg("-m").arg("mark").arg("--mark").arg("0")
.arg("-j").arg("CONNMARK")
.arg("--restore-mark")
.invoke(ErrorKind::Network)
@@ -904,6 +993,7 @@ async fn watch_ip(
.arg("rule").arg("add")
.arg("fwmark").arg(&table_str)
.arg("lookup").arg(&table_str)
.arg("priority").arg("50")
.invoke(ErrorKind::Network)
.await
.log_err();
@@ -1101,7 +1191,8 @@ impl NetworkInterfaceController {
.as_network_mut()
.as_gateways_mut()
.ser(info)?;
let hostname = crate::hostname::Hostname(db.as_public().as_server_info().as_hostname().de()?);
let hostname =
crate::hostname::Hostname(db.as_public().as_server_info().as_hostname().de()?);
let ports = db.as_private().as_available_ports().de()?;
for host in all_hosts(db) {
host?.update_addresses(&hostname, info, &ports)?;
@@ -1161,6 +1252,106 @@ impl NetworkInterfaceController {
Ok(())
}
/// Rebuilds the `ip rule` entries that implement the default outbound gateway.
///
/// Every rule at priorities 74 and 75 is deleted first. Rules are then
/// re-added only when `default_outbound` names a gateway that is present in
/// `ip_info`, has IP info, and whose interface index can be resolved:
///
/// * priority 74: one `fwmark <mark> lookup main` rule per distinct fwmark
///   reported by `wg show <iface> fwmark` for the WireGuard interfaces in
///   `ip_info`;
/// * priority 75: a catch-all rule pointing at table `1000 + ifindex` of the
///   selected gateway.
///
/// Nothing is returned: failures to add a rule or to query/parse a fwmark are
/// logged, and failed deletions simply end the cleanup loop.
///
/// NOTE(review): the per-service rules in the net service are added at
/// priority 100, i.e. they are evaluated *after* this priority-75 catch-all.
/// If the gateway table holds a default route, the catch-all would seem to win
/// over a per-service override — confirm this ordering is intended.
async fn apply_default_outbound(
    default_outbound: &Option<GatewayId>,
    ip_info: &OrdMap<GatewayId, NetworkInterfaceInfo>,
) {
    /// Interprets the output of `wg show <iface> fwmark`.
    ///
    /// Returns `None` when no fwmark is configured (empty output or `off`),
    /// otherwise the result of parsing the value as hexadecimal, with an
    /// optional `0x` prefix.
    fn parse_wg_fwmark(raw: &str) -> Option<Result<u32, std::num::ParseIntError>> {
        if raw.is_empty() || raw == "off" {
            return None;
        }
        Some(u32::from_str_radix(
            raw.strip_prefix("0x").unwrap_or(raw),
            16,
        ))
    }

    // Clean up all our policy routing rules (loop because multiple may exist
    // at the same priority, and `ip rule del` only removes one at a time)
    for prio in ["74", "75"] {
        loop {
            let ok = Command::new("ip")
                .arg("rule").arg("del")
                .arg("priority").arg(prio)
                .status()
                .await
                .map_or(false, |s| s.success());
            if !ok {
                break;
            }
        }
    }

    // Nothing more to do if no default outbound is selected
    let Some(gw_id) = default_outbound else {
        return;
    };
    let Some(info) = ip_info.get(gw_id) else {
        tracing::warn!("default outbound gateway {gw_id} not found in ip_info");
        return;
    };
    if info.ip_info.is_none() {
        tracing::warn!("default outbound gateway {gw_id} is not connected");
        return;
    }
    let table_id = match if_nametoindex(gw_id.as_str()) {
        Ok(idx) => 1000 + idx,
        Err(e) => {
            tracing::error!("failed to get ifindex for {gw_id}: {e}");
            return;
        }
    };
    let table_str = table_id.to_string();

    // Exempt ALL active WireGuard interfaces' encapsulation packets.
    // Our priority-75 catch-all would otherwise swallow their encap traffic
    // before NM's fwmark rules at priority 31610 can route it correctly.
    //
    // Two interfaces may report the same fwmark; adding the identical rule a
    // second time is rejected by `ip rule add` and would only produce a
    // spurious error log, so each mark is exempted once.
    let mut exempted_fwmarks = std::collections::BTreeSet::new();
    for (iface_id, iface_info) in ip_info {
        let Some(ref ip) = iface_info.ip_info else {
            continue;
        };
        if ip.device_type != Some(NetworkInterfaceType::Wireguard) {
            continue;
        }
        let output = match Command::new("wg")
            .arg("show").arg(iface_id.as_str()).arg("fwmark")
            .invoke(ErrorKind::Network)
            .await
        {
            Ok(output) => output,
            Err(e) => {
                tracing::error!("failed to read WireGuard fwmark for {iface_id}: {e}");
                continue;
            }
        };
        let fwmark_hex = String::from_utf8_lossy(&output).trim().to_owned();
        let fwmark = match parse_wg_fwmark(&fwmark_hex) {
            // Interface has no fwmark configured: nothing to exempt.
            None => continue,
            Some(Ok(v)) => v,
            Some(Err(e)) => {
                tracing::error!(
                    "failed to parse WireGuard fwmark '{fwmark_hex}' for {iface_id}: {e}"
                );
                continue;
            }
        };
        if !exempted_fwmarks.insert(fwmark) {
            // Already exempted via another interface.
            continue;
        }
        Command::new("ip")
            .arg("rule").arg("add")
            .arg("fwmark").arg(fwmark.to_string())
            .arg("lookup").arg("main")
            .arg("priority").arg("74")
            .invoke(ErrorKind::Network)
            .await
            .log_err();
    }

    // Route all other traffic through the gateway's per-interface table
    Command::new("ip")
        .arg("rule").arg("add")
        .arg("table").arg(&table_str)
        .arg("priority").arg("75")
        .invoke(ErrorKind::Network)
        .await
        .log_err();
}
pub fn new(db: TypedPatchDb<Database>) -> Self {
let (seeded_send, seeded) = oneshot::channel();
let watcher = NetworkInterfaceWatcher::new(
@@ -1209,10 +1400,28 @@ impl NetworkInterfaceController {
_sync: tokio::spawn(async move {
let res: Result<(), Error> = async {
let mut ip_info = seeded.await.ok();
let mut outbound_sub = db
.subscribe(
"/public/serverInfo/network/defaultOutbound"
.parse::<JsonPointer<_, _>>()
.unwrap(),
)
.await;
loop {
if let Err(e) = async {
if let Some(ip_info) = ip_info {
Self::sync(&db, &ip_info).boxed().await?;
if let Some(ref ip_info) = ip_info {
Self::sync(&db, ip_info).boxed().await?;
}
if let Some(ref ip_info) = ip_info {
let default_outbound: Option<GatewayId> = db
.peek()
.await
.as_public()
.as_server_info()
.as_network()
.as_default_outbound()
.de()?;
Self::apply_default_outbound(&default_outbound, ip_info).await;
}
Ok::<_, Error>(())
@@ -1226,8 +1435,12 @@ impl NetworkInterfaceController {
tracing::debug!("{e:?}");
}
let _ = ip_info_watch.changed().await;
ip_info = Some(ip_info_watch.read());
tokio::select! {
_ = ip_info_watch.changed() => {
ip_info = Some(ip_info_watch.read());
}
_ = outbound_sub.recv() => {}
}
}
}
.await;

View File

@@ -4,6 +4,8 @@ use std::sync::{Arc, Weak};
use color_eyre::eyre::eyre;
use imbl_value::InternedString;
use nix::net::if_::if_nametoindex;
use tokio::process::Command;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tokio_rustls::rustls::ClientConfig as TlsClientConfig;
@@ -26,6 +28,7 @@ use crate::net::socks::SocksController;
use crate::net::vhost::{AlpnInfo, DynVHostTarget, ProxyTarget, VHostController};
use crate::prelude::*;
use crate::service::effects::callbacks::ServiceCallbacks;
use crate::util::Invoke;
use crate::util::serde::MaybeUtf8String;
use crate::util::sync::Watch;
use crate::{GatewayId, HOST_IP, HostId, OptionExt, PackageId};
@@ -466,36 +469,150 @@ impl NetService {
let synced = Watch::new(0u64);
let synced_writer = synced.clone();
let ip = data.ip;
let data = Arc::new(Mutex::new(data));
let thread_data = data.clone();
let sync_task = tokio::spawn(async move {
if let Some(ref id) = pkg_id {
let ptr: JsonPointer = format!("/public/packageData/{}/hosts", id)
.parse()
.unwrap();
let mut watch = db.watch(ptr).await.typed::<Hosts>();
// Outbound gateway enforcement
let service_ip = ip.to_string();
// Purge any stale rules from a previous instance
loop {
if let Err(e) = watch.changed().await {
tracing::error!("DB watch disconnected for {id}: {e}");
if !Command::new("ip")
.arg("rule").arg("del")
.arg("from").arg(&service_ip)
.arg("priority").arg("100")
.status()
.await
.map_or(false, |s| s.success())
{
break;
}
if let Err(e) = async {
let hosts = watch.peek()?.de()?;
let mut data = thread_data.lock().await;
let ctrl = data.net_controller()?;
for (host_id, host) in hosts.0 {
data.update(&*ctrl, host_id, host).await?;
}
let mut outbound_sub = db
.subscribe(
format!("/public/packageData/{}/outboundGateway", id)
.parse::<JsonPointer<_, _>>()
.unwrap(),
)
.await;
let ctrl_for_ip = thread_data.lock().await.net_controller().ok();
let mut ip_info_watch = ctrl_for_ip
.as_ref()
.map(|c| c.net_iface.watcher.subscribe());
if let Some(ref mut w) = ip_info_watch {
w.mark_seen();
}
drop(ctrl_for_ip);
let mut current_outbound_table: Option<u32> = None;
loop {
let (hosts_changed, outbound_changed) = tokio::select! {
res = watch.changed() => {
if let Err(e) = res {
tracing::error!("DB watch disconnected for {id}: {e}");
break;
}
(true, false)
}
_ = outbound_sub.recv() => (false, true),
_ = async {
if let Some(ref mut w) = ip_info_watch {
w.changed().await;
} else {
std::future::pending::<()>().await;
}
} => (false, true),
};
// Handle host updates
if hosts_changed {
if let Err(e) = async {
let hosts = watch.peek()?.de()?;
let mut data = thread_data.lock().await;
let ctrl = data.net_controller()?;
for (host_id, host) in hosts.0 {
data.update(&*ctrl, host_id, host).await?;
}
Ok::<_, Error>(())
}
.await
{
tracing::error!("Failed to update network info for {id}: {e}");
tracing::debug!("{e:?}");
}
Ok::<_, Error>(())
}
.await
{
tracing::error!("Failed to update network info for {id}: {e}");
tracing::debug!("{e:?}");
// Handle outbound gateway changes
if outbound_changed {
if let Err(e) = async {
// Remove old rule if any
if let Some(old_table) = current_outbound_table.take() {
let old_table_str = old_table.to_string();
let _ = Command::new("ip")
.arg("rule").arg("del")
.arg("from").arg(&service_ip)
.arg("lookup").arg(&old_table_str)
.arg("priority").arg("100")
.status()
.await;
}
// Read current outbound gateway from DB
let outbound_gw: Option<GatewayId> = db
.peek()
.await
.as_public()
.as_package_data()
.as_idx(id)
.map(|p| p.as_outbound_gateway().de().ok())
.flatten()
.flatten();
if let Some(gw_id) = outbound_gw {
// Look up table ID for this gateway
if let Some(table_id) = if_nametoindex(gw_id.as_str())
.map(|idx| 1000 + idx)
.log_err()
{
let table_str = table_id.to_string();
Command::new("ip")
.arg("rule").arg("add")
.arg("from").arg(&service_ip)
.arg("lookup").arg(&table_str)
.arg("priority").arg("100")
.invoke(ErrorKind::Network)
.await
.log_err();
current_outbound_table = Some(table_id);
}
}
Ok::<_, Error>(())
}
.await
{
tracing::error!("Failed to update outbound gateway for {id}: {e}");
tracing::debug!("{e:?}");
}
}
synced_writer.send_modify(|v| *v += 1);
}
// Cleanup outbound rule on task exit
if let Some(table_id) = current_outbound_table {
let table_str = table_id.to_string();
let _ = Command::new("ip")
.arg("rule").arg("del")
.arg("from").arg(&service_ip)
.arg("lookup").arg(&table_str)
.arg("priority").arg("100")
.status()
.await;
}
} else {
let ptr: JsonPointer = "/public/serverInfo/network/host".parse().unwrap();
let mut watch = db.watch(ptr).await.typed::<Host>();
@@ -642,6 +759,20 @@ impl NetService {
let mut w = self.synced.clone();
w.wait_for(|v| *v > current).await;
self.sync_task.abort();
// Clean up any outbound gateway ip rules for this service
let service_ip = self.data.lock().await.ip.to_string();
loop {
if !Command::new("ip")
.arg("rule").arg("del")
.arg("from").arg(&service_ip)
.arg("priority").arg("100")
.status()
.await
.map_or(false, |s| s.success())
{
break;
}
}
self.shutdown = true;
Ok(())
}

View File

@@ -1198,6 +1198,7 @@ pub async fn cli_attach(
let prompt = e.to_string();
let options: Vec<SubcontainerInfo> = from_value(e.info)?;
let choice = choose(&prompt, &options).await?;
println!();
params["subcontainer"] = to_value(&choice.id)?;
context
.call_remote::<RpcContext>(&method, params.clone())