From 46179f5c83f0d7e6c274eb6c1cec606100a0434e Mon Sep 17 00:00:00 2001 From: Aiden McClelland <3732071+dr-bonez@users.noreply.github.com> Date: Thu, 14 Nov 2024 11:31:47 -0700 Subject: [PATCH] attempt to fix webserver lockup (#2788) --- core/startos/Cargo.toml | 2 +- core/startos/src/net/web_server.rs | 50 ++++++++++++++++++++++++------ 2 files changed, 42 insertions(+), 10 deletions(-) diff --git a/core/startos/Cargo.toml b/core/startos/Cargo.toml index 3a9935373..df4ac2378 100644 --- a/core/startos/Cargo.toml +++ b/core/startos/Cargo.toml @@ -198,7 +198,7 @@ tokio-util = { version = "0.7.9", features = ["io"] } torut = { git = "https://github.com/Start9Labs/torut.git", branch = "update/dependencies", features = [ "serialize", ] } -tower-service = "0.3.2" +tower-service = "0.3.3" tracing = "0.1.39" tracing-error = "0.2.0" tracing-futures = "0.2.5" diff --git a/core/startos/src/net/web_server.rs b/core/startos/src/net/web_server.rs index a9cfdf046..d1ad64d01 100644 --- a/core/startos/src/net/web_server.rs +++ b/core/startos/src/net/web_server.rs @@ -7,7 +7,7 @@ use axum::extract::Request; use axum::Router; use axum_server::Handle; use bytes::Bytes; -use futures::future::ready; +use futures::future::{ready, BoxFuture}; use futures::FutureExt; use helpers::NonDetachingJoinHandle; use tokio::sync::{oneshot, watch}; @@ -30,8 +30,39 @@ impl SwappableRouter { } } -#[derive(Clone)] -pub struct SwappableRouterService(watch::Receiver); +pub struct SwappableRouterService { + router: watch::Receiver, + changed: Option>, +} +impl SwappableRouterService { + fn router(&self) -> Router { + self.router.borrow().clone() + } + fn changed(&mut self, cx: &mut std::task::Context<'_>) -> Poll<()> { + let mut changed = if let Some(changed) = self.changed.take() { + changed + } else { + let mut router = self.router.clone(); + async move { + router.changed().await; + } + .boxed() + }; + if changed.poll_unpin(cx).is_ready() { + return Poll::Ready(()); + } + self.changed = Some(changed); + Poll::Pending + } +} +impl Clone for SwappableRouterService { + fn clone(&self) -> Self { + Self { + router: self.router.clone(), + changed: None, + } + } +} impl tower_service::Service> for SwappableRouterService where B: axum::body::HttpBody + Send + 'static, @@ -42,15 +73,13 @@ where type Future = >>::Future; #[inline] fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { - let mut changed = self.0.changed().boxed(); - if changed.poll_unpin(cx).is_ready() { + if self.changed(cx).is_ready() { return Poll::Ready(Ok(())); } - drop(changed); - tower_service::Service::>::poll_ready(&mut self.0.borrow().clone(), cx) + tower_service::Service::>::poll_ready(&mut self.router(), cx) } fn call(&mut self, req: Request) -> Self::Future { - self.0.borrow().clone().call(req) + self.router().call(req) } } @@ -66,7 +95,10 @@ impl tower_service::Service for SwappableRouter { Poll::Ready(Ok(())) } fn call(&mut self, _: T) -> Self::Future { - ready(Ok(SwappableRouterService(self.0.subscribe()))) + ready(Ok(SwappableRouterService { + router: self.0.subscribe(), + changed: None, + })) } }