mirror of
https://github.com/Start9Labs/start-os.git
synced 2026-04-02 05:23:14 +00:00
Merge branch 'next/minor' of github.com:Start9Labs/start-os into next/major
This commit is contained in:
@@ -1,11 +1,13 @@
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use futures::future::abortable;
|
||||
use futures::stream::{AbortHandle, Abortable};
|
||||
use futures::Future;
|
||||
use futures::future::{abortable, pending, BoxFuture, FusedFuture};
|
||||
use futures::stream::{AbortHandle, Abortable, BoxStream};
|
||||
use futures::{Future, FutureExt, Stream, StreamExt};
|
||||
use tokio::sync::watch;
|
||||
|
||||
use crate::prelude::*;
|
||||
|
||||
#[pin_project::pin_project(PinnedDrop)]
|
||||
pub struct DropSignaling<F> {
|
||||
#[pin]
|
||||
@@ -102,6 +104,60 @@ impl CancellationHandle {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct Until<'a> {
|
||||
streams: Vec<BoxStream<'a, Result<(), Error>>>,
|
||||
async_fns: Vec<Box<dyn FnMut() -> BoxFuture<'a, Result<(), Error>> + Send + 'a>>,
|
||||
}
|
||||
impl<'a> Until<'a> {
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
pub fn with_stream(
|
||||
mut self,
|
||||
stream: impl Stream<Item = Result<(), Error>> + Send + 'a,
|
||||
) -> Self {
|
||||
self.streams.push(stream.boxed());
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_async_fn<F, Fut>(mut self, mut f: F) -> Self
|
||||
where
|
||||
F: FnMut() -> Fut + Send + 'a,
|
||||
Fut: Future<Output = Result<(), Error>> + FusedFuture + Send + 'a,
|
||||
{
|
||||
self.async_fns.push(Box::new(move || f().boxed()));
|
||||
self
|
||||
}
|
||||
|
||||
pub async fn run<Fut: Future<Output = Result<(), Error>> + Send>(
|
||||
&mut self,
|
||||
fut: Fut,
|
||||
) -> Result<(), Error> {
|
||||
let (res, _, _) = futures::future::select_all(
|
||||
self.streams
|
||||
.iter_mut()
|
||||
.map(|s| {
|
||||
async {
|
||||
s.next().await.transpose()?.ok_or_else(|| {
|
||||
Error::new(eyre!("stream is empty"), ErrorKind::Cancelled)
|
||||
})
|
||||
}
|
||||
.boxed()
|
||||
})
|
||||
.chain(self.async_fns.iter_mut().map(|f| f()))
|
||||
.chain([async {
|
||||
fut.await?;
|
||||
pending().await
|
||||
}
|
||||
.boxed()]),
|
||||
)
|
||||
.await;
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_cancellable() {
|
||||
use std::sync::Arc;
|
||||
|
||||
Reference in New Issue
Block a user