diff --git a/core/Cargo.lock b/core/Cargo.lock index 7eaac03bd..455272344 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -4968,9 +4968,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.34.0" +version = "1.37.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0c014766411e834f7af5b8f4cf46257aab4036ca95e9d2c144a10f59ad6f5b9" +checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787" dependencies = [ "backtrace", "bytes", diff --git a/core/startos/Cargo.toml b/core/startos/Cargo.toml index d9d4a4e36..cded2ab41 100644 --- a/core/startos/Cargo.toml +++ b/core/startos/Cargo.toml @@ -160,7 +160,7 @@ ssh-key = { version = "0.6.2", features = ["ed25519"] } stderrlog = "0.5.4" tar = "0.4.40" thiserror = "1.0.49" -tokio = { version = "1", features = ["full"] } +tokio = { version = "1.37", features = ["full"] } tokio-rustls = "0.25.0" tokio-socks = "0.5.1" tokio-stream = { version = "0.1.14", features = ["io-util", "sync", "net"] } diff --git a/core/startos/src/s9pk/v2/compat.rs b/core/startos/src/s9pk/v2/compat.rs index 5a98538dc..b4c2e1c14 100644 --- a/core/startos/src/s9pk/v2/compat.rs +++ b/core/startos/src/s9pk/v2/compat.rs @@ -1,3 +1,4 @@ +use std::collections::BTreeSet; use std::io::Cursor; use std::path::{Path, PathBuf}; use std::sync::Arc; @@ -154,7 +155,7 @@ impl S9pk> { i.strip_prefix(&format!("start9/{}/", manifest.id)) .map(|s| s.to_owned()) }) { - new_manifest.images.push(image.parse()?); + new_manifest.images.insert(image.parse()?); let sqfs_path = images_dir.join(&image).with_extension("squashfs"); let image_name = format!("start9/{}/{}:{}", manifest.id, image, manifest.version); let id = String::from_utf8( @@ -334,7 +335,7 @@ impl From for Manifest { marketing_site: value.marketing_site.unwrap_or_else(|| default_url.clone()), donation_url: value.donation_url, description: value.description, - images: Vec::new(), + images: BTreeSet::new(), assets: value .volumes .iter() diff --git a/core/startos/src/s9pk/v2/manifest.rs b/core/startos/src/s9pk/v2/manifest.rs index ea4524400..ee2358b13 100644 --- a/core/startos/src/s9pk/v2/manifest.rs +++ b/core/startos/src/s9pk/v2/manifest.rs @@ -1,4 +1,4 @@ -use std::collections::BTreeMap; +use std::collections::{BTreeMap, BTreeSet}; use color_eyre::eyre::eyre; use helpers::const_true; @@ -43,9 +43,9 @@ pub struct Manifest { #[ts(type = "string | null")] pub donation_url: Option, pub description: Description, - pub images: Vec, - pub assets: Vec, // TODO: AssetsId - pub volumes: Vec, + pub images: BTreeSet, + pub assets: BTreeSet, // TODO: AssetsId + pub volumes: BTreeSet, #[serde(default)] pub alerts: Alerts, #[serde(default)] diff --git a/core/startos/src/service/action.rs b/core/startos/src/service/action.rs index a33869222..5f97685ae 100644 --- a/core/startos/src/service/action.rs +++ b/core/startos/src/service/action.rs @@ -4,19 +4,27 @@ use models::{ActionId, ProcedureName}; use crate::action::ActionResult; use crate::prelude::*; +use crate::service::config::GetConfig; +use crate::service::dependencies::DependencyConfig; use crate::service::{Service, ServiceActor}; -use crate::util::actor::{BackgroundJobs, Handler}; +use crate::util::actor::background::BackgroundJobQueue; +use crate::util::actor::{ConflictBuilder, Handler}; -struct Action { +pub(super) struct Action { id: ActionId, input: Value, } impl Handler for ServiceActor { type Response = Result; + fn conflicts_with(_: &Action) -> ConflictBuilder { + ConflictBuilder::everything() + .except::() + .except::() + } async fn handle( &mut self, Action { id, input }: Action, - _: &mut BackgroundJobs, + _: &BackgroundJobQueue, ) -> Self::Response { let container = &self.0.persistent_container; container diff --git a/core/startos/src/service/config.rs b/core/startos/src/service/config.rs index 754e3e0ba..0f166eedb 100644 --- a/core/startos/src/service/config.rs +++ b/core/startos/src/service/config.rs @@ -3,19 +3,24 @@ use std::time::Duration; use models::ProcedureName; use crate::config::action::ConfigRes; -use crate::config::{action::SetResult, ConfigureContext}; +use crate::config::ConfigureContext; use crate::prelude::*; +use crate::service::dependencies::DependencyConfig; use crate::service::{Service, ServiceActor}; -use crate::util::actor::{BackgroundJobs, Handler}; +use crate::util::actor::background::BackgroundJobQueue; +use crate::util::actor::{ConflictBuilder, Handler}; use crate::util::serde::NoOutput; -struct Configure(ConfigureContext); +pub(super) struct Configure(ConfigureContext); impl Handler for ServiceActor { type Response = Result<(), Error>; + fn conflicts_with(_: &Configure) -> ConflictBuilder { + ConflictBuilder::everything().except::() + } async fn handle( &mut self, Configure(ConfigureContext { timeout, config }): Configure, - _: &mut BackgroundJobs, + _: &BackgroundJobQueue, ) -> Self::Response { let container = &self.0.persistent_container; let package_id = &self.0.id; @@ -41,10 +46,13 @@ impl Handler for ServiceActor { } } -struct GetConfig; +pub(super) struct GetConfig; impl Handler for ServiceActor { type Response = Result; - async fn handle(&mut self, _: GetConfig, _: &mut BackgroundJobs) -> Self::Response { + fn conflicts_with(_: &GetConfig) -> ConflictBuilder { + ConflictBuilder::nothing().except::() + } + async fn handle(&mut self, _: GetConfig, _: &BackgroundJobQueue) -> Self::Response { let container = &self.0.persistent_container; container .execute::( diff --git a/core/startos/src/service/control.rs b/core/startos/src/service/control.rs index 5ef0bbff2..0443b80b1 100644 --- a/core/startos/src/service/control.rs +++ b/core/startos/src/service/control.rs @@ -1,13 +1,21 @@ use crate::prelude::*; +use crate::service::config::GetConfig; +use crate::service::dependencies::DependencyConfig; use crate::service::start_stop::StartStop; use crate::service::transition::TransitionKind; use crate::service::{Service, ServiceActor}; -use crate::util::actor::{BackgroundJobs, Handler}; +use crate::util::actor::background::BackgroundJobQueue; +use crate::util::actor::{ConflictBuilder, Handler}; -struct Start; +pub(super) struct Start; impl Handler for ServiceActor { type Response = (); - async fn handle(&mut self, _: Start, _: &mut BackgroundJobs) -> Self::Response { + fn conflicts_with(_: &Start) -> ConflictBuilder { + ConflictBuilder::everything() + .except::() + .except::() + } + async fn handle(&mut self, _: Start, _: &BackgroundJobQueue) -> Self::Response { self.0.persistent_container.state.send_modify(|x| { x.desired_state = StartStop::Start; }); @@ -23,7 +31,12 @@ impl Service { struct Stop; impl Handler for ServiceActor { type Response = (); - async fn handle(&mut self, _: Stop, _: &mut BackgroundJobs) -> Self::Response { + fn conflicts_with(_: &Stop) -> ConflictBuilder { + ConflictBuilder::everything() + .except::() + .except::() + } + async fn handle(&mut self, _: Stop, _: &BackgroundJobQueue) -> Self::Response { let mut transition_state = None; self.0.persistent_container.state.send_modify(|x| { x.desired_state = StartStop::Stop; diff --git a/core/startos/src/service/dependencies.rs b/core/startos/src/service/dependencies.rs index 67cc5d53f..e71b62917 100644 --- a/core/startos/src/service/dependencies.rs +++ b/core/startos/src/service/dependencies.rs @@ -5,22 +5,26 @@ use models::{PackageId, ProcedureName}; use crate::prelude::*; use crate::service::{Service, ServiceActor}; -use crate::util::actor::{BackgroundJobs, Handler}; +use crate::util::actor::background::BackgroundJobQueue; +use crate::util::actor::{ConflictBuilder, Handler}; use crate::Config; -struct DependencyConfig { +pub(super) struct DependencyConfig { dependency_id: PackageId, remote_config: Option, } impl Handler for ServiceActor { type Response = Result, Error>; + fn conflicts_with(_: &DependencyConfig) -> ConflictBuilder { + ConflictBuilder::nothing() + } async fn handle( &mut self, DependencyConfig { dependency_id, remote_config, }: DependencyConfig, - _: &mut BackgroundJobs, + _: &BackgroundJobQueue, ) -> Self::Response { let container = &self.0.persistent_container; container diff --git a/core/startos/src/service/mod.rs b/core/startos/src/service/mod.rs index c6e64be63..38923d726 100644 --- a/core/startos/src/service/mod.rs +++ b/core/startos/src/service/mod.rs @@ -13,7 +13,6 @@ use start_stop::StartStop; use tokio::sync::Notify; use ts_rs::TS; -use crate::config::action::ConfigRes; use crate::context::{CliContext, RpcContext}; use crate::core::rpc_continuations::RequestGuid; use crate::db::model::package::{ @@ -28,7 +27,9 @@ use crate::service::service_map::InstallProgressHandles; use crate::service::transition::TransitionKind; use crate::status::health_check::HealthCheckResult; use crate::status::MainStatus; -use crate::util::actor::{Actor, BackgroundJobs, SimpleActor}; +use crate::util::actor::background::BackgroundJobQueue; +use crate::util::actor::concurrent::ConcurrentActor; +use crate::util::actor::Actor; use crate::util::serde::Pem; use crate::volume::data_dir; @@ -66,7 +67,7 @@ pub enum LoadDisposition { } pub struct Service { - actor: SimpleActor, + actor: ConcurrentActor, seed: Arc, } impl Service { @@ -90,7 +91,7 @@ impl Service { .init(Arc::downgrade(&seed)) .await?; Ok(Self { - actor: SimpleActor::new(ServiceActor(seed.clone())), + actor: ConcurrentActor::new(ServiceActor(seed.clone())), seed, }) } @@ -391,10 +392,11 @@ impl ServiceActorSeed { }); } } +#[derive(Clone)] struct ServiceActor(Arc); impl Actor for ServiceActor { - fn init(&mut self, jobs: &mut BackgroundJobs) { + fn init(&mut self, jobs: &BackgroundJobQueue) { let seed = self.0.clone(); jobs.add_job(async move { let id = seed.id.clone(); diff --git a/core/startos/src/service/service_effect_handler.rs b/core/startos/src/service/service_effect_handler.rs index b0b11b34b..0effccd5e 100644 --- a/core/startos/src/service/service_effect_handler.rs +++ b/core/startos/src/service/service_effect_handler.rs @@ -11,7 +11,7 @@ use clap::Parser; use emver::VersionRange; use imbl::OrdMap; use imbl_value::{json, InternedString}; -use models::{ActionId, HealthCheckId, HostId, ImageId, PackageId, VolumeId}; +use models::{ActionId, DataUrl, HealthCheckId, HostId, ImageId, PackageId, VolumeId}; use patch_db::json_ptr::JsonPointer; use rpc_toolkit::{from_fn, from_fn_async, AnyContext, Context, Empty, HandlerExt, ParentHandler}; use serde::{Deserialize, Serialize}; @@ -28,7 +28,9 @@ use crate::disk::mount::filesystem::overlayfs::OverlayGuard; use crate::net::host::binding::BindOptions; use crate::net::host::HostKind; use crate::prelude::*; +use crate::s9pk::merkle_archive::source::http::{HttpReader, HttpSource}; use crate::s9pk::rpc::SKIP_ENV; +use crate::s9pk::S9pk; use crate::service::cli::ContainerCliContext; use crate::service::ServiceActorSeed; use crate::status::health_check::HealthCheckResult; @@ -1145,11 +1147,36 @@ async fn set_dependencies( version_spec, ), }; - let icon = todo!(); - let title = todo!(); + let (icon, title) = match async { + let remote_s9pk = S9pk::deserialize( + &HttpSource::new( + ctx.ctx.client.clone(), + registry_url + .join(&format!("package/v2/{}.s9pk?spec={}", dep_id, version_spec))?, + ) + .await?, + ) + .await?; + + let icon = remote_s9pk.icon_data_url().await?; + + Ok::<_, Error>((icon, remote_s9pk.as_manifest().title.clone())) + } + .await + { + Ok(a) => a, + Err(e) => { + tracing::error!("Error fetching remote s9pk: {e}"); + tracing::debug!("{e:?}"); + ( + DataUrl::from_slice("image/png", include_bytes!("../install/package-icon.png")), + dep_id.to_string(), + ) + } + }; let config_satisfied = if let Some(dep_service) = &*ctx.ctx.services.get(&dep_id).await { service - .dependency_config(dep_id, dep_service.get_config().await?.config) + .dependency_config(dep_id.clone(), dep_service.get_config().await?.config) .await? .is_none() } else { @@ -1158,7 +1185,7 @@ async fn set_dependencies( deps.insert( dep_id, CurrentDependencyInfo { - kind: CurrentDependencyKind::Exists, + kind, registry_url, version_spec, icon, diff --git a/core/startos/src/service/transition/mod.rs b/core/startos/src/service/transition/mod.rs index af62ccc1c..25d225492 100644 --- a/core/startos/src/service/transition/mod.rs +++ b/core/startos/src/service/transition/mod.rs @@ -5,7 +5,7 @@ use tokio::sync::watch; use super::persistent_container::ServiceState; use crate::service::start_stop::StartStop; -use crate::util::actor::BackgroundJobs; +use crate::util::actor::background::BackgroundJobQueue; use crate::util::future::{CancellationHandle, RemoteCancellable}; pub mod backup; @@ -41,7 +41,7 @@ impl TransitionState { fn new( task: impl Future + Send + 'static, kind: TransitionKind, - jobs: &mut BackgroundJobs, + jobs: &BackgroundJobQueue, ) -> Self { let task = RemoteCancellable::new(task); let cancel_handle = task.cancellation_handle(); diff --git a/core/startos/src/service/transition/restart.rs b/core/startos/src/service/transition/restart.rs index 7047bd18f..dbf066e6f 100644 --- a/core/startos/src/service/transition/restart.rs +++ b/core/startos/src/service/transition/restart.rs @@ -1,18 +1,24 @@ -use std::sync::Arc; - use futures::FutureExt; use super::TempDesiredState; use crate::prelude::*; +use crate::service::config::GetConfig; +use crate::service::dependencies::DependencyConfig; use crate::service::transition::{TransitionKind, TransitionState}; use crate::service::{Service, ServiceActor}; -use crate::util::actor::{BackgroundJobs, Handler}; +use crate::util::actor::background::BackgroundJobQueue; +use crate::util::actor::{ConflictBuilder, Handler}; use crate::util::future::RemoteCancellable; -struct Restart; +pub(super) struct Restart; impl Handler for ServiceActor { type Response = (); - async fn handle(&mut self, _: Restart, jobs: &mut BackgroundJobs) -> Self::Response { + fn conflicts_with(_: &Restart) -> ConflictBuilder { + ConflictBuilder::everything() + .except::() + .except::() + } + async fn handle(&mut self, _: Restart, jobs: &BackgroundJobQueue) -> Self::Response { // So Need a handle to just a single field in the state let temp = TempDesiredState::new(&self.0.persistent_container.state); let mut current = self.0.persistent_container.state.subscribe(); diff --git a/core/startos/src/util/actor/background.rs b/core/startos/src/util/actor/background.rs new file mode 100644 index 000000000..f37e10c14 --- /dev/null +++ b/core/startos/src/util/actor/background.rs @@ -0,0 +1,60 @@ +use futures::future::BoxFuture; +use futures::{Future, FutureExt}; +use tokio::sync::mpsc; + +#[derive(Clone)] +pub struct BackgroundJobQueue(mpsc::UnboundedSender>); +impl BackgroundJobQueue { + pub fn new() -> (Self, BackgroundJobRunner) { + let (send, recv) = mpsc::unbounded_channel(); + ( + Self(send), + BackgroundJobRunner { + recv, + jobs: Vec::new(), + }, + ) + } + pub fn add_job(&self, fut: impl Future + Send + 'static) { + let _ = self.0.send(fut.boxed()); + } +} + +pub struct BackgroundJobRunner { + recv: mpsc::UnboundedReceiver>, + jobs: Vec>, +} +impl BackgroundJobRunner { + pub fn is_empty(&self) -> bool { + self.recv.is_empty() && self.jobs.is_empty() + } +} +impl Future for BackgroundJobRunner { + type Output = (); + fn poll( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + while let std::task::Poll::Ready(Some(job)) = self.recv.poll_recv(cx) { + self.jobs.push(job); + } + let complete = self + .jobs + .iter_mut() + .enumerate() + .filter_map(|(i, f)| match f.poll_unpin(cx) { + std::task::Poll::Pending => None, + std::task::Poll::Ready(_) => Some(i), + }) + .collect::>(); + for idx in complete.into_iter().rev() { + #[allow(clippy::let_underscore_future)] + let _ = self.jobs.swap_remove(idx); + } + if self.jobs.is_empty() && self.recv.is_closed() { + std::task::Poll::Ready(()) + } else { + std::task::Poll::Pending + } + } +} diff --git a/core/startos/src/util/actor/concurrent.rs b/core/startos/src/util/actor/concurrent.rs new file mode 100644 index 000000000..d330102ca --- /dev/null +++ b/core/startos/src/util/actor/concurrent.rs @@ -0,0 +1,208 @@ +use std::any::Any; +use std::sync::Arc; +use std::time::Duration; + +use futures::future::{ready, BoxFuture}; +use futures::{Future, FutureExt, TryFutureExt}; +use helpers::NonDetachingJoinHandle; +use tokio::sync::{mpsc, oneshot}; + +use crate::prelude::*; +use crate::util::actor::background::{BackgroundJobQueue, BackgroundJobRunner}; +use crate::util::actor::{Actor, ConflictFn, Handler, PendingMessageStrategy, Request}; + +#[pin_project::pin_project] +struct ConcurrentRunner { + actor: A, + shutdown: Option>, + waiting: Vec>, + recv: mpsc::UnboundedReceiver>, + handlers: Vec<( + Arc>, + oneshot::Sender>, + BoxFuture<'static, Box>, + )>, + queue: BackgroundJobQueue, + #[pin] + bg_runner: BackgroundJobRunner, +} +impl Future for ConcurrentRunner { + type Output = (); + fn poll( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + let mut this = self.project(); + *this.shutdown = this.shutdown.take().and_then(|mut s| { + if s.poll_unpin(cx).is_pending() { + Some(s) + } else { + None + } + }); + if this.shutdown.is_some() { + while let std::task::Poll::Ready(Some((msg, reply))) = this.recv.poll_recv(cx) { + if this.handlers.iter().any(|(f, _, _)| f(&*msg)) { + this.waiting.push((msg, reply)); + } else { + let mut actor = this.actor.clone(); + let queue = this.queue.clone(); + this.handlers.push(( + msg.conflicts_with(), + reply, + async move { msg.handle_with(&mut actor, &queue).await }.boxed(), + )) + } + } + } + // handlers + while { + let mut cont = false; + let complete = this + .handlers + .iter_mut() + .enumerate() + .filter_map(|(i, (_, _, f))| match f.poll_unpin(cx) { + std::task::Poll::Pending => None, + std::task::Poll::Ready(res) => Some((i, res)), + }) + .collect::>(); + for (idx, res) in complete.into_iter().rev() { + #[allow(clippy::let_underscore_future)] + let (f, reply, _) = this.handlers.swap_remove(idx); + let _ = reply.send(res); + // TODO: replace with Vec::extract_if once stable + if this.shutdown.is_some() { + let mut i = 0; + while i < this.waiting.len() { + if f(&*this.waiting[i].0) + && !this.handlers.iter().any(|(f, _, _)| f(&*this.waiting[i].0)) + { + let (msg, reply) = this.waiting.remove(i); + let mut actor = this.actor.clone(); + let queue = this.queue.clone(); + this.handlers.push(( + msg.conflicts_with(), + reply, + async move { msg.handle_with(&mut actor, &queue).await }.boxed(), + )); + cont = true; + } else { + i += 1; + } + } + } + } + cont + } {} + let _ = this.bg_runner.as_mut().poll(cx); + if this.waiting.is_empty() && this.handlers.is_empty() && this.bg_runner.is_empty() { + std::task::Poll::Ready(()) + } else { + std::task::Poll::Pending + } + } +} + +pub struct ConcurrentActor { + shutdown: oneshot::Sender<()>, + runtime: NonDetachingJoinHandle<()>, + messenger: mpsc::UnboundedSender>, +} +impl ConcurrentActor { + pub fn new(mut actor: A) -> Self { + let (shutdown_send, shutdown_recv) = oneshot::channel(); + let (messenger_send, messenger_recv) = mpsc::unbounded_channel::>(); + let runtime = NonDetachingJoinHandle::from(tokio::spawn(async move { + let (queue, runner) = BackgroundJobQueue::new(); + actor.init(&queue); + ConcurrentRunner { + actor, + shutdown: Some(shutdown_recv), + waiting: Vec::new(), + recv: messenger_recv, + handlers: Vec::new(), + queue, + bg_runner: runner, + } + .await + })); + Self { + shutdown: shutdown_send, + runtime, + messenger: messenger_send, + } + } + + /// Message is guaranteed to be queued immediately + pub fn queue( + &self, + message: M, + ) -> impl Future> + where + A: Handler, + { + if self.runtime.is_finished() { + return futures::future::Either::Left(ready(Err(Error::new( + eyre!("actor runtime has exited"), + ErrorKind::Unknown, + )))); + } + let (reply_send, reply_recv) = oneshot::channel(); + self.messenger + .send((Box::new(message), reply_send)) + .unwrap(); + futures::future::Either::Right( + reply_recv + .map_err(|_| Error::new(eyre!("actor runtime has exited"), ErrorKind::Unknown)) + .and_then(|a| { + ready( + a.downcast() + .map_err(|_| { + Error::new( + eyre!("received incorrect type in response"), + ErrorKind::Incoherent, + ) + }) + .map(|a| *a), + ) + }), + ) + } + + pub async fn send(&self, message: M) -> Result + where + A: Handler, + { + self.queue(message).await + } + + pub async fn shutdown(self, strategy: PendingMessageStrategy) { + drop(self.messenger); + let timeout = match strategy { + PendingMessageStrategy::CancelAll => { + self.shutdown.send(()).unwrap(); + Some(Duration::from_secs(0)) + } + PendingMessageStrategy::FinishCurrentCancelPending { timeout } => { + self.shutdown.send(()).unwrap(); + timeout + } + PendingMessageStrategy::FinishAll { timeout } => timeout, + }; + let aborter = if let Some(timeout) = timeout { + let hdl = self.runtime.abort_handle(); + async move { + tokio::time::sleep(timeout).await; + hdl.abort(); + } + .boxed() + } else { + futures::future::pending().boxed() + }; + tokio::select! { + _ = aborter => (), + _ = self.runtime => (), + } + } +} diff --git a/core/startos/src/util/actor/mod.rs b/core/startos/src/util/actor/mod.rs new file mode 100644 index 000000000..5cef7c22e --- /dev/null +++ b/core/startos/src/util/actor/mod.rs @@ -0,0 +1,148 @@ +use std::any::{Any, TypeId}; +use std::collections::BTreeMap; +use std::sync::Arc; +use std::time::Duration; + +use futures::future::BoxFuture; +use futures::{Future, FutureExt}; +use tokio::sync::oneshot; + +#[allow(unused_imports)] +use crate::prelude::*; +use crate::util::actor::background::BackgroundJobQueue; + +pub mod background; +pub mod concurrent; +pub mod simple; + +pub trait Actor: Sized + Send + 'static { + #[allow(unused_variables)] + fn init(&mut self, jobs: &BackgroundJobQueue) {} +} + +pub trait Handler: Actor { + type Response: Any + Send; + /// DRAGONS: this must be correctly implemented bi-directionally in order to work as expected + fn conflicts_with(#[allow(unused_variables)] msg: &M) -> ConflictBuilder { + ConflictBuilder::everything() + } + fn handle( + &mut self, + msg: M, + jobs: &BackgroundJobQueue, + ) -> impl Future + Send; +} + +type ConflictFn = dyn Fn(&dyn Message) -> bool + Send + Sync; + +trait Message: Send + Any { + fn conflicts_with(&self) -> Arc>; + fn handle_with<'a>( + self: Box, + actor: &'a mut A, + jobs: &'a BackgroundJobQueue, + ) -> BoxFuture<'a, Box>; +} +impl Message for M +where + A: Handler, +{ + fn conflicts_with(&self) -> Arc> { + A::conflicts_with(self).build() + } + fn handle_with<'a>( + self: Box, + actor: &'a mut A, + jobs: &'a BackgroundJobQueue, + ) -> BoxFuture<'a, Box> { + async move { Box::new(actor.handle(*self, jobs).await) as Box }.boxed() + } +} +impl dyn Message { + #[inline] + pub fn is>(&self) -> bool { + let t = TypeId::of::(); + let concrete = self.type_id(); + t == concrete + } + #[inline] + pub unsafe fn downcast_ref_unchecked>(&self) -> &M { + debug_assert!(self.is::()); + unsafe { &*(self as *const dyn Message as *const M) } + } + #[inline] + fn downcast_ref>(&self) -> Option<&M> { + if self.is::() { + unsafe { Some(self.downcast_ref_unchecked()) } + } else { + None + } + } +} + +type Request = (Box>, oneshot::Sender>); + +pub enum PendingMessageStrategy { + CancelAll, + FinishCurrentCancelPending { timeout: Option }, + FinishAll { timeout: Option }, +} + +pub struct ConflictBuilder { + base: bool, + except: BTreeMap) -> bool + Send + Sync>>>, +} +impl ConflictBuilder { + pub const fn everything() -> Self { + Self { + base: true, + except: BTreeMap::new(), + } + } + pub const fn nothing() -> Self { + Self { + base: false, + except: BTreeMap::new(), + } + } + pub fn except(mut self) -> Self + where + A: Handler, + { + self.except.insert(TypeId::of::(), None); + self + } + pub fn except_if bool + Send + Sync + 'static>( + mut self, + f: F, + ) -> Self + where + A: Handler, + { + self.except.insert( + TypeId::of::(), + Some(Box::new(move |m| { + if let Some(m) = m.downcast_ref() { + f(m) + } else { + false + } + })), + ); + self + } + fn build(self) -> Arc> { + Arc::new(move |m| { + self.base + ^ if let Some(entry) = self.except.get(&m.type_id()) { + if let Some(f) = entry { + f(m) + } else { + true + } + } else { + false + } + }) + } +} diff --git a/core/startos/src/util/actor.rs b/core/startos/src/util/actor/simple.rs similarity index 60% rename from core/startos/src/util/actor.rs rename to core/startos/src/util/actor/simple.rs index caa83d8b1..6f880a57a 100644 --- a/core/startos/src/util/actor.rs +++ b/core/startos/src/util/actor/simple.rs @@ -1,85 +1,14 @@ -use std::any::Any; -use std::future::ready; use std::time::Duration; -use futures::future::BoxFuture; +use futures::future::ready; use futures::{Future, FutureExt, TryFutureExt}; use helpers::NonDetachingJoinHandle; use tokio::sync::oneshot::error::TryRecvError; use tokio::sync::{mpsc, oneshot}; use crate::prelude::*; -use crate::util::Never; - -pub trait Actor: Send + 'static { - #[allow(unused_variables)] - fn init(&mut self, jobs: &mut BackgroundJobs) {} -} - -pub trait Handler: Actor { - type Response: Any + Send; - fn handle( - &mut self, - msg: M, - jobs: &mut BackgroundJobs, - ) -> impl Future + Send; -} - -#[async_trait::async_trait] -trait Message: Send { - async fn handle_with( - self: Box, - actor: &mut A, - jobs: &mut BackgroundJobs, - ) -> Box; -} -#[async_trait::async_trait] -impl Message for M -where - A: Handler, -{ - async fn handle_with( - self: Box, - actor: &mut A, - jobs: &mut BackgroundJobs, - ) -> Box { - Box::new(actor.handle(*self, jobs).await) - } -} - -type Request = (Box>, oneshot::Sender>); - -#[derive(Default)] -pub struct BackgroundJobs { - jobs: Vec>, -} -impl BackgroundJobs { - pub fn add_job(&mut self, fut: impl Future + Send + 'static) { - self.jobs.push(fut.boxed()); - } -} -impl Future for BackgroundJobs { - type Output = Never; - fn poll( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll { - let complete = self - .jobs - .iter_mut() - .enumerate() - .filter_map(|(i, f)| match f.poll_unpin(cx) { - std::task::Poll::Pending => None, - std::task::Poll::Ready(_) => Some(i), - }) - .collect::>(); - for idx in complete.into_iter().rev() { - #[allow(clippy::let_underscore_future)] - let _ = self.jobs.swap_remove(idx); - } - std::task::Poll::Pending - } -} +use crate::util::actor::background::BackgroundJobQueue; +use crate::util::actor::{Actor, Handler, PendingMessageStrategy, Request}; pub struct SimpleActor { shutdown: oneshot::Sender<()>, @@ -91,19 +20,17 @@ impl SimpleActor { let (shutdown_send, mut shutdown_recv) = oneshot::channel(); let (messenger_send, mut messenger_recv) = mpsc::unbounded_channel::>(); let runtime = NonDetachingJoinHandle::from(tokio::spawn(async move { - let mut bg = BackgroundJobs::default(); - actor.init(&mut bg); + let (queue, mut runner) = BackgroundJobQueue::new(); + actor.init(&queue); loop { tokio::select! { - _ = &mut bg => (), + _ = &mut runner => (), msg = messenger_recv.recv() => match msg { Some((msg, reply)) if shutdown_recv.try_recv() == Err(TryRecvError::Empty) => { - let mut new_bg = BackgroundJobs::default(); tokio::select! { - res = msg.handle_with(&mut actor, &mut new_bg) => { let _ = reply.send(res); }, - _ = &mut bg => (), + res = msg.handle_with(&mut actor, &queue) => { let _ = reply.send(res); }, + _ = &mut runner => (), } - bg.jobs.append(&mut new_bg.jobs); } _ => break, }, @@ -189,9 +116,3 @@ impl SimpleActor { } } } - -pub enum PendingMessageStrategy { - CancelAll, - FinishCurrentCancelPending { timeout: Option }, - FinishAll { timeout: Option }, -}