diff --git a/TODO.md b/TODO.md index 6f6a6866f..d7a06bb6d 100644 --- a/TODO.md +++ b/TODO.md @@ -171,17 +171,6 @@ Pending tasks for AI agents. Remove items when completed. | `sdk/base/lib/interfaces/Host.ts` | SDK `MultiHost.bindPort()` — no changes needed | | `core/src/db/model/public.rs` | Public DB model — port forward mapping | -- [ ] Switch `BackgroundJobRunner` from `Vec` to `FuturesUnordered` - @dr-bonez - - **Problem**: `BackgroundJobRunner` (in `core/src/util/actor/background.rs`) stores active jobs in a - `Vec` and polls ALL of them on every wakeup — O(n) per poll. This runs inside the same - `tokio::select!` as the WebServer accept loop (`core/src/net/web_server.rs:502`), so polling overhead - from active connections directly delays acceptance of new connections. - - **Fix**: Replace `jobs: Vec>` with `jobs: FuturesUnordered>`. - `FuturesUnordered` only polls woken futures — O(woken) per poll instead of O(n). The poll loop changes - from `iter_mut().filter_map(poll_unpin)` + `swap_remove` to `while poll_next_unpin().is_ready() {}`. - - [ ] Extract TS-exported types into a lightweight sub-crate for fast binding generation **Problem**: `make ts-bindings` compiles the entire `start-os` crate (with all dependencies: tokio, diff --git a/core/src/util/actor/background.rs b/core/src/util/actor/background.rs index 87a535f87..46147ca5c 100644 --- a/core/src/util/actor/background.rs +++ b/core/src/util/actor/background.rs @@ -1,5 +1,6 @@ use futures::future::BoxFuture; -use futures::{Future, FutureExt}; +use futures::stream::FuturesUnordered; +use futures::{Future, FutureExt, StreamExt}; use tokio::sync::mpsc; #[derive(Clone)] @@ -11,7 +12,7 @@ impl BackgroundJobQueue { Self(send), BackgroundJobRunner { recv, - jobs: Vec::new(), + jobs: FuturesUnordered::new(), }, ) } @@ -27,7 +28,7 @@ impl BackgroundJobQueue { pub struct BackgroundJobRunner { recv: mpsc::UnboundedReceiver>, - jobs: Vec>, + jobs: FuturesUnordered>, } impl BackgroundJobRunner { pub fn is_empty(&self) -> bool { @@ -43,19 +44,7 @@ impl Future for BackgroundJobRunner { while let std::task::Poll::Ready(Some(job)) = self.recv.poll_recv(cx) { self.jobs.push(job); } - let complete = self - .jobs - .iter_mut() - .enumerate() - .filter_map(|(i, f)| match f.poll_unpin(cx) { - std::task::Poll::Pending => None, - std::task::Poll::Ready(_) => Some(i), - }) - .collect::>(); - for idx in complete.into_iter().rev() { - #[allow(clippy::let_underscore_future)] - let _ = self.jobs.swap_remove(idx); - } + while let std::task::Poll::Ready(Some(())) = self.jobs.poll_next_unpin(cx) {} if self.jobs.is_empty() && self.recv.is_closed() { std::task::Poll::Ready(()) } else {