feat: refactor NetService to watch DB and reconcile network state

- NetService sync task now uses PatchDB DbWatch instead of being called
  directly after DB mutations
- Read gateways from DB instead of network interface context when
  updating host addresses
- gateway sync updates all host addresses in the DB
- Add Watch<u64> channel for callers to wait on sync completion
- Fix ts-rs codegen bug with #[ts(skip)] on flattened Plugin field
- Update SDK getServiceInterface.ts for new HostnameInfo shape
- Remove unnecessary HTTPS redirect in static_server.rs
- Fix tunnel/api.rs to filter for WAN IPv4 address
This commit is contained in:
Aiden McClelland
2026-02-13 16:21:57 -07:00
parent 3765465618
commit 49d4da03ca
12 changed files with 5767 additions and 3489 deletions

View File

@@ -32,6 +32,7 @@ use crate::context::{CliContext, RpcContext};
use crate::db::model::Database;
use crate::db::model::public::{IpInfo, NetworkInterfaceInfo, NetworkInterfaceType};
use crate::net::forward::START9_BRIDGE_IFACE;
use crate::net::host::all_hosts;
use crate::net::gateway::device::DeviceProxy;
use crate::net::web_server::{Accept, AcceptStream, MetadataVisitor, TcpMetadata};
use crate::prelude::*;
@@ -1003,7 +1004,12 @@ impl NetworkInterfaceController {
.as_server_info_mut()
.as_network_mut()
.as_gateways_mut()
.ser(info)
.ser(info)?;
let ports = db.as_private().as_available_ports().de()?;
for host in all_hosts(db) {
host?.update_addresses(info, &ports)?;
}
Ok(())
})
.await
.result?;

View File

