From 53dff95365933d484ddf5d746619f523b96c5468 Mon Sep 17 00:00:00 2001 From: Aiden McClelland Date: Tue, 24 Mar 2026 11:13:59 -0600 Subject: [PATCH] revert: remove websocket shutdown signal from RpcContinuations --- core/src/context/diagnostic.rs | 2 +- core/src/context/init.rs | 2 +- core/src/context/rpc.rs | 2 +- core/src/context/setup.rs | 2 +- core/src/registry/context.rs | 2 +- core/src/rpc_continuations.rs | 37 +++++++++------------------------- core/src/tunnel/context.rs | 2 +- 7 files changed, 16 insertions(+), 33 deletions(-) diff --git a/core/src/context/diagnostic.rs b/core/src/context/diagnostic.rs index bf27da071..c069d017f 100644 --- a/core/src/context/diagnostic.rs +++ b/core/src/context/diagnostic.rs @@ -39,7 +39,7 @@ impl DiagnosticContext { shutdown, disk_guid, error: Arc::new(error.into()), - rpc_continuations: RpcContinuations::new(None), + rpc_continuations: RpcContinuations::new(), }))) } } diff --git a/core/src/context/init.rs b/core/src/context/init.rs index 5f6c35222..b7d5eac6a 100644 --- a/core/src/context/init.rs +++ b/core/src/context/init.rs @@ -32,7 +32,7 @@ impl InitContext { error: watch::channel(None).0, progress, shutdown, - rpc_continuations: RpcContinuations::new(None), + rpc_continuations: RpcContinuations::new(), }))) } } diff --git a/core/src/context/rpc.rs b/core/src/context/rpc.rs index dbb886f3c..3ba8d7a07 100644 --- a/core/src/context/rpc.rs +++ b/core/src/context/rpc.rs @@ -339,7 +339,7 @@ impl RpcContext { services, cancellable_installs: SyncMutex::new(BTreeMap::new()), metrics_cache, - rpc_continuations: RpcContinuations::new(Some(shutdown.clone())), + rpc_continuations: RpcContinuations::new(), shutdown, lxc_manager: Arc::new(LxcManager::new()), open_authed_continuations: OpenAuthedContinuations::new(), diff --git a/core/src/context/setup.rs b/core/src/context/setup.rs index 3d16624ef..d4d0bb9de 100644 --- a/core/src/context/setup.rs +++ b/core/src/context/setup.rs @@ -85,7 +85,7 @@ impl SetupContext { result: OnceCell::new(), disk_guid: OnceCell::new(), shutdown, - rpc_continuations: RpcContinuations::new(None), + rpc_continuations: RpcContinuations::new(), install_rootfs: SyncMutex::new(None), language: SyncMutex::new(None), keyboard: SyncMutex::new(None), diff --git a/core/src/registry/context.rs b/core/src/registry/context.rs index 8d0839cd8..02069caaa 100644 --- a/core/src/registry/context.rs +++ b/core/src/registry/context.rs @@ -142,7 +142,7 @@ impl RegistryContext { listen: config.registry_listen.unwrap_or(DEFAULT_REGISTRY_LISTEN), db, datadir, - rpc_continuations: RpcContinuations::new(None), + rpc_continuations: RpcContinuations::new(), client: Client::builder() .proxy(Proxy::custom(move |url| { if url.host_str().map_or(false, |h| h.ends_with(".onion")) { diff --git a/core/src/rpc_continuations.rs b/core/src/rpc_continuations.rs index e084264ab..42c3ae858 100644 --- a/core/src/rpc_continuations.rs +++ b/core/src/rpc_continuations.rs @@ -17,7 +17,6 @@ use ts_rs::TS; #[allow(unused_imports)] use crate::prelude::*; -use crate::shutdown::Shutdown; use crate::util::future::TimedResource; use crate::util::net::WebSocket; use crate::util::{FromStrParser, new_guid}; @@ -99,15 +98,12 @@ pub type RestHandler = Box RestFuture + Send>; pub struct WebSocketFuture { kill: Option>, - shutdown: Option>>, fut: BoxFuture<'static, ()>, } impl Future for WebSocketFuture { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - if self.kill.as_ref().map_or(false, |k| !k.is_empty()) - || self.shutdown.as_ref().map_or(false, |s| !s.is_empty()) - { + if self.kill.as_ref().map_or(false, |k| !k.is_empty()) { Poll::Ready(()) } else { self.fut.poll_unpin(cx) @@ -142,7 +138,6 @@ impl RpcContinuation { RpcContinuation::WebSocket(TimedResource::new( Box::new(|ws| WebSocketFuture { kill: None, - shutdown: None, fut: handler(ws.into()).boxed(), }), timeout, @@ -175,7 +170,6 @@ impl RpcContinuation { RpcContinuation::WebSocket(TimedResource::new( Box::new(|ws| WebSocketFuture { kill, - shutdown: None, fut: handler(ws.into()).boxed(), }), timeout, @@ -189,21 +183,15 @@ impl RpcContinuation { } } -pub struct RpcContinuations { - continuations: AsyncMutex>, - shutdown: Option>>, -} +pub struct RpcContinuations(AsyncMutex>); impl RpcContinuations { - pub fn new(shutdown: Option>>) -> Self { - RpcContinuations { - continuations: AsyncMutex::new(BTreeMap::new()), - shutdown, - } + pub fn new() -> Self { + RpcContinuations(AsyncMutex::new(BTreeMap::new())) } #[instrument(skip_all)] pub async fn clean(&self) { - let mut continuations = self.continuations.lock().await; + let mut continuations = self.0.lock().await; let mut to_remove = Vec::new(); for (guid, cont) in &*continuations { if cont.is_timed_out() { @@ -218,28 +206,23 @@ impl RpcContinuations { #[instrument(skip_all)] pub async fn add(&self, guid: Guid, handler: RpcContinuation) { self.clean().await; - self.continuations.lock().await.insert(guid, handler); + self.0.lock().await.insert(guid, handler); } pub async fn get_ws_handler(&self, guid: &Guid) -> Option { - let mut continuations = self.continuations.lock().await; + let mut continuations = self.0.lock().await; if !matches!(continuations.get(guid), Some(RpcContinuation::WebSocket(_))) { return None; } let Some(RpcContinuation::WebSocket(x)) = continuations.remove(guid) else { return None; }; - let handler = x.get().await?; - let shutdown = self.shutdown.as_ref().map(|s| s.subscribe()); - Some(Box::new(move |ws| { - let mut fut = handler(ws); - fut.shutdown = shutdown; - fut - })) + x.get().await } pub async fn get_rest_handler(&self, guid: &Guid) -> Option { - let mut continuations = self.continuations.lock().await; + let mut continuations: tokio::sync::MutexGuard<'_, BTreeMap> = + self.0.lock().await; if !matches!(continuations.get(guid), Some(RpcContinuation::Rest(_))) { return None; } diff --git a/core/src/tunnel/context.rs b/core/src/tunnel/context.rs index 67e7233f2..df6d2d888 100644 --- a/core/src/tunnel/context.rs +++ b/core/src/tunnel/context.rs @@ -202,7 +202,7 @@ impl TunnelContext { listen, db, datadir, - rpc_continuations: RpcContinuations::new(None), + rpc_continuations: RpcContinuations::new(), open_authed_continuations: OpenAuthedContinuations::new(), ephemeral_sessions: SyncMutex::new(Sessions::new()), net_iface,