Files
start-os/core/startos/src/util/actor.rs
Aiden McClelland 089199e7c2 Feature/lxc container runtime (#2562)
* wip(fix): Dependencies

* wip: context

* wip(fix) Sorta auth

* wip: warnings

* wip(fix): registry/admin

* wip(fix) marketplace

* wip(fix) Some more converted and fixed with the linter and config

* wip: Working on the static server

* wip(fix)static server

* wip: Remove some asynnc

* wip: Something about the request and regular rpc

* wip: gut install

Co-authored-by: J H <Blu-J@users.noreply.github.com>

* wip: Convert the static server into the new system

* wip delete file

* test

* wip(fix) vhost does not need the with safe defaults

* wip: Adding in the wifi

* wip: Fix the developer and the verify

* wip: new install flow

Co-authored-by: J H <Blu-J@users.noreply.github.com>

* fix middleware

* wip

* wip: Fix the auth

* wip

* continue service refactor

* feature: Service get_config

* feat: Action

* wip: Fighting the great fight against the borrow checker

* wip: Remove an error in a file that I just need to deel with later

* chore: Add in some more lifetime stuff to the services

* wip: Install fix on lifetime

* cleanup

* wip: Deal with the borrow later

* more cleanup

* resolve borrowchecker errors

* wip(feat): add in the handler for the socket, for now

* wip(feat): Update the service_effect_handler::action

* chore: Add in the changes to make sure the from_service goes to context

* chore: Change the

* refactor service map

* fix references to service map

* fill out restore

* wip: Before I work on the store stuff

* fix backup module

* handle some warnings

* feat: add in the ui components on the rust side

* feature: Update the procedures

* chore: Update the js side of the main and a few of the others

* chore: Update the rpc listener to match the persistant container

* wip: Working on updating some things to have a better name

* wip(feat): Try and get the rpc to return the correct shape?

* lxc wip

* wip(feat): Try and get the rpc to return the correct shape?

* build for container runtime wip

* remove container-init

* fix build

* fix error

* chore: Update to work I suppose

* lxc wip

* remove docker module and feature

* download alpine squashfs automatically

* overlays effect

Co-authored-by: Jade <Blu-J@users.noreply.github.com>

* chore: Add the overlay effect

* feat: Add the mounter in the main

* chore: Convert to use the mounts, still need to work with the sandbox

* install fixes

* fix ssl

* fixes from testing

* implement tmpfile for upload

* wip

* misc fixes

* cleanup

* cleanup

* better progress reporting

* progress for sideload

* return real guid

* add devmode script

* fix lxc rootfs path

* fix percentage bar

* fix progress bar styling

* fix build for unstable

* tweaks

* label progress

* tweaks

* update progress more often

* make symlink in rpc_client

* make socket dir

* fix parent path

* add start-cli to container

* add echo and gitInfo commands

* wip: Add the init + errors

* chore: Add in the exit effect for the system

* chore: Change the type to null for failure to parse

* move sigterm timeout to stopping status

* update order

* chore: Update the return type

* remove dbg

* change the map error

* chore: Update the thing to capture id

* chore add some life changes

* chore: Update the loging

* chore: Update the package to run module

* us From for RpcError

* chore: Update to use import instead

* chore: update

* chore: Use require for the backup

* fix a default

* update the type that is wrong

* chore: Update the type of the manifest

* chore: Update to make null

* only symlink if not exists

* get rid of double result

* better debug info for ErrorCollection

* chore: Update effects

* chore: fix

* mount assets and volumes

* add exec instead of spawn

* fix mounting in image

* fix overlay mounts

Co-authored-by: Jade <Blu-J@users.noreply.github.com>

* misc fixes

* feat: Fix two

* fix: systemForEmbassy main

* chore: Fix small part of main loop

* chore: Modify the bundle

* merge

* fixMain loop"

* move tsc to makefile

* chore: Update the return types of the health check

* fix client

* chore: Convert the todo to use tsmatches

* add in the fixes for the seen and create the hack to allow demo

* chore: Update to include the systemForStartOs

* chore UPdate to the latest types from the expected outout

* fixes

* fix typo

* Don't emit if failure on tsc

* wip

Co-authored-by: Jade <Blu-J@users.noreply.github.com>

* add s9pk api

* add inspection

* add inspect manifest

* newline after display serializable

* fix squashfs in image name

* edit manifest

Co-authored-by: Jade <Blu-J@users.noreply.github.com>

* wait for response on repl

* ignore sig for now

* ignore sig for now

* re-enable sig verification

* fix

* wip

* env and chroot

* add profiling logs

* set uid & gid in squashfs to 100000

* set uid of sqfs to 100000

* fix mksquashfs args

* add env to compat

* fix

* re-add docker feature flag

* fix docker output format being stupid

* here be dragons

* chore: Add in the cross compiling for something

* fix npm link

* extract logs from container on exit

* chore: Update for testing

* add log capture to drop trait

* chore: add in the modifications that I make

* chore: Update small things for no updates

* chore: Update the types of something

* chore: Make main not complain

* idmapped mounts

* idmapped volumes

* re-enable kiosk

* chore: Add in some logging for the new system

* bring in start-sdk

* remove avahi

* chore: Update the deps

* switch to musl

* chore: Update the version of prettier

* chore: Organize'

* chore: Update some of the headers back to the standard of fetch

* fix musl build

* fix idmapped mounts

* fix cross build

* use cross compiler for correct arch

* feat: Add in the faked ssl stuff for the effects

* @dr_bonez Did a solution here

* chore: Something that DrBonez

* chore: up

* wip: We have a working server!!!

* wip

* uninstall

* wip

* tes

* misc fixes

* fix cli

* replace interface with host

* chore: Fix the types in some ts files

* chore: quick update for the system for embassy to update the types

* replace br-start9 with lxcbr0

* split patchdb into public/private

* chore: Add changes for config set

* Feat: Adding some debugging for the errors

* wip: Working on getting the set config to work

* chore: Update and fix the small issue with the deserialization

* lightning, masked, schemeOverride, invert host-iface relationship

* feat: Add in the changes for just the sdk

* feat: Add in the changes for the new effects I suppose for now

---------

Co-authored-by: J H <2364004+Blu-J@users.noreply.github.com>
Co-authored-by: J H <Blu-J@users.noreply.github.com>
Co-authored-by: J H <dragondef@gmail.com>
Co-authored-by: Matt Hill <mattnine@protonmail.com>
2024-02-22 21:00:49 +00:00

195 lines
6.0 KiB
Rust

use std::any::Any;
use std::future::ready;
use std::time::Duration;
use futures::future::BoxFuture;
use futures::{Future, FutureExt, TryFutureExt};
use helpers::NonDetachingJoinHandle;
use tokio::sync::oneshot::error::TryRecvError;
use tokio::sync::{mpsc, oneshot};
use crate::prelude::*;
use crate::util::Never;
pub trait Actor: Send + 'static {
#[allow(unused_variables)]
fn init(&mut self, jobs: &mut BackgroundJobs) {}
}
#[async_trait::async_trait]
pub trait Handler<M>: Actor {
type Response: Any + Send;
async fn handle(&mut self, msg: M, jobs: &mut BackgroundJobs) -> Self::Response;
}
#[async_trait::async_trait]
trait Message<A>: Send {
async fn handle_with(
self: Box<Self>,
actor: &mut A,
jobs: &mut BackgroundJobs,
) -> Box<dyn Any + Send>;
}
#[async_trait::async_trait]
impl<M: Send, A: Actor> Message<A> for M
where
A: Handler<M>,
{
async fn handle_with(
self: Box<Self>,
actor: &mut A,
jobs: &mut BackgroundJobs,
) -> Box<dyn Any + Send> {
Box::new(actor.handle(*self, jobs).await)
}
}
type Request<A> = (Box<dyn Message<A>>, oneshot::Sender<Box<dyn Any + Send>>);
#[derive(Default)]
pub struct BackgroundJobs {
jobs: Vec<BoxFuture<'static, ()>>,
}
impl BackgroundJobs {
pub fn add_job(&mut self, fut: impl Future<Output = ()> + Send + 'static) {
self.jobs.push(fut.boxed());
}
}
impl Future for BackgroundJobs {
type Output = Never;
fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let complete = self
.jobs
.iter_mut()
.enumerate()
.filter_map(|(i, f)| match f.poll_unpin(cx) {
std::task::Poll::Pending => None,
std::task::Poll::Ready(_) => Some(i),
})
.collect::<Vec<_>>();
for idx in complete.into_iter().rev() {
#[allow(clippy::let_underscore_future)]
let _ = self.jobs.swap_remove(idx);
}
std::task::Poll::Pending
}
}
pub struct SimpleActor<A: Actor> {
shutdown: oneshot::Sender<()>,
runtime: NonDetachingJoinHandle<()>,
messenger: mpsc::UnboundedSender<Request<A>>,
}
impl<A: Actor> SimpleActor<A> {
pub fn new(mut actor: A) -> Self {
let (shutdown_send, mut shutdown_recv) = oneshot::channel();
let (messenger_send, mut messenger_recv) = mpsc::unbounded_channel::<Request<A>>();
let runtime = NonDetachingJoinHandle::from(tokio::spawn(async move {
let mut bg = BackgroundJobs::default();
actor.init(&mut bg);
loop {
tokio::select! {
_ = &mut bg => (),
msg = messenger_recv.recv() => match msg {
Some((msg, reply)) if shutdown_recv.try_recv() == Err(TryRecvError::Empty) => {
let mut new_bg = BackgroundJobs::default();
tokio::select! {
res = msg.handle_with(&mut actor, &mut new_bg) => { let _ = reply.send(res); },
_ = &mut bg => (),
}
bg.jobs.append(&mut new_bg.jobs);
}
_ => break,
},
}
}
}));
Self {
shutdown: shutdown_send,
runtime,
messenger: messenger_send,
}
}
/// Message is guaranteed to be queued immediately
pub fn queue<M: Send + 'static>(
&self,
message: M,
) -> impl Future<Output = Result<A::Response, Error>>
where
A: Handler<M>,
{
if self.runtime.is_finished() {
return futures::future::Either::Left(ready(Err(Error::new(
eyre!("actor runtime has exited"),
ErrorKind::Unknown,
))));
}
let (reply_send, reply_recv) = oneshot::channel();
self.messenger
.send((Box::new(message), reply_send))
.unwrap();
futures::future::Either::Right(
reply_recv
.map_err(|_| Error::new(eyre!("actor runtime has exited"), ErrorKind::Unknown))
.and_then(|a| {
ready(
a.downcast()
.map_err(|_| {
Error::new(
eyre!("received incorrect type in response"),
ErrorKind::Incoherent,
)
})
.map(|a| *a),
)
}),
)
}
pub async fn send<M: Send + 'static>(&self, message: M) -> Result<A::Response, Error>
where
A: Handler<M>,
{
self.queue(message).await
}
pub async fn shutdown(self, strategy: PendingMessageStrategy) {
drop(self.messenger);
let timeout = match strategy {
PendingMessageStrategy::CancelAll => {
self.shutdown.send(()).unwrap();
Some(Duration::from_secs(0))
}
PendingMessageStrategy::FinishCurrentCancelPending { timeout } => {
self.shutdown.send(()).unwrap();
timeout
}
PendingMessageStrategy::FinishAll { timeout } => timeout,
};
let aborter = if let Some(timeout) = timeout {
let hdl = self.runtime.abort_handle();
async move {
tokio::time::sleep(timeout).await;
hdl.abort();
}
.boxed()
} else {
futures::future::pending().boxed()
};
tokio::select! {
_ = aborter => (),
_ = self.runtime => (),
}
}
}
pub enum PendingMessageStrategy {
CancelAll,
FinishCurrentCancelPending { timeout: Option<Duration> },
FinishAll { timeout: Option<Duration> },
}