From 60a974a29c5e6380f7bbfbc1b4716f6d2b20b189 Mon Sep 17 00:00:00 2001 From: Aiden McClelland Date: Thu, 15 Aug 2024 11:57:56 -0600 Subject: [PATCH] fix future busywait when job_source closed --- src/util.rs | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/src/util.rs b/src/util.rs index c7b71b2..9217b32 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1,4 +1,5 @@ use std::fmt::{Debug, Display}; +use std::task::Waker; use futures::future::{BoxFuture, FusedFuture}; use futures::stream::FusedStream; @@ -179,12 +180,14 @@ pub fn poll_select_all<'a, T>( } pub struct JobRunner<'a, T> { + wakers: Vec, closed: bool, running: Vec>, } impl<'a, T> JobRunner<'a, T> { pub fn new() -> Self { JobRunner { + wakers: Vec::new(), closed: false, running: Vec::new(), } @@ -196,12 +199,24 @@ impl<'a, T> JobRunner<'a, T> { &mut self, job_source: &mut Src, ) -> Option { + let mut job_source = Some(job_source); loop { + let next_job_fut = async { + if let Some(job_source) = &mut job_source { + job_source.next().await + } else { + futures::future::pending().await + } + }; tokio::select! { - job = job_source.next() => { + job = next_job_fut => { if let Some(job) = job { self.running.push(job.boxed()); + while let Some(waker) = self.wakers.pop() { + waker.wake(); + } } else { + job_source.take(); self.closed = true; if self.running.is_empty() { return None; @@ -221,6 +236,10 @@ impl<'a, T> Stream for JobRunner<'a, T> { mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { + if self.running.is_empty() { + self.wakers.push(cx.waker().clone()); + return std::task::Poll::Pending; + } match poll_select_all(&mut self.running, cx) { std::task::Poll::Pending if self.closed && self.running.is_empty() => { std::task::Poll::Ready(None)