enabling support for wireguard and firewall (#2713)

* wip: enabling support for wireguard and firewall

* wip

* wip

* wip

* wip

* wip

* implement some things

* fix warning

* wip

* alpha.23

* misc fixes

* remove ufw since no longer required

* remove debug info

* add cli bindings

* debugging

* fixes

* individualized acme and privacy settings for domains and bindings

* sdk version bump

* migration

* misc fixes

* refactor Host::update

* debug info

* refactor webserver

* misc fixes

* misc fixes

* refactor port forwarding

* recheck interfaces every 5 min if no dbus event

* misc fixes and cleanup

* misc fixes
This commit is contained in:
Aiden McClelland
2025-01-09 16:34:34 -07:00
committed by GitHub
parent 45ca9405d3
commit 29e8210782
144 changed files with 4878 additions and 2398 deletions

View File

@@ -1,134 +1,299 @@
use std::convert::Infallible;
use std::future::Future;
use std::net::SocketAddr;
use std::ops::Deref;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, RwLock};
use std::task::Poll;
use std::time::Duration;
use axum::extract::Request;
use axum::Router;
use axum_server::Handle;
use bytes::Bytes;
use futures::future::{ready, BoxFuture};
use futures::future::{BoxFuture, Either};
use futures::FutureExt;
use helpers::NonDetachingJoinHandle;
use hyper_util::rt::{TokioIo, TokioTimer};
use hyper_util::service::TowerToHyperService;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{oneshot, watch};
use crate::context::{DiagnosticContext, InitContext, InstallContext, RpcContext, SetupContext};
use crate::net::network_interface::NetworkInterfaceListener;
use crate::net::static_server::{
diagnostic_ui_router, init_ui_router, install_ui_router, main_ui_router, refresher,
diagnostic_ui_router, init_ui_router, install_ui_router, main_ui_router, redirecter, refresher,
setup_ui_router,
};
use crate::prelude::*;
use crate::util::actor::background::BackgroundJobQueue;
#[derive(Clone)]
pub struct SwappableRouter(watch::Sender<Router>);
impl SwappableRouter {
pub fn new(router: Router) -> Self {
Self(watch::channel(router).0)
}
pub fn swap(&self, router: Router) {
let _ = self.0.send_replace(router);
}
pub struct Accepted {
pub https_redirect: bool,
pub stream: TcpStream,
}
pub struct SwappableRouterService {
router: watch::Receiver<Router>,
changed: Option<BoxFuture<'static, ()>>,
pub trait Accept {
fn poll_accept(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<Accepted, Error>>;
}
impl SwappableRouterService {
fn router(&self) -> Router {
self.router.borrow().clone()
}
fn changed(&mut self, cx: &mut std::task::Context<'_>) -> Poll<()> {
let mut changed = if let Some(changed) = self.changed.take() {
changed
} else {
let mut router = self.router.clone();
async move {
router.changed().await;
impl Accept for Vec<TcpListener> {
fn poll_accept(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<Accepted, Error>> {
for listener in &*self {
if let Poll::Ready((stream, _)) = listener.poll_accept(cx)? {
return Poll::Ready(Ok(Accepted {
https_redirect: false,
stream,
}));
}
.boxed()
};
if changed.poll_unpin(cx).is_ready() {
return Poll::Ready(());
}
self.changed = Some(changed);
Poll::Pending
}
}
impl Clone for SwappableRouterService {
fn clone(&self) -> Self {
impl Accept for NetworkInterfaceListener {
fn poll_accept(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<Accepted, Error>> {
NetworkInterfaceListener::poll_accept(self, cx, true).map(|res| {
res.map(|a| Accepted {
https_redirect: a.is_public,
stream: a.stream,
})
})
}
}
impl<A: Accept, B: Accept> Accept for Either<A, B> {
fn poll_accept(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<Accepted, Error>> {
match self {
Either::Left(a) => a.poll_accept(cx),
Either::Right(b) => b.poll_accept(cx),
}
}
}
impl<A: Accept> Accept for Option<A> {
fn poll_accept(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<Accepted, Error>> {
match self {
None => Poll::Pending,
Some(a) => a.poll_accept(cx),
}
}
}
#[pin_project::pin_project]
pub struct Acceptor<A: Accept> {
acceptor: (watch::Sender<A>, watch::Receiver<A>),
changed: Option<BoxFuture<'static, ()>>,
}
impl<A: Accept + Send + Sync + 'static> Acceptor<A> {
pub fn new(acceptor: A) -> Self {
Self {
router: self.router.clone(),
acceptor: watch::channel(acceptor),
changed: None,
}
}
}
impl<B> tower_service::Service<Request<B>> for SwappableRouterService
where
B: axum::body::HttpBody<Data = Bytes> + Send + 'static,
B::Error: Into<axum::BoxError>,
{
type Response = <Router as tower_service::Service<Request<B>>>::Response;
type Error = <Router as tower_service::Service<Request<B>>>::Error;
type Future = <Router as tower_service::Service<Request<B>>>::Future;
#[inline]
fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.changed(cx).is_ready() {
return Poll::Ready(Ok(()));
fn poll_changed(&mut self, cx: &mut std::task::Context<'_>) -> Poll<()> {
let mut changed = if let Some(changed) = self.changed.take() {
changed
} else {
let mut recv = self.acceptor.1.clone();
async move {
let _ = recv.changed().await;
}
.boxed()
};
let res = changed.poll_unpin(cx);
if res.is_pending() {
self.changed = Some(changed);
}
tower_service::Service::<Request<B>>::poll_ready(&mut self.router(), cx)
res
}
fn call(&mut self, req: Request<B>) -> Self::Future {
self.router().call(req)
fn poll_accept(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<Accepted, Error>> {
let _ = self.poll_changed(cx);
let mut res = Poll::Pending;
self.acceptor.0.send_if_modified(|a| {
res = a.poll_accept(cx);
false
});
res
}
async fn accept(&mut self) -> Result<Accepted, Error> {
std::future::poll_fn(|cx| self.poll_accept(cx)).await
}
}
impl Acceptor<Vec<TcpListener>> {
pub async fn bind(listen: impl IntoIterator<Item = SocketAddr>) -> Result<Self, Error> {
Ok(Self::new(
futures::future::try_join_all(listen.into_iter().map(TcpListener::bind)).await?,
))
}
}
impl<T> tower_service::Service<T> for SwappableRouter {
type Response = SwappableRouterService;
type Error = Infallible;
type Future = futures::future::Ready<Result<Self::Response, Self::Error>>;
#[inline]
fn poll_ready(
&mut self,
_: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _: T) -> Self::Future {
ready(Ok(SwappableRouterService {
router: self.0.subscribe(),
changed: None,
}))
pub type UpgradableListener = Option<Either<Vec<TcpListener>, NetworkInterfaceListener>>;
impl Acceptor<UpgradableListener> {
pub async fn bind_upgradable(
listen: impl IntoIterator<Item = SocketAddr>,
) -> Result<Self, Error> {
Ok(Self::new(Some(Either::Left(
futures::future::try_join_all(listen.into_iter().map(TcpListener::bind)).await?,
))))
}
}
pub struct WebServer {
pub struct WebServerAcceptorSetter<A: Accept> {
acceptor: watch::Sender<A>,
}
impl<A: Accept, B: Accept> WebServerAcceptorSetter<Option<Either<A, B>>> {
pub fn try_upgrade<F: FnOnce(A) -> Result<B, Error>>(&self, f: F) -> Result<(), Error> {
let mut res = Ok(());
self.acceptor.send_modify(|a| {
*a = match a.take() {
Some(Either::Left(a)) => match f(a) {
Ok(b) => Some(Either::Right(b)),
Err(e) => {
res = Err(e);
None
}
},
x => x,
}
});
res
}
}
impl<A: Accept> Deref for WebServerAcceptorSetter<A> {
type Target = watch::Sender<A>;
fn deref(&self) -> &Self::Target {
&self.acceptor
}
}
pub struct WebServer<A: Accept> {
shutdown: oneshot::Sender<()>,
router: SwappableRouter,
router: watch::Sender<Option<Router>>,
acceptor: watch::Sender<A>,
thread: NonDetachingJoinHandle<()>,
}
impl WebServer {
pub fn new(bind: SocketAddr) -> Self {
let router = SwappableRouter::new(refresher());
let thread_router = router.clone();
impl<A: Accept + Send + Sync + 'static> WebServer<A> {
pub fn acceptor_setter(&self) -> WebServerAcceptorSetter<A> {
WebServerAcceptorSetter {
acceptor: self.acceptor.clone(),
}
}
pub fn new(mut acceptor: Acceptor<A>) -> Self {
let acceptor_send = acceptor.acceptor.0.clone();
let (router, service) = watch::channel::<Option<Router>>(None);
let (shutdown, shutdown_recv) = oneshot::channel();
let thread = NonDetachingJoinHandle::from(tokio::spawn(async move {
let handle = Handle::new();
let mut server = axum_server::bind(bind).handle(handle.clone());
server.http_builder().http1().preserve_header_case(true);
server.http_builder().http1().title_case_headers(true);
#[derive(Clone)]
struct QueueRunner {
queue: Arc<RwLock<Option<BackgroundJobQueue>>>,
}
impl<Fut> hyper::rt::Executor<Fut> for QueueRunner
where
Fut: Future + Send + 'static,
{
fn execute(&self, fut: Fut) {
if let Some(q) = &*self.queue.read().unwrap() {
q.add_job(fut);
} else {
tracing::warn!("job queued after shutdown");
}
}
}
if let (Err(e), _) = tokio::join!(server.serve(thread_router), async {
let _ = shutdown_recv.await;
handle.graceful_shutdown(Some(Duration::from_secs(0)));
}) {
tracing::error!("Spawning hyper server error: {}", e);
let accept = AtomicBool::new(true);
let queue_cell = Arc::new(RwLock::new(None));
let graceful = hyper_util::server::graceful::GracefulShutdown::new();
let mut server = hyper_util::server::conn::auto::Builder::new(QueueRunner {
queue: queue_cell.clone(),
});
server
.http1()
.timer(TokioTimer::new())
.title_case_headers(true)
.preserve_header_case(true)
.http2()
.timer(TokioTimer::new())
.enable_connect_protocol()
.keep_alive_interval(Duration::from_secs(60))
.keep_alive_timeout(Duration::from_secs(300));
let (queue, mut runner) = BackgroundJobQueue::new();
*queue_cell.write().unwrap() = Some(queue.clone());
let handler = async {
loop {
if let Err(e) = async {
let accepted = acceptor.accept().await?;
if accepted.https_redirect {
queue.add_job(
graceful.watch(
server
.serve_connection_with_upgrades(
TokioIo::new(accepted.stream),
TowerToHyperService::new(redirecter().into_service()),
)
.into_owned(),
),
);
} else {
let service = { service.borrow().clone() };
if let Some(service) = service {
queue.add_job(
graceful.watch(
server
.serve_connection_with_upgrades(
TokioIo::new(accepted.stream),
TowerToHyperService::new(service.into_service()),
)
.into_owned(),
),
);
} else {
queue.add_job(
graceful.watch(
server
.serve_connection_with_upgrades(
TokioIo::new(accepted.stream),
TowerToHyperService::new(
refresher().into_service(),
),
)
.into_owned(),
),
);
}
}
Ok::<_, Error>(())
}
.await
{
tracing::error!("Error accepting HTTP connection: {e}");
tracing::debug!("{e:?}");
}
}
}
.boxed();
tokio::select! {
_ = shutdown_recv => (),
_ = handler => (),
_ = &mut runner => (),
}
accept.store(false, std::sync::atomic::Ordering::SeqCst);
drop(queue);
drop(queue_cell.write().unwrap().take());
if !runner.is_empty() {
runner.await;
}
}));
Self {
shutdown,
router,
thread,
acceptor: acceptor_send,
}
}
@@ -138,7 +303,7 @@ impl WebServer {
}
pub fn serve_router(&mut self, router: Router) {
self.router.swap(router)
self.router.send_replace(Some(router));
}
pub fn serve_main(&mut self, ctx: RpcContext) {