From 6dc9a11a89aa24a5a5f53d8fb82f49260696e1bd Mon Sep 17 00:00:00 2001 From: Aiden McClelland <3732071+dr-bonez@users.noreply.github.com> Date: Wed, 12 Feb 2025 12:20:18 -0700 Subject: [PATCH] misc improvements to cli (#2827) * misc improvements to cli * switch host shorthand to H * simplify macro --- core/Cargo.lock | 89 ++++++++++- core/build-cli.sh | 16 +- core/startos/Cargo.toml | 8 +- core/startos/src/account.rs | 5 +- core/startos/src/backup/os.rs | 6 +- core/startos/src/context/config.rs | 15 +- core/startos/src/db/model/private.rs | 4 +- core/startos/src/developer/mod.rs | 2 +- core/startos/src/s9pk/merkle_archive/test.rs | 2 +- core/startos/src/system.rs | 120 ++++++++------- core/startos/src/util/mod.rs | 2 +- core/startos/src/util/sync.rs | 153 ++++++++++++++++++- core/startos/src/version/v0_3_6_alpha_0.rs | 4 +- 13 files changed, 339 insertions(+), 87 deletions(-) diff --git a/core/Cargo.lock b/core/Cargo.lock index a1e796a15..929148310 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -74,7 +74,7 @@ dependencies = [ "getrandom 0.2.15", "once_cell", "version_check", - "zerocopy", + "zerocopy 0.7.35", ] [[package]] @@ -2323,6 +2323,18 @@ dependencies = [ "wasi 0.11.0+wasi-snapshot-preview1", ] +[[package]] +name = "getrandom" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43a49c392881ce6d5c3b8cb70f98717b7c07aabbdff06687b9030dbfbe2725f8" +dependencies = [ + "cfg-if", + "libc", + "wasi 0.13.3+wasi-0.2.2", + "windows-targets 0.52.6", +] + [[package]] name = "gimli" version = "0.28.1" @@ -4456,7 +4468,7 @@ version = "0.2.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04" dependencies = [ - "zerocopy", + "zerocopy 0.7.35", ] [[package]] @@ -4690,6 +4702,17 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "rand" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94" +dependencies = [ + "rand_chacha 0.9.0", + "rand_core 0.9.0", + "zerocopy 0.8.17", +] + [[package]] name = "rand_chacha" version = "0.2.2" @@ -4710,6 +4733,16 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "rand_chacha" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" +dependencies = [ + "ppv-lite86", + "rand_core 0.9.0", +] + [[package]] name = "rand_core" version = "0.5.1" @@ -4728,6 +4761,16 @@ dependencies = [ "getrandom 0.2.15", ] +[[package]] +name = "rand_core" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b08f3c9802962f7e1b25113931d94f43ed9725bebc59db9d0c3e9a23b67e15ff" +dependencies = [ + "getrandom 0.3.1", + "zerocopy 0.8.17", +] + [[package]] name = "rand_hc" version = "0.2.0" @@ -6019,7 +6062,7 @@ dependencies = [ "proptest", "proptest-derive", "qrcode", - "rand 0.8.5", + "rand 0.9.0", "regex", "reqwest", "reqwest_cookie_store", @@ -7173,6 +7216,15 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasi" +version = "0.13.3+wasi-0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26816d2e1a4a36a2940b96c5296ce403917633dff8f3440e9b236ed6f6bacad2" +dependencies = [ + "wit-bindgen-rt", +] + [[package]] name = "wasite" version = "0.1.0" @@ -7572,6 +7624,15 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "wit-bindgen-rt" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3268f3d866458b787f390cf61f4bbb563b922d091359f9608842999eaee3943c" +dependencies = [ + "bitflags 2.8.0", +] + [[package]] name = "write16" version = "1.0.0" @@ -7782,7 +7843,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" dependencies = [ "byteorder", - "zerocopy-derive", + "zerocopy-derive 0.7.35", +] + +[[package]] +name = "zerocopy" +version = "0.8.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa91407dacce3a68c56de03abe2760159582b846c6a4acd2f456618087f12713" +dependencies = [ + "zerocopy-derive 0.8.17", ] [[package]] @@ -7796,6 +7866,17 @@ dependencies = [ "syn 2.0.96", ] +[[package]] +name = "zerocopy-derive" +version = "0.8.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06718a168365cad3d5ff0bb133aad346959a2074bd4a85c121255a11304a8626" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.96", +] + [[package]] name = "zerofrom" version = "0.1.5" diff --git a/core/build-cli.sh b/core/build-cli.sh index 8e069a690..de1a4d421 100755 --- a/core/build-cli.sh +++ b/core/build-cli.sh @@ -40,11 +40,15 @@ if [[ "${ENVIRONMENT}" =~ (^|-)unstable($|-) ]]; then RUSTFLAGS="--cfg tokio_unstable" fi -alias 'rust-zig-builder'='docker run $USE_TTY --rm -e "RUSTFLAGS=$RUSTFLAGS" -v "$HOME/.cargo/registry":/root/.cargo/registry -v "$HOME/.cargo/git":/root/.cargo/git -v "$(pwd)":/home/rust/src -w /home/rust/src -P messense/cargo-zigbuild' +if which zig > /dev/null && [ "$ENFORCE_USE_DOCKER" != 1 ]; do + echo "FEATURES=\"$FEATURES\"" + echo "RUSTFLAGS=\"$RUSTFLAGS\"" + RUSTFLAGS=$RUSTFLAGS sh -c "cd core && cargo zigbuild --release --no-default-features --features cli,$FEATURES --locked --bin start-cli --target=$TARGET" +else + alias 'rust-zig-builder'='docker run $USE_TTY --rm -e "RUSTFLAGS=$RUSTFLAGS" -v "$HOME/.cargo/registry":/root/.cargo/registry -v "$HOME/.cargo/git":/root/.cargo/git -v "$(pwd)":/home/rust/src -w /home/rust/src -P messense/cargo-zigbuild' + RUSTFLAGS=$RUSTFLAGS rust-zig-builder sh -c "cd core && cargo zigbuild --release --no-default-features --features cli,$FEATURES --locked --bin start-cli --target=$TARGET" -echo "FEATURES=\"$FEATURES\"" -echo "RUSTFLAGS=\"$RUSTFLAGS\"" -rust-zig-builder sh -c "cd core && cargo zigbuild --release --no-default-features --features cli,daemon,$FEATURES --locked --bin start-cli --target=$TARGET" -if [ "$(ls -nd core/target/$TARGET/release/start-cli | awk '{ print $3 }')" != "$UID" ]; then - rust-zig-builder sh -c "cd core && chown -R $UID:$UID target && chown -R $UID:$UID /root/.cargo" + if [ "$(ls -nd core/target/$TARGET/release/start-cli | awk '{ print $3 }')" != "$UID" ]; then + rust-zig-builder sh -c "cd core && chown -R $UID:$UID target && chown -R $UID:$UID /root/.cargo" + fi fi \ No newline at end of file diff --git a/core/startos/Cargo.toml b/core/startos/Cargo.toml index df1622851..0f42d020f 100644 --- a/core/startos/Cargo.toml +++ b/core/startos/Cargo.toml @@ -40,7 +40,7 @@ path = "src/main.rs" [features] cli = [] container-runtime = ["procfs", "tty-spawn"] -daemon = [] +daemon = ["mail-send"] registry = [] default = ["cli", "daemon", "registry", "container-runtime"] dev = [] @@ -71,7 +71,7 @@ basic-cookies = "0.1.4" blake3 = { version = "1.5.0", features = ["mmap", "rayon"] } bytes = "1" chrono = { version = "0.4.31", features = ["serde"] } -clap = "4.4.12" +clap = { version = "4.4.12", features = ["string"] } color-eyre = "0.6.2" console = "0.15.7" console-subscriber = { version = "0.3.0", optional = true } @@ -167,7 +167,7 @@ procfs = { version = "0.16.0", optional = true } proptest = "1.3.1" proptest-derive = "0.5.0" qrcode = "0.14.1" -rand = { version = "0.8.5", features = ["std"] } +rand = "0.9.0" regex = "1.10.2" reqwest = { version = "0.12.4", features = ["stream", "json", "socks"] } reqwest_cookie_store = "0.8.0" @@ -224,7 +224,7 @@ urlencoding = "2.1.3" uuid = { version = "1.4.1", features = ["v4"] } zbus = "5.1.1" zeroize = "1.6.0" -mail-send = { git = "https://github.com/dr-bonez/mail-send.git", branch = "main" } +mail-send = { git = "https://github.com/dr-bonez/mail-send.git", branch = "main", optional = true } rustls = "0.23.20" rustls-pki-types = { version = "1.10.1", features = ["alloc"] } diff --git a/core/startos/src/account.rs b/core/startos/src/account.rs index 389589667..3de55dfb3 100644 --- a/core/startos/src/account.rs +++ b/core/startos/src/account.rs @@ -38,9 +38,10 @@ impl AccountInfo { let root_ca_key = generate_key()?; let root_ca_cert = make_root_cert(&root_ca_key, &hostname, start_time)?; let ssh_key = ssh_key::PrivateKey::from(ssh_key::private::Ed25519Keypair::random( - &mut rand::thread_rng(), + &mut ssh_key::rand_core::OsRng::default(), )); - let compat_s9pk_key = ed25519_dalek::SigningKey::generate(&mut rand::thread_rng()); + let compat_s9pk_key = + ed25519_dalek::SigningKey::generate(&mut ssh_key::rand_core::OsRng::default()); Ok(Self { server_id, hostname, diff --git a/core/startos/src/backup/os.rs b/core/startos/src/backup/os.rs index 324543fb8..4c5798103 100644 --- a/core/startos/src/backup/os.rs +++ b/core/startos/src/backup/os.rs @@ -82,11 +82,13 @@ impl OsBackupV0 { root_ca_key: self.root_ca_key.0, root_ca_cert: self.root_ca_cert.0, ssh_key: ssh_key::PrivateKey::random( - &mut rand::thread_rng(), + &mut ssh_key::rand_core::OsRng::default(), ssh_key::Algorithm::Ed25519, )?, tor_keys: vec![TorSecretKeyV3::from(self.tor_key.0)], - compat_s9pk_key: ed25519_dalek::SigningKey::generate(&mut rand::thread_rng()), + compat_s9pk_key: ed25519_dalek::SigningKey::generate( + &mut ssh_key::rand_core::OsRng::default(), + ), }, ui: self.ui, }) diff --git a/core/startos/src/context/config.rs b/core/startos/src/context/config.rs index 4a0925d81..1339cb6d4 100644 --- a/core/startos/src/context/config.rs +++ b/core/startos/src/context/config.rs @@ -13,6 +13,7 @@ use crate::disk::OsPartitionInfo; use crate::init::init_postgres; use crate::prelude::*; use crate::util::serde::IoFormat; +use crate::version::VersionT; use crate::MAIN_DATA; pub const DEVICE_CONFIG_PATH: &str = "/media/startos/config/config.yaml"; // "/media/startos/config/config.yaml"; @@ -57,18 +58,20 @@ pub trait ContextConfig: DeserializeOwned + Default { #[derive(Debug, Default, Deserialize, Serialize, Parser)] #[serde(rename_all = "kebab-case")] #[command(rename_all = "kebab-case")] +#[command(name = "start-cli")] +#[command(version = crate::version::Current::default().semver().to_string())] pub struct ClientConfig { - #[arg(short = 'c', long = "config")] + #[arg(short = 'c', long)] pub config: Option, - #[arg(short = 'h', long = "host")] + #[arg(short = 'H', long)] pub host: Option, - #[arg(short = 'r', long = "registry")] + #[arg(short = 'r', long)] pub registry: Option, - #[arg(short = 'p', long = "proxy")] + #[arg(short = 'p', long)] pub proxy: Option, - #[arg(long = "cookie-path")] + #[arg(long)] pub cookie_path: Option, - #[arg(long = "developer-key-path")] + #[arg(long)] pub developer_key_path: Option, } impl ContextConfig for ClientConfig { diff --git a/core/startos/src/db/model/private.rs b/core/startos/src/db/model/private.rs index 2675b36d0..2108b32a4 100644 --- a/core/startos/src/db/model/private.rs +++ b/core/startos/src/db/model/private.rs @@ -32,5 +32,7 @@ pub struct Private { } pub fn generate_compat_key() -> Pem { - Pem(ed25519_dalek::SigningKey::generate(&mut rand::thread_rng())) + Pem(ed25519_dalek::SigningKey::generate( + &mut ssh_key::rand_core::OsRng::default(), + )) } diff --git a/core/startos/src/developer/mod.rs b/core/startos/src/developer/mod.rs index 4a2a4c3df..de71b0fc0 100644 --- a/core/startos/src/developer/mod.rs +++ b/core/startos/src/developer/mod.rs @@ -20,7 +20,7 @@ pub fn init(ctx: CliContext) -> Result<(), Error> { .with_ctx(|_| (crate::ErrorKind::Filesystem, parent.display().to_string()))?; } tracing::info!("Generating new developer key..."); - let secret = SigningKey::generate(&mut rand::thread_rng()); + let secret = SigningKey::generate(&mut ssh_key::rand_core::OsRng::default()); tracing::info!("Writing key to {}", ctx.developer_key_path.display()); let keypair_bytes = ed25519::KeypairBytes { secret_key: secret.to_bytes(), diff --git a/core/startos/src/s9pk/merkle_archive/test.rs b/core/startos/src/s9pk/merkle_archive/test.rs index 861f3b04c..820e5be92 100644 --- a/core/startos/src/s9pk/merkle_archive/test.rs +++ b/core/startos/src/s9pk/merkle_archive/test.rs @@ -51,7 +51,7 @@ fn test(files: Vec<(PathBuf, String)>) -> Result<(), Error> { check_set.insert(path.to_owned(), content); } } - let key = SigningKey::generate(&mut rand::thread_rng()); + let key = SigningKey::generate(&mut ssh_key::rand_core::OsRng::default()); let mut a1 = MerkleArchive::new(root, key, "test"); tokio::runtime::Builder::new_current_thread() .enable_io() diff --git a/core/startos/src/system.rs b/core/startos/src/system.rs index 123fdec42..ff0dc15fb 100644 --- a/core/startos/src/system.rs +++ b/core/startos/src/system.rs @@ -7,8 +7,6 @@ use clap::Parser; use color_eyre::eyre::eyre; use futures::FutureExt; use imbl::vector; -use mail_send::mail_builder::{self, MessageBuilder}; -use mail_send::SmtpClientBuilder; use rpc_toolkit::{from_fn_async, Context, Empty, HandlerExt, ParentHandler}; use rustls::crypto::CryptoProvider; use rustls::RootCertStore; @@ -906,64 +904,74 @@ pub async fn test_smtp( password, }: TestSmtpParams, ) -> Result<(), Error> { - use rustls_pki_types::pem::PemObject; + #[cfg(feature = "mail-send")] + { + use mail_send::mail_builder::{self, MessageBuilder}; + use mail_send::SmtpClientBuilder; + use rustls_pki_types::pem::PemObject; - let Some(pass_val) = password else { - return Err(Error::new( - eyre!("mail-send requires a password"), - ErrorKind::InvalidRequest, - )); - }; + let Some(pass_val) = password else { + return Err(Error::new( + eyre!("mail-send requires a password"), + ErrorKind::InvalidRequest, + )); + }; - let mut root_cert_store = RootCertStore::empty(); - let pem = tokio::fs::read("/etc/ssl/certs/ca-certificates.crt").await?; - for cert in CertificateDer::pem_slice_iter(&pem) { - root_cert_store.add_parsable_certificates([cert.with_kind(ErrorKind::OpenSsl)?]); - } - - let cfg = Arc::new( - rustls::ClientConfig::builder_with_provider(Arc::new( - rustls::crypto::ring::default_provider(), - )) - .with_safe_default_protocol_versions()? - .with_root_certificates(root_cert_store) - .with_no_client_auth(), - ); - let client = SmtpClientBuilder::new_with_tls_config(server, port, cfg) - .implicit_tls(false) - .credentials((login.split("@").next().unwrap().to_owned(), pass_val)); - - fn parse_address<'a>(addr: &'a str) -> mail_builder::headers::address::Address<'a> { - if addr.find("<").map_or(false, |start| { - addr.find(">").map_or(false, |end| start < end) - }) { - addr.split_once("<") - .map(|(name, addr)| (name.trim(), addr.strip_suffix(">").unwrap_or(addr))) - .unwrap() - .into() - } else { - addr.into() + let mut root_cert_store = RootCertStore::empty(); + let pem = tokio::fs::read("/etc/ssl/certs/ca-certificates.crt").await?; + for cert in CertificateDer::pem_slice_iter(&pem) { + root_cert_store.add_parsable_certificates([cert.with_kind(ErrorKind::OpenSsl)?]); } - } - let message = MessageBuilder::new() - .from(parse_address(&from)) - .to(parse_address(&to)) - .subject("StartOS Test Email") - .text_body("This is a test email sent from your StartOS Server"); - client - .connect() - .await - .map_err(|e| { - Error::new( - eyre!("mail-send connection error: {:?}", e), - ErrorKind::Unknown, - ) - })? - .send(message) - .await - .map_err(|e| Error::new(eyre!("mail-send send error: {:?}", e), ErrorKind::Unknown))?; - Ok(()) + let cfg = Arc::new( + rustls::ClientConfig::builder_with_provider(Arc::new( + rustls::crypto::ring::default_provider(), + )) + .with_safe_default_protocol_versions()? + .with_root_certificates(root_cert_store) + .with_no_client_auth(), + ); + let client = SmtpClientBuilder::new_with_tls_config(server, port, cfg) + .implicit_tls(false) + .credentials((login.split("@").next().unwrap().to_owned(), pass_val)); + + fn parse_address<'a>(addr: &'a str) -> mail_builder::headers::address::Address<'a> { + if addr.find("<").map_or(false, |start| { + addr.find(">").map_or(false, |end| start < end) + }) { + addr.split_once("<") + .map(|(name, addr)| (name.trim(), addr.strip_suffix(">").unwrap_or(addr))) + .unwrap() + .into() + } else { + addr.into() + } + } + + let message = MessageBuilder::new() + .from(parse_address(&from)) + .to(parse_address(&to)) + .subject("StartOS Test Email") + .text_body("This is a test email sent from your StartOS Server"); + client + .connect() + .await + .map_err(|e| { + Error::new( + eyre!("mail-send connection error: {:?}", e), + ErrorKind::Unknown, + ) + })? + .send(message) + .await + .map_err(|e| Error::new(eyre!("mail-send send error: {:?}", e), ErrorKind::Unknown))?; + Ok(()) + } + #[cfg(not(feature = "mail-send"))] + Err(Error::new( + eyre!("test-smtp requires mail-send feature to be enabled"), + ErrorKind::InvalidRequest, + )) } #[tokio::test] diff --git a/core/startos/src/util/mod.rs b/core/startos/src/util/mod.rs index d6f426135..a31332efa 100644 --- a/core/startos/src/util/mod.rs +++ b/core/startos/src/util/mod.rs @@ -648,7 +648,7 @@ impl<'a, T> From<&'a T> for MaybeOwned<'a, T> { pub fn new_guid() -> InternedString { use rand::RngCore; let mut buf = [0; 20]; - rand::thread_rng().fill_bytes(&mut buf); + rand::rng().fill_bytes(&mut buf); InternedString::intern(base32::encode( base32::Alphabet::Rfc4648 { padding: false }, &buf, diff --git a/core/startos/src/util/sync.rs b/core/startos/src/util/sync.rs index d6099cb5b..9bf396629 100644 --- a/core/startos/src/util/sync.rs +++ b/core/startos/src/util/sync.rs @@ -1,7 +1,12 @@ +use std::collections::VecDeque; use std::pin::Pin; -use std::sync::Arc; +use std::sync::atomic::AtomicUsize; +use std::sync::{Arc, Weak}; use std::task::{Poll, Waker}; +use futures::stream::BoxStream; +use futures::Stream; + #[derive(Debug, Default)] pub struct SyncMutex(std::sync::Mutex); impl SyncMutex { @@ -160,3 +165,149 @@ impl futures::Stream for Watch { (1, None) } } + +struct DupState> +where + T: Clone, + Upstream: Stream + Unpin, +{ + buffer: VecDeque, + upstream: Upstream, + pos: usize, + pos_refs: Vec>, + wakers: Vec, +} +impl + Unpin> DupState { + fn poll_next(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { + use futures::stream::StreamExt; + let Some(next) = futures::ready!(self.upstream.poll_next_unpin(cx)) else { + return Poll::Ready(None); + }; + self.pos += 1; + self.buffer.push_back(next.clone()); + for waker in self.wakers.drain(..) { + if !waker.will_wake(cx.waker()) { + waker.wake(); + } + } + + Poll::Ready(Some(next)) + } +} + +pub struct DupStream> +where + T: Clone, + Upstream: Stream + Unpin, +{ + state: Arc>>, + pos: Arc, +} +impl + Unpin> DupStream { + pub fn new(upstream: Upstream) -> Self { + let pos = Arc::new(AtomicUsize::new(0)); + Self { + state: Arc::new(SyncMutex::new(DupState { + buffer: VecDeque::new(), + upstream, + pos: 0, + pos_refs: vec![Arc::downgrade(&pos)], + wakers: Vec::new(), + })), + pos, + } + } +} + +impl + Unpin> Clone for DupStream { + fn clone(&self) -> Self { + let pos = self.state.mutate(|state| { + let pos = Arc::new(AtomicUsize::new( + self.pos.load(std::sync::atomic::Ordering::Relaxed), + )); + state.pos_refs.push(Arc::downgrade(&pos)); + state.pos_refs.retain(|ptr| ptr.strong_count() > 0); + pos + }); + Self { + state: self.state.clone(), + pos, + } + } +} + +impl + Unpin> Stream for DupStream { + type Item = T; + fn poll_next( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + self.state.mutate(|state| { + let pos = self.pos.load(std::sync::atomic::Ordering::Relaxed); + if pos < state.pos { + self.pos.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + if state + .pos_refs + .iter() + .filter_map(|ptr| ptr.upgrade()) + .all(|ptr| ptr.load(std::sync::atomic::Ordering::Relaxed) > pos) + { + while let Some(next) = state.buffer.pop_front() { + if state.buffer.len() + 1 == state.pos - pos { + return Poll::Ready(Some(next)); + } + } + Poll::Ready(None) + } else { + Poll::Ready( + state + .buffer + .get(state.buffer.len() + pos - state.pos) + .cloned(), + ) + } + } else { + let res = state.poll_next(cx); + if res.is_ready() { + self.pos + .store(state.pos, std::sync::atomic::Ordering::Relaxed); + } else { + let waker = cx.waker(); + if state.wakers.iter().all(|w| !w.will_wake(waker)) { + state.wakers.push(waker.clone()); + } + } + res + } + }) + } +} + +#[tokio::test] +async fn test_dup_stream() { + use std::time::Duration; + + use futures::StreamExt; + + let stream = async_stream::stream! { + for i in 0..100 { + tokio::time::sleep(Duration::from_nanos(rand::random_range(0..=10000000))).await; + yield i; + } + } + .boxed(); + let n = rand::random_range(3..10); + let mut tasks = Vec::with_capacity(n); + for mut dup_stream in std::iter::repeat_n(DupStream::new(stream), n) { + tasks.push(tokio::spawn(async move { + let mut ctr = 0; + while let Some(i) = dup_stream.next().await { + assert_eq!(ctr, i); + ctr += 1; + tokio::time::sleep(Duration::from_nanos(rand::random_range(0..=10000000))).await; + } + assert_eq!(ctr, 100); + })); + } + futures::future::try_join_all(tasks).await.unwrap(); +} diff --git a/core/startos/src/version/v0_3_6_alpha_0.rs b/core/startos/src/version/v0_3_6_alpha_0.rs index 6ed3fc316..8115aad1e 100644 --- a/core/startos/src/version/v0_3_6_alpha_0.rs +++ b/core/startos/src/version/v0_3_6_alpha_0.rs @@ -464,9 +464,9 @@ async fn previous_account_info(pg: &sqlx::Pool) -> Result