fix future busywait when job_source closed

This commit is contained in:
Aiden McClelland
2024-08-15 11:57:56 -06:00
parent f608480034
commit 60a974a29c

View File

@@ -1,4 +1,5 @@
use std::fmt::{Debug, Display}; use std::fmt::{Debug, Display};
use std::task::Waker;
use futures::future::{BoxFuture, FusedFuture}; use futures::future::{BoxFuture, FusedFuture};
use futures::stream::FusedStream; use futures::stream::FusedStream;
@@ -179,12 +180,14 @@ pub fn poll_select_all<'a, T>(
} }
pub struct JobRunner<'a, T> { pub struct JobRunner<'a, T> {
wakers: Vec<Waker>,
closed: bool, closed: bool,
running: Vec<BoxFuture<'a, T>>, running: Vec<BoxFuture<'a, T>>,
} }
impl<'a, T> JobRunner<'a, T> { impl<'a, T> JobRunner<'a, T> {
pub fn new() -> Self { pub fn new() -> Self {
JobRunner { JobRunner {
wakers: Vec::new(),
closed: false, closed: false,
running: Vec::new(), running: Vec::new(),
} }
@@ -196,12 +199,24 @@ impl<'a, T> JobRunner<'a, T> {
&mut self, &mut self,
job_source: &mut Src, job_source: &mut Src,
) -> Option<T> { ) -> Option<T> {
let mut job_source = Some(job_source);
loop { loop {
let next_job_fut = async {
if let Some(job_source) = &mut job_source {
job_source.next().await
} else {
futures::future::pending().await
}
};
tokio::select! { tokio::select! {
job = job_source.next() => { job = next_job_fut => {
if let Some(job) = job { if let Some(job) = job {
self.running.push(job.boxed()); self.running.push(job.boxed());
while let Some(waker) = self.wakers.pop() {
waker.wake();
}
} else { } else {
job_source.take();
self.closed = true; self.closed = true;
if self.running.is_empty() { if self.running.is_empty() {
return None; return None;
@@ -221,6 +236,10 @@ impl<'a, T> Stream for JobRunner<'a, T> {
mut self: std::pin::Pin<&mut Self>, mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>, cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> { ) -> std::task::Poll<Option<Self::Item>> {
if self.running.is_empty() {
self.wakers.push(cx.waker().clone());
return std::task::Poll::Pending;
}
match poll_select_all(&mut self.running, cx) { match poll_select_all(&mut self.running, cx) {
std::task::Poll::Pending if self.closed && self.running.is_empty() => { std::task::Poll::Pending if self.closed && self.running.is_empty() => {
std::task::Poll::Ready(None) std::task::Poll::Ready(None)