feat: replace InterfaceFilter with ForwardRequirements, add WildcardListener, complete alpha.20 bump

- Replace DynInterfaceFilter with ForwardRequirements for per-IP forward
  precision with source-subnet iptables filtering for private forwards
- Add WildcardListener (binds [::]:port) to replace the per-gateway
  NetworkInterfaceListener/SelfContainedNetworkInterfaceListener/
  UpgradableListener infrastructure
- Update forward-port script with src_subnet and excluded_src env vars
- Remove unused filter types and listener infrastructure from gateway.rs
- Add availablePorts migration (IdPool -> BTreeMap<u16, bool>) to alpha.20
- Complete version bump to 0.4.0-alpha.20 in SDK and web
This commit is contained in:
Aiden McClelland
2026-02-11 18:10:27 -07:00
parent 4e638fb58e
commit 2a54625f43
19 changed files with 714 additions and 896 deletions

View File

@@ -1,14 +1,11 @@
use std::any::Any;
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::fmt;
use std::future::Future;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV6};
use std::sync::{Arc, Weak};
use std::task::{Poll, ready};
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::sync::Arc;
use std::task::Poll;
use std::time::Duration;
use clap::Parser;
use futures::future::Either;
use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
use imbl::{OrdMap, OrdSet};
use imbl_value::InternedString;
@@ -36,15 +33,14 @@ use crate::db::model::Database;
use crate::db::model::public::{IpInfo, NetworkInterfaceInfo, NetworkInterfaceType};
use crate::net::forward::START9_BRIDGE_IFACE;
use crate::net::gateway::device::DeviceProxy;
use crate::net::utils::ipv6_is_link_local;
use crate::net::web_server::{Accept, AcceptStream, Acceptor, MetadataVisitor};
use crate::net::web_server::{Accept, AcceptStream, MetadataVisitor, TcpMetadata};
use crate::prelude::*;
use crate::util::Invoke;
use crate::util::collections::OrdMapIterMut;
use crate::util::future::{NonDetachingJoinHandle, Until};
use crate::util::io::open_file;
use crate::util::serde::{HandlerExtSerde, display_serializable};
use crate::util::sync::{SyncMutex, Watch};
use crate::util::sync::Watch;
pub fn gateway_api<C: Context>() -> ParentHandler<C> {
ParentHandler::new()
@@ -838,7 +834,6 @@ pub struct NetworkInterfaceWatcher {
activated: Watch<BTreeMap<GatewayId, bool>>,
ip_info: Watch<OrdMap<GatewayId, NetworkInterfaceInfo>>,
_watcher: NonDetachingJoinHandle<()>,
listeners: SyncMutex<BTreeMap<u16, Weak<()>>>,
}
impl NetworkInterfaceWatcher {
pub fn new(
@@ -858,7 +853,6 @@ impl NetworkInterfaceWatcher {
watcher(ip_info, activated).await
})
.into(),
listeners: SyncMutex::new(BTreeMap::new()),
}
}
@@ -885,51 +879,6 @@ impl NetworkInterfaceWatcher {
pub fn ip_info(&self) -> OrdMap<GatewayId, NetworkInterfaceInfo> {
self.ip_info.read()
}
pub fn bind<B: Bind>(&self, bind: B, port: u16) -> Result<NetworkInterfaceListener<B>, Error> {
let arc = Arc::new(());
self.listeners.mutate(|l| {
if l.get(&port).filter(|w| w.strong_count() > 0).is_some() {
return Err(Error::new(
std::io::Error::from_raw_os_error(libc::EADDRINUSE),
ErrorKind::Network,
));
}
l.insert(port, Arc::downgrade(&arc));
Ok(())
})?;
let ip_info = self.ip_info.clone_unseen();
Ok(NetworkInterfaceListener {
_arc: arc,
ip_info,
listeners: ListenerMap::new(bind, port),
})
}
pub fn upgrade_listener<B: Bind>(
&self,
SelfContainedNetworkInterfaceListener {
mut listener,
..
}: SelfContainedNetworkInterfaceListener<B>,
) -> Result<NetworkInterfaceListener<B>, Error> {
let port = listener.listeners.port;
let arc = &listener._arc;
self.listeners.mutate(|l| {
if l.get(&port).filter(|w| w.strong_count() > 0).is_some() {
return Err(Error::new(
std::io::Error::from_raw_os_error(libc::EADDRINUSE),
ErrorKind::Network,
));
}
l.insert(port, Arc::downgrade(arc));
Ok(())
})?;
let ip_info = self.ip_info.clone_unseen();
ip_info.mark_changed();
listener.change_ip_info_source(ip_info);
Ok(listener)
}
}
pub struct NetworkInterfaceController {
@@ -1237,235 +1186,6 @@ impl NetworkInterfaceController {
}
}
pub trait InterfaceFilter: Any + Clone + std::fmt::Debug + Eq + Ord + Send + Sync {
fn filter(&self, id: &GatewayId, info: &NetworkInterfaceInfo) -> bool;
fn eq(&self, other: &dyn Any) -> bool {
Some(self) == other.downcast_ref::<Self>()
}
fn cmp(&self, other: &dyn Any) -> std::cmp::Ordering {
match (self as &dyn Any).type_id().cmp(&other.type_id()) {
std::cmp::Ordering::Equal => {
std::cmp::Ord::cmp(self, other.downcast_ref::<Self>().unwrap())
}
ord => ord,
}
}
fn as_any(&self) -> &dyn Any {
self
}
fn into_dyn(self) -> DynInterfaceFilter {
DynInterfaceFilter::new(self)
}
}
impl InterfaceFilter for bool {
fn filter(&self, _: &GatewayId, _: &NetworkInterfaceInfo) -> bool {
*self
}
}
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct TypeFilter(pub NetworkInterfaceType);
impl InterfaceFilter for TypeFilter {
fn filter(&self, _: &GatewayId, info: &NetworkInterfaceInfo) -> bool {
info.ip_info.as_ref().and_then(|i| i.device_type) == Some(self.0)
}
}
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct IdFilter(pub GatewayId);
impl InterfaceFilter for IdFilter {
fn filter(&self, id: &GatewayId, _: &NetworkInterfaceInfo) -> bool {
id == &self.0
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct PublicFilter {
pub public: bool,
}
impl InterfaceFilter for PublicFilter {
fn filter(&self, _: &GatewayId, info: &NetworkInterfaceInfo) -> bool {
self.public == info.public()
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct SecureFilter {
pub secure: bool,
}
impl InterfaceFilter for SecureFilter {
fn filter(&self, _: &GatewayId, info: &NetworkInterfaceInfo) -> bool {
self.secure || info.secure()
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct AndFilter<A, B>(pub A, pub B);
impl<A: InterfaceFilter, B: InterfaceFilter> InterfaceFilter for AndFilter<A, B> {
fn filter(&self, id: &GatewayId, info: &NetworkInterfaceInfo) -> bool {
self.0.filter(id, info) && self.1.filter(id, info)
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct OrFilter<A, B>(pub A, pub B);
impl<A: InterfaceFilter, B: InterfaceFilter> InterfaceFilter for OrFilter<A, B> {
fn filter(&self, id: &GatewayId, info: &NetworkInterfaceInfo) -> bool {
self.0.filter(id, info) || self.1.filter(id, info)
}
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct AnyFilter(pub BTreeSet<DynInterfaceFilter>);
impl InterfaceFilter for AnyFilter {
fn filter(&self, id: &GatewayId, info: &NetworkInterfaceInfo) -> bool {
self.0.iter().any(|f| InterfaceFilter::filter(f, id, info))
}
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct AllFilter(pub BTreeSet<DynInterfaceFilter>);
impl InterfaceFilter for AllFilter {
fn filter(&self, id: &GatewayId, info: &NetworkInterfaceInfo) -> bool {
self.0.iter().all(|f| InterfaceFilter::filter(f, id, info))
}
}
pub trait DynInterfaceFilterT: std::fmt::Debug + Any + Send + Sync {
fn filter(&self, id: &GatewayId, info: &NetworkInterfaceInfo) -> bool;
fn eq(&self, other: &dyn Any) -> bool;
fn cmp(&self, other: &dyn Any) -> std::cmp::Ordering;
fn as_any(&self) -> &dyn Any;
}
impl<T: InterfaceFilter> DynInterfaceFilterT for T {
fn filter(&self, id: &GatewayId, info: &NetworkInterfaceInfo) -> bool {
InterfaceFilter::filter(self, id, info)
}
fn eq(&self, other: &dyn Any) -> bool {
InterfaceFilter::eq(self, other)
}
fn cmp(&self, other: &dyn Any) -> std::cmp::Ordering {
InterfaceFilter::cmp(self, other)
}
fn as_any(&self) -> &dyn Any {
InterfaceFilter::as_any(self)
}
}
#[test]
fn test_interface_filter_eq() {
let dyn_t = true.into_dyn();
assert!(DynInterfaceFilterT::eq(
&dyn_t,
DynInterfaceFilterT::as_any(&true),
))
}
#[derive(Clone, Debug)]
pub struct DynInterfaceFilter(Arc<dyn DynInterfaceFilterT>);
impl InterfaceFilter for DynInterfaceFilter {
fn filter(&self, id: &GatewayId, info: &NetworkInterfaceInfo) -> bool {
self.0.filter(id, info)
}
fn eq(&self, other: &dyn Any) -> bool {
self.0.eq(other)
}
fn cmp(&self, other: &dyn Any) -> std::cmp::Ordering {
self.0.cmp(other)
}
fn as_any(&self) -> &dyn Any {
self.0.as_any()
}
fn into_dyn(self) -> DynInterfaceFilter {
self
}
}
impl DynInterfaceFilter {
fn new<T: InterfaceFilter>(value: T) -> Self {
Self(Arc::new(value))
}
}
impl PartialEq for DynInterfaceFilter {
fn eq(&self, other: &Self) -> bool {
DynInterfaceFilterT::eq(&*self.0, DynInterfaceFilterT::as_any(&*other.0))
}
}
impl Eq for DynInterfaceFilter {}
impl PartialOrd for DynInterfaceFilter {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.0.cmp(other.0.as_any()))
}
}
impl Ord for DynInterfaceFilter {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.0.cmp(other.0.as_any())
}
}
struct ListenerMap<B: Bind> {
prev_filter: DynInterfaceFilter,
bind: B,
port: u16,
listeners: BTreeMap<SocketAddr, B::Accept>,
}
impl<B: Bind> ListenerMap<B> {
fn new(bind: B, port: u16) -> Self {
Self {
prev_filter: false.into_dyn(),
bind,
port,
listeners: BTreeMap::new(),
}
}
#[instrument(skip(self))]
fn update(
&mut self,
ip_info: &OrdMap<GatewayId, NetworkInterfaceInfo>,
filter: &impl InterfaceFilter,
) -> Result<(), Error> {
let mut keep = BTreeSet::<SocketAddr>::new();
for (_, info) in ip_info
.iter()
.filter(|(id, info)| filter.filter(*id, *info))
{
if let Some(ip_info) = &info.ip_info {
for ipnet in &ip_info.subnets {
let addr = match ipnet.addr() {
IpAddr::V6(ip6) => SocketAddrV6::new(
ip6,
self.port,
0,
if ipv6_is_link_local(ip6) {
ip_info.scope_id
} else {
0
},
)
.into(),
ip => SocketAddr::new(ip, self.port),
};
keep.insert(addr);
if !self.listeners.contains_key(&addr) {
self.listeners.insert(addr, self.bind.bind(addr)?);
}
}
}
}
self.listeners.retain(|key, _| keep.contains(key));
self.prev_filter = filter.clone().into_dyn();
Ok(())
}
fn poll_accept(
&mut self,
cx: &mut std::task::Context<'_>,
) -> Poll<Result<(SocketAddr, <B::Accept as Accept>::Metadata, AcceptStream), Error>> {
let (metadata, stream) = ready!(self.listeners.poll_accept(cx)?);
Poll::Ready(Ok((metadata.key, metadata.inner, stream)))
}
}
pub fn lookup_info_by_addr(
ip_info: &OrdMap<GatewayId, NetworkInterfaceInfo>,
addr: SocketAddr,
@@ -1477,28 +1197,6 @@ pub fn lookup_info_by_addr(
})
}
pub trait Bind {
type Accept: Accept;
fn bind(&mut self, addr: SocketAddr) -> Result<Self::Accept, Error>;
}
#[derive(Clone, Copy, Default)]
pub struct BindTcp;
impl Bind for BindTcp {
type Accept = TcpListener;
fn bind(&mut self, addr: SocketAddr) -> Result<Self::Accept, Error> {
TcpListener::from_std(
mio::net::TcpListener::bind(addr)
.with_kind(ErrorKind::Network)?
.into(),
)
.with_kind(ErrorKind::Network)
}
}
pub trait FromGatewayInfo {
fn from_gateway_info(id: &GatewayId, info: &NetworkInterfaceInfo) -> Self;
}
#[derive(Clone, Debug)]
pub struct GatewayInfo {
pub id: GatewayId,
@@ -1509,202 +1207,88 @@ impl<V: MetadataVisitor> Visit<V> for GatewayInfo {
visitor.visit(self)
}
}
impl FromGatewayInfo for GatewayInfo {
fn from_gateway_info(id: &GatewayId, info: &NetworkInterfaceInfo) -> Self {
Self {
id: id.clone(),
info: info.clone(),
}
}
}
pub struct NetworkInterfaceListener<B: Bind = BindTcp> {
pub ip_info: Watch<OrdMap<GatewayId, NetworkInterfaceInfo>>,
listeners: ListenerMap<B>,
_arc: Arc<()>,
}
impl<B: Bind> NetworkInterfaceListener<B> {
pub(super) fn new(
mut ip_info: Watch<OrdMap<GatewayId, NetworkInterfaceInfo>>,
bind: B,
port: u16,
) -> Self {
ip_info.mark_unseen();
Self {
ip_info,
listeners: ListenerMap::new(bind, port),
_arc: Arc::new(()),
}
}
pub fn port(&self) -> u16 {
self.listeners.port
}
#[cfg_attr(feature = "unstable", inline(never))]
pub fn poll_accept<M: FromGatewayInfo>(
&mut self,
cx: &mut std::task::Context<'_>,
filter: &impl InterfaceFilter,
) -> Poll<Result<(M, <B::Accept as Accept>::Metadata, AcceptStream), Error>> {
while self.ip_info.poll_changed(cx).is_ready()
|| !DynInterfaceFilterT::eq(&self.listeners.prev_filter, filter.as_any())
{
self.ip_info
.peek_and_mark_seen(|ip_info| self.listeners.update(ip_info, filter))?;
}
let (addr, inner, stream) = ready!(self.listeners.poll_accept(cx)?);
Poll::Ready(Ok((
self.ip_info
.peek(|ip_info| {
lookup_info_by_addr(ip_info, addr)
.map(|(id, info)| M::from_gateway_info(id, info))
})
.or_not_found(lazy_format!("gateway for {addr}"))?,
inner,
stream,
)))
}
pub fn change_ip_info_source(
&mut self,
mut ip_info: Watch<OrdMap<GatewayId, NetworkInterfaceInfo>>,
) {
ip_info.mark_unseen();
self.ip_info = ip_info;
}
pub async fn accept<M: FromGatewayInfo>(
&mut self,
filter: &impl InterfaceFilter,
) -> Result<(M, <B::Accept as Accept>::Metadata, AcceptStream), Error> {
futures::future::poll_fn(|cx| self.poll_accept(cx, filter)).await
}
pub fn check_filter(&self) -> impl FnOnce(SocketAddr, &DynInterfaceFilter) -> bool + 'static {
let ip_info = self.ip_info.clone();
move |addr, filter| {
ip_info.peek(|i| {
lookup_info_by_addr(i, addr).map_or(false, |(id, info)| {
InterfaceFilter::filter(filter, id, info)
})
})
}
}
}
#[derive(VisitFields)]
pub struct NetworkInterfaceListenerAcceptMetadata<B: Bind> {
pub inner: <B::Accept as Accept>::Metadata,
/// Metadata for connections accepted by WildcardListener or VHostBindListener.
#[derive(Clone, Debug, VisitFields)]
pub struct NetworkInterfaceListenerAcceptMetadata {
pub inner: TcpMetadata,
pub info: GatewayInfo,
}
impl<B: Bind> fmt::Debug for NetworkInterfaceListenerAcceptMetadata<B> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("NetworkInterfaceListenerAcceptMetadata")
.field("inner", &self.inner)
.field("info", &self.info)
.finish()
}
}
impl<B: Bind> Clone for NetworkInterfaceListenerAcceptMetadata<B>
where
<B::Accept as Accept>::Metadata: Clone,
{
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
info: self.info.clone(),
}
}
}
impl<B, V> Visit<V> for NetworkInterfaceListenerAcceptMetadata<B>
where
B: Bind,
<B::Accept as Accept>::Metadata: Visit<V> + Clone + Send + Sync + 'static,
V: MetadataVisitor,
{
impl<V: MetadataVisitor> Visit<V> for NetworkInterfaceListenerAcceptMetadata {
fn visit(&self, visitor: &mut V) -> V::Result {
self.visit_fields(visitor).collect()
}
}
impl<B: Bind> Accept for NetworkInterfaceListener<B> {
type Metadata = NetworkInterfaceListenerAcceptMetadata<B>;
/// A simple TCP listener on 0.0.0.0:port that looks up GatewayInfo from the
/// connection's local address on each accepted connection.
pub struct WildcardListener {
listener: TcpListener,
ip_info: Watch<OrdMap<GatewayId, NetworkInterfaceInfo>>,
/// Handle to the self-contained watcher task started in `new()`.
/// Dropped (and thus aborted) when `set_ip_info` replaces the ip_info source.
_watcher: Option<NonDetachingJoinHandle<()>>,
}
impl WildcardListener {
pub fn new(port: u16) -> Result<Self, Error> {
let listener = TcpListener::from_std(
mio::net::TcpListener::bind(SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), port))
.with_kind(ErrorKind::Network)?
.into(),
)
.with_kind(ErrorKind::Network)?;
let ip_info = Watch::new(OrdMap::new());
let watcher_handle =
tokio::spawn(watcher(ip_info.clone(), Watch::new(BTreeMap::new()))).into();
Ok(Self {
listener,
ip_info,
_watcher: Some(watcher_handle),
})
}
/// Replace the ip_info source with the one from the NetworkInterfaceController.
/// Aborts the self-contained watcher task.
pub fn set_ip_info(&mut self, ip_info: Watch<OrdMap<GatewayId, NetworkInterfaceInfo>>) {
self.ip_info = ip_info;
self._watcher = None;
}
}
impl Accept for WildcardListener {
type Metadata = NetworkInterfaceListenerAcceptMetadata;
fn poll_accept(
&mut self,
cx: &mut std::task::Context<'_>,
) -> Poll<Result<(Self::Metadata, AcceptStream), Error>> {
NetworkInterfaceListener::poll_accept(self, cx, &true).map(|res| {
res.map(|(info, inner, stream)| {
(
NetworkInterfaceListenerAcceptMetadata { inner, info },
stream,
)
})
})
}
}
pub struct SelfContainedNetworkInterfaceListener<B: Bind = BindTcp> {
_watch_thread: NonDetachingJoinHandle<()>,
listener: NetworkInterfaceListener<B>,
}
impl<B: Bind> SelfContainedNetworkInterfaceListener<B> {
pub fn bind(bind: B, port: u16) -> Self {
let ip_info = Watch::new(OrdMap::new());
let _watch_thread =
tokio::spawn(watcher(ip_info.clone(), Watch::new(BTreeMap::new()))).into();
Self {
_watch_thread,
listener: NetworkInterfaceListener::new(ip_info, bind, port),
if let Poll::Ready((stream, peer_addr)) = TcpListener::poll_accept(&self.listener, cx)? {
if let Err(e) = socket2::SockRef::from(&stream).set_keepalive(true) {
tracing::error!("Failed to set tcp keepalive: {e}");
tracing::debug!("{e:?}");
}
let local_addr = stream.local_addr()?;
let info = self
.ip_info
.peek(|ip_info| {
lookup_info_by_addr(ip_info, local_addr).map(|(id, info)| GatewayInfo {
id: id.clone(),
info: info.clone(),
})
})
.unwrap_or_else(|| GatewayInfo {
id: InternedString::from_static("").into(),
info: NetworkInterfaceInfo::default(),
});
return Poll::Ready(Ok((
NetworkInterfaceListenerAcceptMetadata {
inner: TcpMetadata {
local_addr,
peer_addr,
},
info,
},
Box::pin(stream),
)));
}
Poll::Pending
}
}
impl<B: Bind> Accept for SelfContainedNetworkInterfaceListener<B> {
type Metadata = <NetworkInterfaceListener<B> as Accept>::Metadata;
fn poll_accept(
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(Self::Metadata, AcceptStream), Error>> {
Accept::poll_accept(&mut self.listener, cx)
}
}
pub type UpgradableListener<B = BindTcp> =
Option<Either<SelfContainedNetworkInterfaceListener<B>, NetworkInterfaceListener<B>>>;
impl<B> Acceptor<UpgradableListener<B>>
where
B: Bind + Send + Sync + 'static,
B::Accept: Send + Sync,
{
pub fn bind_upgradable(listener: SelfContainedNetworkInterfaceListener<B>) -> Self {
Self::new(Some(Either::Left(listener)))
}
}
#[test]
fn test_filter() {
let wg1 = "wg1".parse::<GatewayId>().unwrap();
assert!(!InterfaceFilter::filter(
&AndFilter(IdFilter(wg1.clone()), PublicFilter { public: false }).into_dyn(),
&wg1,
&NetworkInterfaceInfo {
name: None,
public: None,
secure: None,
ip_info: Some(Arc::new(IpInfo {
name: "".into(),
scope_id: 3,
device_type: Some(NetworkInterfaceType::Wireguard),
subnets: ["10.59.0.2/24".parse::<IpNet>().unwrap()]
.into_iter()
.collect(),
lan_ip: Default::default(),
wan_ip: None,
ntp_servers: Default::default(),
dns_servers: Default::default(),
})),
},
));
}