mirror of
https://github.com/Start9Labs/start-os.git
synced 2026-03-26 10:21:52 +00:00
fix: switch BackgroundJobRunner from Vec to FuturesUnordered
BackgroundJobRunner stored active jobs in a Vec<BoxFuture> and polled ALL of them on every wakeup — O(n) per poll. Since this runs in the same tokio::select! as the WebServer accept loop, polling overhead from active connections directly delayed acceptance of new connections. FuturesUnordered only polls woken futures — O(woken) instead of O(n).
This commit is contained in:
11
TODO.md
11
TODO.md
@@ -171,17 +171,6 @@ Pending tasks for AI agents. Remove items when completed.
|
||||
| `sdk/base/lib/interfaces/Host.ts` | SDK `MultiHost.bindPort()` — no changes needed |
|
||||
| `core/src/db/model/public.rs` | Public DB model — port forward mapping |
|
||||
|
||||
- [ ] Switch `BackgroundJobRunner` from `Vec<BoxFuture>` to `FuturesUnordered` - @dr-bonez
|
||||
|
||||
**Problem**: `BackgroundJobRunner` (in `core/src/util/actor/background.rs`) stores active jobs in a
|
||||
`Vec<BoxFuture>` and polls ALL of them on every wakeup — O(n) per poll. This runs inside the same
|
||||
`tokio::select!` as the WebServer accept loop (`core/src/net/web_server.rs:502`), so polling overhead
|
||||
from active connections directly delays acceptance of new connections.
|
||||
|
||||
**Fix**: Replace `jobs: Vec<BoxFuture<'static, ()>>` with `jobs: FuturesUnordered<BoxFuture<'static, ()>>`.
|
||||
`FuturesUnordered` only polls woken futures — O(woken) per poll instead of O(n). The poll loop changes
|
||||
from `iter_mut().filter_map(poll_unpin)` + `swap_remove` to `while poll_next_unpin().is_ready() {}`.
|
||||
|
||||
- [ ] Extract TS-exported types into a lightweight sub-crate for fast binding generation
|
||||
|
||||
**Problem**: `make ts-bindings` compiles the entire `start-os` crate (with all dependencies: tokio,
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use futures::future::BoxFuture;
|
||||
use futures::{Future, FutureExt};
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::{Future, FutureExt, StreamExt};
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -11,7 +12,7 @@ impl BackgroundJobQueue {
|
||||
Self(send),
|
||||
BackgroundJobRunner {
|
||||
recv,
|
||||
jobs: Vec::new(),
|
||||
jobs: FuturesUnordered::new(),
|
||||
},
|
||||
)
|
||||
}
|
||||
@@ -27,7 +28,7 @@ impl BackgroundJobQueue {
|
||||
|
||||
pub struct BackgroundJobRunner {
|
||||
recv: mpsc::UnboundedReceiver<BoxFuture<'static, ()>>,
|
||||
jobs: Vec<BoxFuture<'static, ()>>,
|
||||
jobs: FuturesUnordered<BoxFuture<'static, ()>>,
|
||||
}
|
||||
impl BackgroundJobRunner {
|
||||
pub fn is_empty(&self) -> bool {
|
||||
@@ -43,19 +44,7 @@ impl Future for BackgroundJobRunner {
|
||||
while let std::task::Poll::Ready(Some(job)) = self.recv.poll_recv(cx) {
|
||||
self.jobs.push(job);
|
||||
}
|
||||
let complete = self
|
||||
.jobs
|
||||
.iter_mut()
|
||||
.enumerate()
|
||||
.filter_map(|(i, f)| match f.poll_unpin(cx) {
|
||||
std::task::Poll::Pending => None,
|
||||
std::task::Poll::Ready(_) => Some(i),
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
for idx in complete.into_iter().rev() {
|
||||
#[allow(clippy::let_underscore_future)]
|
||||
let _ = self.jobs.swap_remove(idx);
|
||||
}
|
||||
while let std::task::Poll::Ready(Some(())) = self.jobs.poll_next_unpin(cx) {}
|
||||
if self.jobs.is_empty() && self.recv.is_closed() {
|
||||
std::task::Poll::Ready(())
|
||||
} else {
|
||||
|
||||
Reference in New Issue
Block a user