@@ -193,7 +193,10 @@ pub async fn add_public_domain<Kind: HostApiKind>(
Kind::host_for(&inheritance, db)?
.as_public_domains_mut()
.insert(&fqdn, &PublicDomainConfig { acme, gateway })?;
handle_duplicates(db)
handle_duplicates(db)?;
let gateways = db.as_public().as_server_info().as_network().as_gateways().de()?;
let ports = db.as_private().as_available_ports().de()?;
Kind::host_for(&inheritance, db)?.update_addresses(&gateways, &ports)
})
.await
.result?;
@@ -221,7 +224,10 @@ pub async fn remove_public_domain<Kind: HostApiKind>(
.mutate(|db| {
Kind::host_for(&inheritance, db)?
.as_public_domains_mut()
.remove(&fqdn)
.remove(&fqdn)?;
let gateways = db.as_public().as_server_info().as_network().as_gateways().de()?;
let ports = db.as_private().as_available_ports().de()?;
Kind::host_for(&inheritance, db)?.update_addresses(&gateways, &ports)
})
.await
.result?;
@@ -248,7 +254,10 @@ pub async fn add_private_domain<Kind: HostApiKind>(
.as_private_domains_mut()
.upsert(&fqdn, || Ok(BTreeSet::new()))?
.mutate(|d| Ok(d.insert(gateway)))?;
handle_duplicates(db)
handle_duplicates(db)?;
let gateways = db.as_public().as_server_info().as_network().as_gateways().de()?;
let ports = db.as_private().as_available_ports().de()?;
Kind::host_for(&inheritance, db)?.update_addresses(&gateways, &ports)
})
.await
.result?;
@@ -266,7 +275,10 @@ pub async fn remove_private_domain<Kind: HostApiKind>(
.mutate(|db| {
Kind::host_for(&inheritance, db)?
.as_private_domains_mut()
.mutate(|d| Ok(d.remove(&domain)))
.mutate(|d| Ok(d.remove(&domain)))?;
let gateways = db.as_public().as_server_info().as_network().as_gateways().de()?;
let ports = db.as_private().as_available_ports().de()?;
Kind::host_for(&inheritance, db)?.update_addresses(&gateways, &ports)
})
.await
.result?;

View File

@@ -3,17 +3,15 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4};
use std::sync::{Arc, Weak};
use color_eyre::eyre::eyre;
use imbl::vector;
use imbl_value::InternedString;
use ipnet::IpNet;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tokio_rustls::rustls::ClientConfig as TlsClientConfig;
use tracing::instrument;
use patch_db::json_ptr::JsonPointer;
use crate::db::model::Database;
use crate::db::model::public::NetworkInterfaceType;
use crate::error::ErrorCollection;
use crate::hostname::Hostname;
use crate::net::dns::DnsController;
use crate::net::forward::{
@@ -23,13 +21,13 @@ use crate::net::gateway::NetworkInterfaceController;
use crate::net::host::address::HostAddress;
use crate::net::host::binding::{AddSslOptions, BindId, BindOptions};
use crate::net::host::{Host, Hosts, host_for};
use crate::net::service_interface::{HostnameInfo, HostnameMetadata};
use crate::net::service_interface::HostnameMetadata;
use crate::net::socks::SocksController;
use crate::net::utils::ipv6_is_local;
use crate::net::vhost::{AlpnInfo, DynVHostTarget, ProxyTarget, VHostController};
use crate::prelude::*;
use crate::service::effects::callbacks::ServiceCallbacks;
use crate::util::serde::MaybeUtf8String;
use crate::util::sync::Watch;
use crate::{GatewayId, HOST_IP, HostId, OptionExt, PackageId};
pub struct NetController {
@@ -182,79 +180,11 @@ impl NetServiceData {
})
}
async fn clear_bindings(
&mut self,
ctrl: &NetController,
except: BTreeSet<BindId>,
) -> Result<(), Error> {
if let Some(pkg_id) = &self.id {
let hosts = ctrl
.db
.mutate(|db| {
let mut res = Hosts::default();
for (host_id, host) in db
.as_public_mut()
.as_package_data_mut()
.as_idx_mut(pkg_id)
.or_not_found(pkg_id)?
.as_hosts_mut()
.as_entries_mut()?
{
host.as_bindings_mut().mutate(|b| {
for (internal_port, info) in b.iter_mut() {
if !except.contains(&BindId {
id: host_id.clone(),
internal_port: *internal_port,
}) {
info.disable();
}
}
Ok(())
})?;
res.0.insert(host_id, host.de()?);
}
Ok(res)
})
.await
.result?;
let mut errors = ErrorCollection::new();
for (id, host) in hosts.0 {
errors.handle(self.update(ctrl, id, host).await);
}
errors.into_result()
} else {
let host = ctrl
.db
.mutate(|db| {
let host = db
.as_public_mut()
.as_server_info_mut()
.as_network_mut()
.as_host_mut();
host.as_bindings_mut().mutate(|b| {
for (internal_port, info) in b.iter_mut() {
if !except.contains(&BindId {
id: HostId::default(),
internal_port: *internal_port,
}) {
info.disable();
}
}
Ok(())
})?;
host.de()
})
.await
.result?;
self.update(ctrl, HostId::default(), host).await
}
}
async fn update(
&mut self,
ctrl: &NetController,
id: HostId,
mut host: Host,
host: Host,
) -> Result<(), Error> {
let mut forwards: BTreeMap<u16, (SocketAddrV4, ForwardRequirements)> = BTreeMap::new();
let mut vhosts: BTreeMap<(Option<InternedString>, u16), ProxyTarget> = BTreeMap::new();
@@ -274,200 +204,7 @@ impl NetServiceData {
}
}
// ── Phase 1: Compute available addresses ──
for (_port, bind) in host.bindings.iter_mut() {
if !bind.enabled {
continue;
}
if bind.net.assigned_port.is_none() && bind.net.assigned_ssl_port.is_none() {
continue;
}
bind.addresses.available.clear();
// Domain port: non-SSL port for domains (secure-filtered, gateway-independent)
let domain_base_port = bind.net.assigned_port.filter(|_| {
bind.options
.secure
.map_or(false, |s| !s.ssl || bind.options.add_ssl.is_none())
});
let (domain_port, domain_ssl_port) = if bind
.options
.add_ssl
.as_ref()
.map_or(false, |ssl| ssl.preferred_external_port == 443)
{
(None, Some(443))
} else {
(domain_base_port, bind.net.assigned_ssl_port)
};
// Domain addresses
for HostAddress {
address, public, ..
} in host_addresses.iter().cloned()
{
// Public domain entry
if let Some(pub_config) = &public {
let metadata = HostnameMetadata::PublicDomain {
gateway: pub_config.gateway.clone(),
};
if let Some(p) = domain_port {
bind.addresses.available.insert(HostnameInfo {
ssl: false,
public: true,
host: address.clone(),
port: Some(p),
metadata: metadata.clone(),
});
}
if let Some(sp) = domain_ssl_port {
bind.addresses.available.insert(HostnameInfo {
ssl: true,
public: true,
host: address.clone(),
port: Some(sp),
metadata,
});
}
}
// Private domain entry
if let Some(gateways) = host.private_domains.get(&address) {
if !gateways.is_empty() {
let metadata = HostnameMetadata::PrivateDomain {
gateways: gateways.clone(),
};
if let Some(p) = domain_port {
bind.addresses.available.insert(HostnameInfo {
ssl: false,
public: false,
host: address.clone(),
port: Some(p),
metadata: metadata.clone(),
});
}
if let Some(sp) = domain_ssl_port {
bind.addresses.available.insert(HostnameInfo {
ssl: true,
public: false,
host: address.clone(),
port: Some(sp),
metadata,
});
}
}
}
}
// IP addresses (per-gateway)
for (gateway_id, info) in net_ifaces
.iter()
.filter(|(_, info)| {
info.ip_info.as_ref().map_or(false, |i| {
!matches!(i.device_type, Some(NetworkInterfaceType::Bridge))
})
})
.filter(|(_, info)| info.ip_info.is_some())
{
let port = bind.net.assigned_port.filter(|_| {
bind.options.secure.map_or(false, |s| {
!(s.ssl && bind.options.add_ssl.is_some()) || info.secure()
})
});
if let Some(ip_info) = &info.ip_info {
let public = info.public();
// WAN IP (public)
if let Some(wan_ip) = ip_info.wan_ip {
let host_str = InternedString::from_display(&wan_ip);
if let Some(p) = port {
bind.addresses.available.insert(HostnameInfo {
ssl: false,
public: true,
host: host_str.clone(),
port: Some(p),
metadata: HostnameMetadata::Ipv4 {
gateway: gateway_id.clone(),
},
});
}
if let Some(sp) = bind.net.assigned_ssl_port {
bind.addresses.available.insert(HostnameInfo {
ssl: true,
public: true,
host: host_str,
port: Some(sp),
metadata: HostnameMetadata::Ipv4 {
gateway: gateway_id.clone(),
},
});
}
}
// Subnet IPs
for ipnet in &ip_info.subnets {
match ipnet {
IpNet::V4(net) => {
if !public {
let host_str = InternedString::from_display(&net.addr());
if let Some(p) = port {
bind.addresses.available.insert(HostnameInfo {
ssl: false,
public: false,
host: host_str.clone(),
port: Some(p),
metadata: HostnameMetadata::Ipv4 {
gateway: gateway_id.clone(),
},
});
}
if let Some(sp) = bind.net.assigned_ssl_port {
bind.addresses.available.insert(HostnameInfo {
ssl: true,
public: false,
host: host_str,
port: Some(sp),
metadata: HostnameMetadata::Ipv4 {
gateway: gateway_id.clone(),
},
});
}
}
}
IpNet::V6(net) => {
let is_public = public && !ipv6_is_local(net.addr());
let host_str = InternedString::from_display(&net.addr());
if let Some(p) = port {
bind.addresses.available.insert(HostnameInfo {
ssl: false,
public: is_public,
host: host_str.clone(),
port: Some(p),
metadata: HostnameMetadata::Ipv6 {
gateway: gateway_id.clone(),
scope_id: ip_info.scope_id,
},
});
}
if let Some(sp) = bind.net.assigned_ssl_port {
bind.addresses.available.insert(HostnameInfo {
ssl: true,
public: is_public,
host: host_str,
port: Some(sp),
metadata: HostnameMetadata::Ipv6 {
gateway: gateway_id.clone(),
scope_id: ip_info.scope_id,
},
});
}
}
}
}
}
}
}
// ── Phase 2: Build controller entries from enabled addresses ──
// ── Build controller entries from enabled addresses ──
for (port, bind) in host.bindings.iter() {
if !bind.enabled {
continue;
@@ -685,74 +422,16 @@ impl NetServiceData {
}
ctrl.dns.gc_private_domains(&rm)?;
let res = ctrl
.db
.mutate(|db| {
let bindings = host_for(db, self.id.as_ref(), &id)?.as_bindings_mut();
for (port, bind) in host.bindings.0 {
if let Some(b) = bindings.as_idx_mut(&port) {
b.as_addresses_mut()
.as_available_mut()
.ser(&bind.addresses.available)?;
}
}
Ok(())
})
.await;
res.result?;
if let Some(pkg_id) = self.id.as_ref() {
if res.revision.is_some() {
if let Some(cbs) = ctrl.callbacks.get_host_info(&(pkg_id.clone(), id)) {
cbs.call(vector![]).await?;
}
}
}
Ok(())
}
async fn update_all(&mut self) -> Result<(), Error> {
let ctrl = self.net_controller()?;
if let Some(id) = self.id.clone() {
for (host_id, host) in ctrl
.db
.peek()
.await
.as_public()
.as_package_data()
.as_idx(&id)
.or_not_found(&id)?
.as_hosts()
.as_entries()?
{
tracing::info!("Updating host {host_id} for {id}");
self.update(&*ctrl, host_id.clone(), host.de()?).await?;
tracing::info!("Updated host {host_id} for {id}");
}
} else {
tracing::info!("Updating host for Main UI");
self.update(
&*ctrl,
HostId::default(),
ctrl.db
.peek()
.await
.as_public()
.as_server_info()
.as_network()
.as_host()
.de()?,
)
.await?;
tracing::info!("Updated host for Main UI");
}
Ok(())
}
}
pub struct NetService {
shutdown: bool,
data: Arc<Mutex<NetServiceData>>,
sync_task: JoinHandle<()>,
synced: Watch<u64>,
}
impl NetService {
fn dummy() -> Self {
@@ -766,26 +445,79 @@ impl NetService {
binds: BTreeMap::new(),
})),
sync_task: tokio::spawn(futures::future::ready(())),
synced: Watch::new(0u64),
}
}
fn new(data: NetServiceData) -> Result<Self, Error> {
let mut ip_info = data.net_controller()?.net_iface.watcher.subscribe();
let ctrl = data.net_controller()?;
let pkg_id = data.id.clone();
let db = ctrl.db.clone();
drop(ctrl);
let synced = Watch::new(0u64);
let synced_writer = synced.clone();
let data = Arc::new(Mutex::new(data));
let thread_data = data.clone();
let sync_task = tokio::spawn(async move {
loop {
if let Err(e) = thread_data.lock().await.update_all().await {
tracing::error!("Failed to update network info: {e}");
tracing::debug!("{e:?}");
if let Some(ref id) = pkg_id {
let ptr: JsonPointer = format!("/public/packageData/{}/hosts", id)
.parse()
.unwrap();
let mut watch = db.watch(ptr).await.typed::<Hosts>();
loop {
if let Err(e) = watch.changed().await {
tracing::error!("DB watch disconnected for {id}: {e}");
break;
}
if let Err(e) = async {
let hosts = watch.peek()?.de()?;
let mut data = thread_data.lock().await;
let ctrl = data.net_controller()?;
for (host_id, host) in hosts.0 {
data.update(&*ctrl, host_id, host).await?;
}
Ok::<_, Error>(())
}
.await
{
tracing::error!("Failed to update network info for {id}: {e}");
tracing::debug!("{e:?}");
}
synced_writer.send_modify(|v| *v += 1);
}
} else {
let ptr: JsonPointer = "/public/serverInfo/network/host".parse().unwrap();
let mut watch = db.watch(ptr).await.typed::<Host>();
loop {
if let Err(e) = watch.changed().await {
tracing::error!("DB watch disconnected for Main UI: {e}");
break;
}
if let Err(e) = async {
let host = watch.peek()?.de()?;
let mut data = thread_data.lock().await;
let ctrl = data.net_controller()?;
data.update(&*ctrl, HostId::default(), host).await?;
Ok::<_, Error>(())
}
.await
{
tracing::error!("Failed to update network info for Main UI: {e}");
tracing::debug!("{e:?}");
}
synced_writer.send_modify(|v| *v += 1);
}
ip_info.changed().await;
}
});
Ok(Self {
shutdown: false,
data,
sync_task,
synced,
})
}
@@ -795,60 +527,113 @@ impl NetService {
internal_port: u16,
options: BindOptions,
) -> Result<(), Error> {
let mut data = self.data.lock().await;
let pkg_id = &data.id;
let ctrl = data.net_controller()?;
let host = ctrl
.db
let (ctrl, pkg_id) = {
let data = self.data.lock().await;
(data.net_controller()?, data.id.clone())
};
ctrl.db
.mutate(|db| {
let gateways = db
.as_public()
.as_server_info()
.as_network()
.as_gateways()
.de()?;
let mut ports = db.as_private().as_available_ports().de()?;
let host = host_for(db, pkg_id.as_ref(), &id)?;
host.add_binding(&mut ports, internal_port, options)?;
let host = host.de()?;
host.update_addresses(&gateways, &ports)?;
db.as_private_mut().as_available_ports_mut().ser(&ports)?;
Ok(host)
Ok(())
})
.await
.result?;
data.update(&*ctrl, id, host).await
.result
}
pub async fn clear_bindings(&self, except: BTreeSet<BindId>) -> Result<(), Error> {
let mut data = self.data.lock().await;
let ctrl = data.net_controller()?;
data.clear_bindings(&*ctrl, except).await
let (ctrl, pkg_id) = {
let data = self.data.lock().await;
(data.net_controller()?, data.id.clone())
};
ctrl.db
.mutate(|db| {
let gateways = db
.as_public()
.as_server_info()
.as_network()
.as_gateways()
.de()?;
let ports = db.as_private().as_available_ports().de()?;
if let Some(ref pkg_id) = pkg_id {
for (host_id, host) in db
.as_public_mut()
.as_package_data_mut()
.as_idx_mut(pkg_id)
.or_not_found(pkg_id)?
.as_hosts_mut()
.as_entries_mut()?
{
host.as_bindings_mut().mutate(|b| {
for (internal_port, info) in b.iter_mut() {
if !except.contains(&BindId {
id: host_id.clone(),
internal_port: *internal_port,
}) {
info.disable();
}
}
Ok(())
})?;
host.update_addresses(&gateways, &ports)?;
}
} else {
let host = db
.as_public_mut()
.as_server_info_mut()
.as_network_mut()
.as_host_mut();
host.as_bindings_mut().mutate(|b| {
for (internal_port, info) in b.iter_mut() {
if !except.contains(&BindId {
id: HostId::default(),
internal_port: *internal_port,
}) {
info.disable();
}
}
Ok(())
})?;
host.update_addresses(&gateways, &ports)?;
}
Ok(())
})
.await
.result
}
pub async fn update(&self, id: HostId, host: Host) -> Result<(), Error> {
let mut data = self.data.lock().await;
let ctrl = data.net_controller()?;
data.update(&*ctrl, id, host).await
}
pub async fn sync_host(&self, id: HostId) -> Result<(), Error> {
let mut data = self.data.lock().await;
let ctrl = data.net_controller()?;
let host = host_for(&mut ctrl.db.peek().await, data.id.as_ref(), &id)?.de()?;
data.update(&*ctrl, id, host).await
pub async fn sync_host(&self, _id: HostId) -> Result<(), Error> {
let current = self.synced.peek(|v| *v);
let mut w = self.synced.clone();
w.wait_for(|v| *v > current).await;
Ok(())
}
pub async fn remove_all(mut self) -> Result<(), Error> {
self.sync_task.abort();
let mut data = self.data.lock().await;
if let Some(ctrl) = Weak::upgrade(&data.controller) {
self.shutdown = true;
data.clear_bindings(&*ctrl, Default::default()).await?;
drop(ctrl);
Ok(())
} else {
if Weak::upgrade(&self.data.lock().await.controller).is_none() {
self.shutdown = true;
tracing::warn!("NetService dropped after NetController is shutdown");
Err(Error::new(
return Err(Error::new(
eyre!("NetController is shutdown"),
crate::ErrorKind::Network,
))
));
}
let current = self.synced.peek(|v| *v);
self.clear_bindings(Default::default()).await?;
let mut w = self.synced.clone();
w.wait_for(|v| *v > current).await;
self.sync_task.abort();
self.shutdown = true;
Ok(())
}
pub async fn get_ip(&self) -> Ipv4Addr {

View File

@@ -41,6 +41,7 @@ pub enum HostnameMetadata {
Plugin {
package: PackageId,
#[serde(flatten)]
#[ts(skip)]
extra: InOMap<InternedString, Value>,
},
}

View File

@@ -9,14 +9,14 @@ use async_compression::tokio::bufread::GzipEncoder;
use axum::Router;
use axum::body::Body;
use axum::extract::{self as x, Request};
use axum::response::{IntoResponse, Redirect, Response};
use axum::response::{IntoResponse, Response};
use axum::routing::{any, get};
use base64::display::Base64Display;
use digest::Digest;
use futures::future::ready;
use http::header::{
ACCEPT_ENCODING, ACCEPT_RANGES, CACHE_CONTROL, CONNECTION, CONTENT_ENCODING, CONTENT_LENGTH,
CONTENT_RANGE, CONTENT_TYPE, ETAG, HOST, RANGE,
CONTENT_RANGE, CONTENT_TYPE, ETAG, RANGE,
};
use http::request::Parts as RequestParts;
use http::{HeaderValue, Method, StatusCode};
@@ -36,8 +36,6 @@ use crate::middleware::auth::Auth;
use crate::middleware::auth::session::ValidSessionToken;
use crate::middleware::cors::Cors;
use crate::middleware::db::SyncDb;
use crate::net::gateway::GatewayInfo;
use crate::net::tls::TlsHandshakeInfo;
use crate::prelude::*;
use crate::rpc_continuations::{Guid, RpcContinuations};
use crate::s9pk::S9pk;
@@ -89,30 +87,6 @@ impl UiContext for RpcContext {
.middleware(SyncDb::new())
}
fn extend_router(self, router: Router) -> Router {
async fn https_redirect_if_public_http(
req: Request,
next: axum::middleware::Next,
) -> Response {
if req
.extensions()
.get::<GatewayInfo>()
.map_or(false, |p| p.info.public())
&& req.extensions().get::<TlsHandshakeInfo>().is_none()
{
Redirect::temporary(&format!(
"https://{}{}",
req.headers()
.get(HOST)
.and_then(|s| s.to_str().ok())
.unwrap_or("localhost"),
req.uri()
))
.into_response()
} else {
next.run(req).await
}
}
router
.route("/proxy/{url}", {
let ctx = self.clone();
@@ -136,7 +110,6 @@ impl UiContext for RpcContext {
}
}),
)
.layer(axum::middleware::from_fn(https_redirect_if_public_http))
}
}

View File

@@ -175,10 +175,13 @@ pub async fn remove_tunnel(
ctx.db
.mutate(|db| {
let gateways = db.as_public().as_server_info().as_network().as_gateways().de()?;
let ports = db.as_private().as_available_ports().de()?;
for host in all_hosts(db) {
let host = host?;
host.as_public_domains_mut()
.mutate(|p| Ok(p.retain(|_, v| v.gateway != id)))?;
host.update_addresses(&gateways, &ports)?;
}
Ok(())
@@ -190,6 +193,8 @@ pub async fn remove_tunnel(
ctx.db
.mutate(|db| {
let gateways = db.as_public().as_server_info().as_network().as_gateways().de()?;
let ports = db.as_private().as_available_ports().de()?;
for host in all_hosts(db) {
let host = host?;
host.as_private_domains_mut().mutate(|d| {
@@ -199,6 +204,7 @@ pub async fn remove_tunnel(
d.retain(|_, gateways| !gateways.is_empty());
Ok(())
})?;
host.update_addresses(&gateways, &ports)?;
}
Ok(())

View File

@@ -11,6 +11,9 @@ use serde::{Deserialize, Serialize};
use tracing::warn;
use ts_rs::TS;
use patch_db::json_ptr::JsonPointer;
use crate::db::model::Database;
use crate::net::ssl::FullchainCertData;
use crate::prelude::*;
use crate::service::effects::context::EffectContext;
@@ -29,7 +32,7 @@ struct ServiceCallbackMap {
get_service_interface: BTreeMap<(PackageId, ServiceInterfaceId), Vec<CallbackHandler>>,
list_service_interfaces: BTreeMap<PackageId, Vec<CallbackHandler>>,
get_system_smtp: Vec<CallbackHandler>,
get_host_info: BTreeMap<(PackageId, HostId), Vec<CallbackHandler>>,
get_host_info: BTreeMap<(PackageId, HostId), (NonDetachingJoinHandle<()>, Vec<CallbackHandler>)>,
get_ssl_certificate: EqMap<
(BTreeSet<InternedString>, FullchainCertData, Algorithm),
(NonDetachingJoinHandle<()>, Vec<CallbackHandler>),
@@ -57,7 +60,7 @@ impl ServiceCallbacks {
});
this.get_system_smtp
.retain(|h| h.handle.is_active() && h.seed.strong_count() > 0);
this.get_host_info.retain(|_, v| {
this.get_host_info.retain(|_, (_, v)| {
v.retain(|h| h.handle.is_active() && h.seed.strong_count() > 0);
!v.is_empty()
});
@@ -141,29 +144,57 @@ impl ServiceCallbacks {
}
pub(super) fn add_get_host_info(
&self,
self: &Arc<Self>,
db: &TypedPatchDb<Database>,
package_id: PackageId,
host_id: HostId,
handler: CallbackHandler,
) {
self.mutate(|this| {
this.get_host_info
.entry((package_id, host_id))
.or_default()
.entry((package_id.clone(), host_id.clone()))
.or_insert_with(|| {
let ptr: JsonPointer = format!(
"/public/packageData/{}/hosts/{}",
package_id, host_id
)
.parse()
.expect("valid json pointer");
let db = db.clone();
let callbacks = Arc::clone(self);
let key = (package_id, host_id);
(
tokio::spawn(async move {
let mut sub = db.subscribe(ptr).await;
while sub.recv().await.is_some() {
if let Some(cbs) = callbacks.mutate(|this| {
this.get_host_info
.remove(&key)
.map(|(_, handlers)| CallbackHandlers(handlers))
.filter(|cb| !cb.0.is_empty())
}) {
if let Err(e) = cbs.call(vector![]).await {
tracing::error!(
"Error in host info callback: {e}"
);
tracing::debug!("{e:?}");
}
}
// entry was removed when we consumed handlers,
// so stop watching — a new subscription will be
// created if the service re-registers
break;
}
})
.into(),
Vec::new(),
)
})
.1
.push(handler);
})
}
#[must_use]
pub fn get_host_info(&self, id: &(PackageId, HostId)) -> Option<CallbackHandlers> {
self.mutate(|this| {
Some(CallbackHandlers(
this.get_host_info.remove(id).unwrap_or_default(),
))
.filter(|cb| !cb.0.is_empty())
})
}
pub(super) fn add_get_ssl_certificate(
&self,
ctx: EffectContext,

View File

@@ -29,6 +29,7 @@ pub async fn get_host_info(
if let Some(callback) = callback {
let callback = callback.register(&context.seed.persistent_container);
context.seed.ctx.callbacks.add_get_host_info(
&context.seed.ctx.db,
package_id.clone(),
host_id.clone(),
CallbackHandler::new(&context, callback),

View File

@@ -414,14 +414,11 @@ pub async fn show_config(
i.iter().find_map(|(_, info)| {
info.ip_info
.as_ref()
.filter(|_| info.public())
.iter()
.find_map(|info| info.subnets.iter().next())
.copied()
.and_then(|ip_info| ip_info.wan_ip)
.map(IpAddr::from)
})
})
.or_not_found("a public IP address")?
.addr()
};
Ok(client
.client_config(