revert: remove websocket shutdown signal from RpcContinuations

This commit is contained in:
Aiden McClelland
2026-03-24 11:13:59 -06:00
parent 7f6abf2a80
commit 53dff95365
7 changed files with 16 additions and 33 deletions

View File

@@ -39,7 +39,7 @@ impl DiagnosticContext {
shutdown,
disk_guid,
error: Arc::new(error.into()),
rpc_continuations: RpcContinuations::new(None),
rpc_continuations: RpcContinuations::new(),
})))
}
}

View File

@@ -32,7 +32,7 @@ impl InitContext {
error: watch::channel(None).0,
progress,
shutdown,
rpc_continuations: RpcContinuations::new(None),
rpc_continuations: RpcContinuations::new(),
})))
}
}

View File

@@ -339,7 +339,7 @@ impl RpcContext {
services,
cancellable_installs: SyncMutex::new(BTreeMap::new()),
metrics_cache,
rpc_continuations: RpcContinuations::new(Some(shutdown.clone())),
rpc_continuations: RpcContinuations::new(),
shutdown,
lxc_manager: Arc::new(LxcManager::new()),
open_authed_continuations: OpenAuthedContinuations::new(),

View File

@@ -85,7 +85,7 @@ impl SetupContext {
result: OnceCell::new(),
disk_guid: OnceCell::new(),
shutdown,
rpc_continuations: RpcContinuations::new(None),
rpc_continuations: RpcContinuations::new(),
install_rootfs: SyncMutex::new(None),
language: SyncMutex::new(None),
keyboard: SyncMutex::new(None),

View File

@@ -142,7 +142,7 @@ impl RegistryContext {
listen: config.registry_listen.unwrap_or(DEFAULT_REGISTRY_LISTEN),
db,
datadir,
rpc_continuations: RpcContinuations::new(None),
rpc_continuations: RpcContinuations::new(),
client: Client::builder()
.proxy(Proxy::custom(move |url| {
if url.host_str().map_or(false, |h| h.ends_with(".onion")) {

View File

@@ -17,7 +17,6 @@ use ts_rs::TS;
#[allow(unused_imports)]
use crate::prelude::*;
use crate::shutdown::Shutdown;
use crate::util::future::TimedResource;
use crate::util::net::WebSocket;
use crate::util::{FromStrParser, new_guid};
@@ -99,15 +98,12 @@ pub type RestHandler = Box<dyn FnOnce(Request) -> RestFuture + Send>;
pub struct WebSocketFuture {
kill: Option<broadcast::Receiver<()>>,
shutdown: Option<broadcast::Receiver<Option<Shutdown>>>,
fut: BoxFuture<'static, ()>,
}
impl Future for WebSocketFuture {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.kill.as_ref().map_or(false, |k| !k.is_empty())
|| self.shutdown.as_ref().map_or(false, |s| !s.is_empty())
{
if self.kill.as_ref().map_or(false, |k| !k.is_empty()) {
Poll::Ready(())
} else {
self.fut.poll_unpin(cx)
@@ -142,7 +138,6 @@ impl RpcContinuation {
RpcContinuation::WebSocket(TimedResource::new(
Box::new(|ws| WebSocketFuture {
kill: None,
shutdown: None,
fut: handler(ws.into()).boxed(),
}),
timeout,
@@ -175,7 +170,6 @@ impl RpcContinuation {
RpcContinuation::WebSocket(TimedResource::new(
Box::new(|ws| WebSocketFuture {
kill,
shutdown: None,
fut: handler(ws.into()).boxed(),
}),
timeout,
@@ -189,21 +183,15 @@ impl RpcContinuation {
}
}
pub struct RpcContinuations {
continuations: AsyncMutex<BTreeMap<Guid, RpcContinuation>>,
shutdown: Option<broadcast::Sender<Option<Shutdown>>>,
}
pub struct RpcContinuations(AsyncMutex<BTreeMap<Guid, RpcContinuation>>);
impl RpcContinuations {
pub fn new(shutdown: Option<broadcast::Sender<Option<Shutdown>>>) -> Self {
RpcContinuations {
continuations: AsyncMutex::new(BTreeMap::new()),
shutdown,
}
pub fn new() -> Self {
RpcContinuations(AsyncMutex::new(BTreeMap::new()))
}
#[instrument(skip_all)]
pub async fn clean(&self) {
let mut continuations = self.continuations.lock().await;
let mut continuations = self.0.lock().await;
let mut to_remove = Vec::new();
for (guid, cont) in &*continuations {
if cont.is_timed_out() {
@@ -218,28 +206,23 @@ impl RpcContinuations {
#[instrument(skip_all)]
pub async fn add(&self, guid: Guid, handler: RpcContinuation) {
self.clean().await;
self.continuations.lock().await.insert(guid, handler);
self.0.lock().await.insert(guid, handler);
}
pub async fn get_ws_handler(&self, guid: &Guid) -> Option<WebSocketHandler> {
let mut continuations = self.continuations.lock().await;
let mut continuations = self.0.lock().await;
if !matches!(continuations.get(guid), Some(RpcContinuation::WebSocket(_))) {
return None;
}
let Some(RpcContinuation::WebSocket(x)) = continuations.remove(guid) else {
return None;
};
let handler = x.get().await?;
let shutdown = self.shutdown.as_ref().map(|s| s.subscribe());
Some(Box::new(move |ws| {
let mut fut = handler(ws);
fut.shutdown = shutdown;
fut
}))
x.get().await
}
pub async fn get_rest_handler(&self, guid: &Guid) -> Option<RestHandler> {
let mut continuations = self.continuations.lock().await;
let mut continuations: tokio::sync::MutexGuard<'_, BTreeMap<Guid, RpcContinuation>> =
self.0.lock().await;
if !matches!(continuations.get(guid), Some(RpcContinuation::Rest(_))) {
return None;
}

View File

@@ -202,7 +202,7 @@ impl TunnelContext {
listen,
db,
datadir,
rpc_continuations: RpcContinuations::new(None),
rpc_continuations: RpcContinuations::new(),
open_authed_continuations: OpenAuthedContinuations::new(),
ephemeral_sessions: SyncMutex::new(Sessions::new()),
net_iface,