mirror of
https://github.com/Start9Labs/start-os.git
synced 2026-03-26 02:11:53 +00:00
fix: mitigate tokio I/O driver starvation (tokio-rs/tokio#4730)
Tokio's multi-thread scheduler has an unfixed vulnerability where all worker threads can end up parked on condvars with no worker driving the I/O reactor. Condvar-parked workers have no timeout and sleep indefinitely, so once in this state the runtime never recovers.

This was observed on a box migrating from 0.3.5.1: after heavy task churn (package reinstalls, container operations, logging) all 16 workers ended up on futex_wait with no thread on epoll_wait. The web server listened on both HTTP and HTTPS but never replied. The box was stuck for 7+ hours with 0% CPU.

Two mitigations:

1. Watchdog OS thread (startd.rs): a plain std::thread that every 30s injects a no-op task via Handle::spawn. This forces a condvar-parked worker to wake, cycle through park, and grab the driver TryLock — breaking the stall regardless of what triggered it.

2. block_in_place in the logger (logger.rs): the TeeWriter holds a std::sync::Mutex across blocking file + stderr writes on worker threads. Wrapping in block_in_place tells tokio to hand off driver duties before the worker blocks, reducing the window for starvation. Guarded by runtime_flavor() to avoid panicking on current-thread runtimes used by the CLI.
This commit is contained in:
@@ -148,6 +148,15 @@ pub fn main(args: impl IntoIterator<Item = OsString>) {
|
|||||||
.build()
|
.build()
|
||||||
.expect(&t!("bins.startd.failed-to-initialize-runtime"));
|
.expect(&t!("bins.startd.failed-to-initialize-runtime"));
|
||||||
let res = rt.block_on(async {
|
let res = rt.block_on(async {
|
||||||
|
// Periodically wake a worker thread from a non-tokio OS thread to
|
||||||
|
// prevent tokio I/O driver starvation (all workers parked on
|
||||||
|
// condvar with no driver). See tokio-rs/tokio#4730.
|
||||||
|
let rt_handle = tokio::runtime::Handle::current();
|
||||||
|
std::thread::spawn(move || loop {
|
||||||
|
std::thread::sleep(Duration::from_secs(30));
|
||||||
|
rt_handle.spawn(async {});
|
||||||
|
});
|
||||||
|
|
||||||
let mut server = WebServer::new(Acceptor::new(WildcardListener::new(80)?), refresher());
|
let mut server = WebServer::new(Acceptor::new(WildcardListener::new(80)?), refresher());
|
||||||
match inner_main(&mut server, &config).await {
|
match inner_main(&mut server, &config).await {
|
||||||
Ok(a) => {
|
Ok(a) => {
|
||||||
|
|||||||
@@ -26,6 +26,30 @@ impl<'a> MakeWriter<'a> for LogFile {
|
|||||||
struct TeeWriter<'a>(MutexGuard<'a, Option<File>>);
|
struct TeeWriter<'a>(MutexGuard<'a, Option<File>>);
|
||||||
impl<'a> Write for TeeWriter<'a> {
|
impl<'a> Write for TeeWriter<'a> {
|
||||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||||
|
// Blocking file+stderr I/O on a tokio worker thread can
|
||||||
|
// starve the I/O driver (tokio-rs/tokio#4730).
|
||||||
|
// block_in_place tells the runtime to hand off driver
|
||||||
|
// duties before we block. Only available on the
|
||||||
|
// multi-thread runtime; falls back to a direct write on
|
||||||
|
// current-thread runtimes (CLI) or outside a runtime.
|
||||||
|
if matches!(
|
||||||
|
tokio::runtime::Handle::try_current().map(|h| h.runtime_flavor()),
|
||||||
|
Ok(tokio::runtime::RuntimeFlavor::MultiThread),
|
||||||
|
) {
|
||||||
|
tokio::task::block_in_place(|| self.write_inner(buf))
|
||||||
|
} else {
|
||||||
|
self.write_inner(buf)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fn flush(&mut self) -> io::Result<()> {
|
||||||
|
if let Some(f) = &mut *self.0 {
|
||||||
|
f.flush()?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl<'a> TeeWriter<'a> {
|
||||||
|
fn write_inner(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||||
let n = if let Some(f) = &mut *self.0 {
|
let n = if let Some(f) = &mut *self.0 {
|
||||||
f.write(buf)?
|
f.write(buf)?
|
||||||
} else {
|
} else {
|
||||||
@@ -34,12 +58,6 @@ impl<'a> MakeWriter<'a> for LogFile {
|
|||||||
io::stderr().write_all(&buf[..n])?;
|
io::stderr().write_all(&buf[..n])?;
|
||||||
Ok(n)
|
Ok(n)
|
||||||
}
|
}
|
||||||
fn flush(&mut self) -> io::Result<()> {
|
|
||||||
if let Some(f) = &mut *self.0 {
|
|
||||||
f.flush()?;
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
Box::new(TeeWriter(f))
|
Box::new(TeeWriter(f))
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
Reference in New Issue
Block a user