mirror of
https://github.com/Start9Labs/start-os.git
synced 2026-03-30 04:01:58 +00:00
misc improvements to cli (#2827)
* misc improvements to cli * switch host shorthand to H * simplify macro
This commit is contained in:
@@ -1,7 +1,12 @@
|
||||
use std::collections::VecDeque;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::task::{Poll, Waker};
|
||||
|
||||
use futures::stream::BoxStream;
|
||||
use futures::Stream;
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct SyncMutex<T>(std::sync::Mutex<T>);
|
||||
impl<T> SyncMutex<T> {
|
||||
@@ -160,3 +165,149 @@ impl<T: Clone> futures::Stream for Watch<T> {
|
||||
(1, None)
|
||||
}
|
||||
}
|
||||
|
||||
struct DupState<T, Upstream = BoxStream<'static, T>>
|
||||
where
|
||||
T: Clone,
|
||||
Upstream: Stream<Item = T> + Unpin,
|
||||
{
|
||||
buffer: VecDeque<T>,
|
||||
upstream: Upstream,
|
||||
pos: usize,
|
||||
pos_refs: Vec<Weak<AtomicUsize>>,
|
||||
wakers: Vec<Waker>,
|
||||
}
|
||||
impl<T: Clone, Upstream: Stream<Item = T> + Unpin> DupState<T, Upstream> {
|
||||
fn poll_next(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Option<T>> {
|
||||
use futures::stream::StreamExt;
|
||||
let Some(next) = futures::ready!(self.upstream.poll_next_unpin(cx)) else {
|
||||
return Poll::Ready(None);
|
||||
};
|
||||
self.pos += 1;
|
||||
self.buffer.push_back(next.clone());
|
||||
for waker in self.wakers.drain(..) {
|
||||
if !waker.will_wake(cx.waker()) {
|
||||
waker.wake();
|
||||
}
|
||||
}
|
||||
|
||||
Poll::Ready(Some(next))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DupStream<T, Upstream = BoxStream<'static, T>>
|
||||
where
|
||||
T: Clone,
|
||||
Upstream: Stream<Item = T> + Unpin,
|
||||
{
|
||||
state: Arc<SyncMutex<DupState<T, Upstream>>>,
|
||||
pos: Arc<AtomicUsize>,
|
||||
}
|
||||
impl<T: Clone, Upstream: Stream<Item = T> + Unpin> DupStream<T, Upstream> {
|
||||
pub fn new(upstream: Upstream) -> Self {
|
||||
let pos = Arc::new(AtomicUsize::new(0));
|
||||
Self {
|
||||
state: Arc::new(SyncMutex::new(DupState {
|
||||
buffer: VecDeque::new(),
|
||||
upstream,
|
||||
pos: 0,
|
||||
pos_refs: vec![Arc::downgrade(&pos)],
|
||||
wakers: Vec::new(),
|
||||
})),
|
||||
pos,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Clone, Upstream: Stream<Item = T> + Unpin> Clone for DupStream<T, Upstream> {
|
||||
fn clone(&self) -> Self {
|
||||
let pos = self.state.mutate(|state| {
|
||||
let pos = Arc::new(AtomicUsize::new(
|
||||
self.pos.load(std::sync::atomic::Ordering::Relaxed),
|
||||
));
|
||||
state.pos_refs.push(Arc::downgrade(&pos));
|
||||
state.pos_refs.retain(|ptr| ptr.strong_count() > 0);
|
||||
pos
|
||||
});
|
||||
Self {
|
||||
state: self.state.clone(),
|
||||
pos,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Clone, Upstream: Stream<Item = T> + Unpin> Stream for DupStream<T, Upstream> {
|
||||
type Item = T;
|
||||
fn poll_next(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> Poll<Option<Self::Item>> {
|
||||
self.state.mutate(|state| {
|
||||
let pos = self.pos.load(std::sync::atomic::Ordering::Relaxed);
|
||||
if pos < state.pos {
|
||||
self.pos.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
if state
|
||||
.pos_refs
|
||||
.iter()
|
||||
.filter_map(|ptr| ptr.upgrade())
|
||||
.all(|ptr| ptr.load(std::sync::atomic::Ordering::Relaxed) > pos)
|
||||
{
|
||||
while let Some(next) = state.buffer.pop_front() {
|
||||
if state.buffer.len() + 1 == state.pos - pos {
|
||||
return Poll::Ready(Some(next));
|
||||
}
|
||||
}
|
||||
Poll::Ready(None)
|
||||
} else {
|
||||
Poll::Ready(
|
||||
state
|
||||
.buffer
|
||||
.get(state.buffer.len() + pos - state.pos)
|
||||
.cloned(),
|
||||
)
|
||||
}
|
||||
} else {
|
||||
let res = state.poll_next(cx);
|
||||
if res.is_ready() {
|
||||
self.pos
|
||||
.store(state.pos, std::sync::atomic::Ordering::Relaxed);
|
||||
} else {
|
||||
let waker = cx.waker();
|
||||
if state.wakers.iter().all(|w| !w.will_wake(waker)) {
|
||||
state.wakers.push(waker.clone());
|
||||
}
|
||||
}
|
||||
res
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_dup_stream() {
|
||||
use std::time::Duration;
|
||||
|
||||
use futures::StreamExt;
|
||||
|
||||
let stream = async_stream::stream! {
|
||||
for i in 0..100 {
|
||||
tokio::time::sleep(Duration::from_nanos(rand::random_range(0..=10000000))).await;
|
||||
yield i;
|
||||
}
|
||||
}
|
||||
.boxed();
|
||||
let n = rand::random_range(3..10);
|
||||
let mut tasks = Vec::with_capacity(n);
|
||||
for mut dup_stream in std::iter::repeat_n(DupStream::new(stream), n) {
|
||||
tasks.push(tokio::spawn(async move {
|
||||
let mut ctr = 0;
|
||||
while let Some(i) = dup_stream.next().await {
|
||||
assert_eq!(ctr, i);
|
||||
ctr += 1;
|
||||
tokio::time::sleep(Duration::from_nanos(rand::random_range(0..=10000000))).await;
|
||||
}
|
||||
assert_eq!(ctr, 100);
|
||||
}));
|
||||
}
|
||||
futures::future::try_join_all(tasks).await.unwrap();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user