prevent gateways from getting stuck empty

This commit is contained in:
Aiden McClelland
2025-11-13 15:19:30 -07:00
parent aeaad3c833
commit 60ee836d44

View File

@@ -3,7 +3,7 @@ use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::future::Future;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV6};
use std::sync::{Arc, Weak};
use std::task::{Poll, ready};
use std::task::{ready, Poll};
use std::time::Duration;
use clap::Parser;
@@ -17,7 +17,7 @@ use itertools::Itertools;
use models::GatewayId;
use nix::net::if_::if_nametoindex;
use patch_db::json_ptr::JsonPointer;
use rpc_toolkit::{Context, HandlerArgs, HandlerExt, ParentHandler, from_fn_async};
use rpc_toolkit::{from_fn_async, Context, HandlerArgs, HandlerExt, ParentHandler};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::net::TcpListener;
@@ -29,29 +29,29 @@ use zbus::proxy::{PropertyChanged, PropertyStream, SignalStream};
use zbus::zvariant::{
DeserializeDict, Dict, OwnedObjectPath, OwnedValue, Type as ZType, Value as ZValue,
};
use zbus::{Connection, proxy};
use zbus::{proxy, Connection};
use crate::context::{CliContext, RpcContext};
use crate::db::model::Database;
use crate::db::model::public::{IpInfo, NetworkInterfaceInfo, NetworkInterfaceType};
use crate::db::model::Database;
use crate::net::forward::START9_BRIDGE_IFACE;
use crate::net::gateway::device::DeviceProxy;
use crate::net::utils::ipv6_is_link_local;
use crate::net::web_server::{Accept, AcceptStream, Acceptor, MetadataVisitor};
use crate::prelude::*;
use crate::util::Invoke;
use crate::util::collections::OrdMapIterMut;
use crate::util::future::Until;
use crate::util::io::open_file;
use crate::util::serde::{HandlerExtSerde, display_serializable};
use crate::util::serde::{display_serializable, HandlerExtSerde};
use crate::util::sync::{SyncMutex, Watch};
use crate::util::Invoke;
pub fn gateway_api<C: Context>() -> ParentHandler<C> {
ParentHandler::new()
.subcommand(
"list",
from_fn_async(list_interfaces)
.custom_ts("{}".into(), BTreeMap::<GatewayId, NetworkInterfaceInfo>::inline())
.custom_ts("{}".into(), BTreeMap::<GatewayId, NetworkInterfaceInfo>::inline_flattened())
.with_display_serializable()
.with_custom_display_fn(|HandlerArgs { params, .. }, res| {
use prettytable::*;
@@ -131,7 +131,6 @@ async fn list_interfaces(
}
#[derive(Debug, Clone, Deserialize, Serialize, Parser, TS)]
#[ts(export)]
struct NetworkInterfaceSetPublicParams {
gateway: GatewayId,
public: Option<bool>,
@@ -148,7 +147,6 @@ async fn set_public(
}
#[derive(Debug, Clone, Deserialize, Serialize, Parser, TS)]
#[ts(export)]
struct UnsetPublicParams {
gateway: GatewayId,
}
@@ -164,7 +162,6 @@ async fn unset_public(
}
#[derive(Debug, Clone, Deserialize, Serialize, Parser, TS)]
#[ts(export)]
struct ForgetGatewayParams {
gateway: GatewayId,
}
@@ -177,7 +174,6 @@ async fn forget_iface(
}
#[derive(Debug, Clone, Deserialize, Serialize, Parser, TS)]
#[ts(export)]
struct RenameGatewayParams {
id: GatewayId,
name: InternedString,
@@ -405,6 +401,12 @@ async fn watcher(
) {
loop {
let res: Result<(), Error> = async {
Command::new("systemctl")
.arg("start")
.arg("NetworkManager")
.invoke(ErrorKind::Network)
.await?;
let connection = Connection::system().await?;
let netman_proxy = NetworkManagerProxy::new(&connection).await?;
@@ -436,49 +438,60 @@ async fn watcher(
loop {
until
.run(async {
let devices = netman_proxy.all_devices().await?;
let mut ifaces = BTreeSet::new();
let mut jobs = Vec::new();
for device in devices {
use futures::future::Either;
let device_proxy =
device::DeviceProxy::new(&connection, device.clone()).await?;
let iface = InternedString::intern(device_proxy.ip_interface().await?);
if iface.is_empty() {
loop {
let devices = netman_proxy.all_devices().await?;
if devices.is_empty() {
tracing::warn!(
"NetworkManager returned no devices. Trying again..."
);
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
let iface: GatewayId = iface.into();
if watch_activation.peek(|a| a.contains_key(&iface)) {
jobs.push(Either::Left(watch_activated(
let mut ifaces = BTreeSet::new();
let mut jobs = Vec::new();
for device in devices {
use futures::future::Either;
let device_proxy =
device::DeviceProxy::new(&connection, device.clone()).await?;
let iface =
InternedString::intern(device_proxy.ip_interface().await?);
if iface.is_empty() {
continue;
}
let iface: GatewayId = iface.into();
if watch_activation.peek(|a| a.contains_key(&iface)) {
jobs.push(Either::Left(watch_activated(
&connection,
device_proxy.clone(),
iface.clone(),
&watch_activation,
)));
}
jobs.push(Either::Right(watch_ip(
&connection,
device_proxy.clone(),
iface.clone(),
&watch_activation,
&watch_ip_info,
)));
ifaces.insert(iface);
}
jobs.push(Either::Right(watch_ip(
&connection,
device_proxy.clone(),
iface.clone(),
&watch_ip_info,
)));
ifaces.insert(iface);
}
watch_ip_info.send_if_modified(|m| {
let mut changed = false;
for (iface, info) in OrdMapIterMut::from(m) {
if !ifaces.contains(iface) {
info.ip_info = None;
changed = true;
watch_ip_info.send_if_modified(|m| {
let mut changed = false;
for (iface, info) in OrdMapIterMut::from(m) {
if !ifaces.contains(iface) {
info.ip_info = None;
changed = true;
}
}
}
changed
});
futures::future::try_join_all(jobs).await?;
changed
});
futures::future::try_join_all(jobs).await?;
break;
}
Ok::<_, Error>(())
})
.await?;