From 3877e43b84a86569af906076824ef43cbfa53727 Mon Sep 17 00:00:00 2001 From: Aiden McClelland Date: Fri, 3 Sep 2021 14:03:08 -0600 Subject: [PATCH] configure datadir on context --- appmgr/build-portable.sh | 2 +- appmgr/src/action/docker.rs | 10 +- appmgr/src/action/mod.rs | 11 +- appmgr/src/backup/mod.rs | 5 + appmgr/src/bin/embassy-init.rs | 62 +++++++--- appmgr/src/bin/embassyd.rs | 93 +++++++++++++-- appmgr/src/config/action.rs | 5 + appmgr/src/config/mod.rs | 27 +++-- appmgr/src/config/spec.rs | 69 +++++++----- appmgr/src/context/mod.rs | 4 +- appmgr/src/context/rpc.rs | 118 ++++++++++++-------- appmgr/src/daemon/mod.rs | 1 - appmgr/src/daemon/tor_health_check.rs | 52 --------- appmgr/src/db/mod.rs | 7 ++ appmgr/src/dependencies.rs | 9 +- appmgr/src/disk/main.rs | 45 ++++++++ appmgr/src/disk/mod.rs | 2 + appmgr/src/{volume/disk.rs => disk/util.rs} | 46 +++++++- appmgr/src/error.rs | 6 +- appmgr/src/hostname.rs | 4 +- appmgr/src/install/cleanup.rs | 11 +- appmgr/src/install/mod.rs | 46 +++++--- appmgr/src/lib.rs | 3 +- appmgr/src/manager/mod.rs | 62 ++++++---- appmgr/src/migration.rs | 5 + appmgr/src/net/tor.rs | 40 +++++++ appmgr/src/net/wifi.rs | 44 ++++---- appmgr/src/shutdown.rs | 4 + appmgr/src/ssh.rs | 3 +- appmgr/src/status/health_check.rs | 6 +- appmgr/src/status/mod.rs | 54 +++++---- appmgr/src/util/mod.rs | 40 ++++++- appmgr/src/{volume/mod.rs => volume.rs} | 44 ++++++-- 33 files changed, 664 insertions(+), 276 deletions(-) delete mode 100644 appmgr/src/daemon/mod.rs delete mode 100644 appmgr/src/daemon/tor_health_check.rs create mode 100644 appmgr/src/disk/main.rs create mode 100644 appmgr/src/disk/mod.rs rename appmgr/src/{volume/disk.rs => disk/util.rs} (88%) create mode 100644 appmgr/src/shutdown.rs rename appmgr/src/{volume/mod.rs => volume.rs} (83%) diff --git a/appmgr/build-portable.sh b/appmgr/build-portable.sh index de2708968..5cd6cd576 100755 --- a/appmgr/build-portable.sh +++ b/appmgr/build-portable.sh @@ -11,5 +11,5 @@ fi alias 'rust-musl-builder'='docker run --rm -it -v "$HOME"/.cargo/registry:/root/.cargo/registry -v "$(pwd)":/home/rust/src start9/rust-musl-cross:x86_64-musl' cd ../.. -rust-musl-builder sh -c "(cd embassy-os/appmgr && cargo +beta build --release --target=x86_64-unknown-linux-musl --no-default-features) --bin=embassy-sdk" +rust-musl-builder sh -c "(cd embassy-os/appmgr && cargo +beta build --release --target=x86_64-unknown-linux-musl --no-default-features)" cd embassy-os/appmgr diff --git a/appmgr/src/action/docker.rs b/appmgr/src/action/docker.rs index 026c627f9..ce4f3ed8a 100644 --- a/appmgr/src/action/docker.rs +++ b/appmgr/src/action/docker.rs @@ -7,6 +7,7 @@ use indexmap::IndexMap; use serde::{Deserialize, Serialize}; use serde_json::Value; +use crate::context::RpcContext; use crate::id::{Id, ImageId}; use crate::s9pk::manifest::{PackageId, SYSTEM_PACKAGE_ID}; use crate::util::{IoFormat, Version}; @@ -36,6 +37,7 @@ pub struct DockerAction { impl DockerAction { pub async fn execute Deserialize<'de>>( &self, + ctx: &RpcContext, pkg_id: &PackageId, pkg_version: &Version, name: Option<&str>, @@ -55,7 +57,7 @@ impl DockerAction { .arg(Self::container_name(pkg_id, name)); } cmd.args( - self.docker_args(pkg_id, pkg_version, volumes, allow_inject) + self.docker_args(ctx, pkg_id, pkg_version, volumes, allow_inject) .await, ); let input_buf = if let (Some(input), Some(format)) = (&input, &self.io_format) { @@ -108,6 +110,7 @@ impl DockerAction { pub async fn sandboxed Deserialize<'de>>( &self, + ctx: &RpcContext, pkg_id: &PackageId, pkg_version: &Version, volumes: &Volumes, @@ -116,7 +119,7 @@ impl DockerAction { let mut cmd = tokio::process::Command::new("docker"); cmd.arg("run").arg("--rm").arg("--network=none"); cmd.args( - self.docker_args(pkg_id, pkg_version, &volumes.to_readonly(), false) + self.docker_args(ctx, pkg_id, pkg_version, &volumes.to_readonly(), false) .await, ); let input_buf = if let (Some(input), Some(format)) = (&input, &self.io_format) { @@ -187,6 +190,7 @@ impl DockerAction { async fn docker_args<'a>( &'a self, + ctx: &RpcContext, pkg_id: &PackageId, pkg_version: &Version, volumes: &Volumes, @@ -204,7 +208,7 @@ impl DockerAction { } else { continue; }; - let src = dbg!(volume.path_for(pkg_id, pkg_version, volume_id)); + let src = dbg!(volume.path_for(ctx, pkg_id, pkg_version, volume_id)); if tokio::fs::metadata(&src).await.is_err() { continue; } diff --git a/appmgr/src/action/mod.rs b/appmgr/src/action/mod.rs index 6d5a7eacb..49991218e 100644 --- a/appmgr/src/action/mod.rs +++ b/appmgr/src/action/mod.rs @@ -7,6 +7,7 @@ use serde::{Deserialize, Serialize}; use self::docker::DockerAction; use crate::config::{Config, ConfigSpec}; +use crate::context::RpcContext; use crate::id::Id; use crate::s9pk::manifest::PackageId; use crate::util::{ValuePrimative, Version}; @@ -90,6 +91,7 @@ pub struct Action { impl Action { pub async fn execute( &self, + ctx: &RpcContext, pkg_id: &PackageId, pkg_version: &Version, volumes: &Volumes, @@ -100,6 +102,7 @@ impl Action { .with_kind(crate::ErrorKind::ConfigSpecViolation)?; self.implementation .execute( + ctx, pkg_id, pkg_version, Some(&format!("{}Action", self.name)), @@ -121,6 +124,7 @@ pub enum ActionImplementation { impl ActionImplementation { pub async fn execute Deserialize<'de>>( &self, + ctx: &RpcContext, pkg_id: &PackageId, pkg_version: &Version, name: Option<&str>, @@ -131,13 +135,14 @@ impl ActionImplementation { match self { ActionImplementation::Docker(action) => { action - .execute(pkg_id, pkg_version, name, volumes, input, allow_inject) + .execute(ctx, pkg_id, pkg_version, name, volumes, input, allow_inject) .await } } } pub async fn sandboxed Deserialize<'de>>( &self, + ctx: &RpcContext, pkg_id: &PackageId, pkg_version: &Version, volumes: &Volumes, @@ -145,7 +150,9 @@ impl ActionImplementation { ) -> Result, Error> { match self { ActionImplementation::Docker(action) => { - action.sandboxed(pkg_id, pkg_version, volumes, input).await + action + .sandboxed(ctx, pkg_id, pkg_version, volumes, input) + .await } } } diff --git a/appmgr/src/backup/mod.rs b/appmgr/src/backup/mod.rs index b6592ad08..40b42b919 100644 --- a/appmgr/src/backup/mod.rs +++ b/appmgr/src/backup/mod.rs @@ -3,6 +3,7 @@ use patch_db::HasModel; use serde::{Deserialize, Serialize}; use crate::action::ActionImplementation; +use crate::context::RpcContext; use crate::s9pk::manifest::PackageId; use crate::util::Version; use crate::volume::{Volume, VolumeId, Volumes}; @@ -16,6 +17,7 @@ pub struct BackupActions { impl BackupActions { pub async fn create( &self, + ctx: &RpcContext, pkg_id: &PackageId, pkg_version: &Version, volumes: &Volumes, @@ -24,6 +26,7 @@ impl BackupActions { volumes.insert(VolumeId::Backup, Volume::Backup { readonly: false }); self.create .execute( + ctx, pkg_id, pkg_version, Some("CreateBackup"), @@ -39,6 +42,7 @@ impl BackupActions { pub async fn restore( &self, + ctx: &RpcContext, pkg_id: &PackageId, pkg_version: &Version, volumes: &Volumes, @@ -47,6 +51,7 @@ impl BackupActions { volumes.insert(VolumeId::Backup, Volume::Backup { readonly: true }); self.restore .execute( + ctx, pkg_id, pkg_version, Some("RestoreBackup"), diff --git a/appmgr/src/bin/embassy-init.rs b/appmgr/src/bin/embassy-init.rs index 5382daf6f..8f883b25c 100644 --- a/appmgr/src/bin/embassy-init.rs +++ b/appmgr/src/bin/embassy-init.rs @@ -1,28 +1,60 @@ +use embassy::context::rpc::RpcContextConfig; use embassy::Error; -async fn inner_main() -> Result<(), Error> { - // host setup flow if needed - +async fn init(cfg: &RpcContextConfig) -> Result<(), Error> { // mount disk - embassy::volume::disk::mount("/dev/sda", "/mnt/embassy-os-crypt").await?; // TODO: by uuid - - // unlock disk - - // mount /var/log/journal - - // sync ssh - - // sync wifi - - // hostname-set + if embassy::disk::main::importable().await? { + embassy::disk::main::load("password").await?; + } else { + // embassy::setup::host_setup().await?; + } + embassy::disk::util::bind("/embassy-data/main/logs", "/var/log/journal", false).await?; + embassy::ssh::sync_keys_from_db(todo!(), "/root/.ssh/authorized_keys").await?; + todo!("sync wifi"); embassy::hostname::sync_hostname().await?; Ok(()) } +async fn inner_main(cfg_path: Option<&str>) -> Result<(), Error> { + if let Err(e) = init(&RpcContextConfig::load(cfg_path).await?).await { + embassy::sound::BEETHOVEN.play().await?; + log::error!("{}", e.source); + log::debug!("{}", e.source) + } else { + embassy::sound::MARIO_COIN.play().await? + } + + Ok(()) +} + fn main() { + let matches = clap::App::new("embassyd") + .arg( + clap::Arg::with_name("config") + .short("c") + .long("config") + .takes_value(true), + ) + .arg( + clap::Arg::with_name("verbosity") + .short("v") + .multiple(true) + .takes_value(false), + ) + .get_matches(); + + simple_logging::log_to_stderr(match matches.occurrences_of("verbosity") { + 0 => log::LevelFilter::Off, + 1 => log::LevelFilter::Error, + 2 => log::LevelFilter::Warn, + 3 => log::LevelFilter::Info, + 4 => log::LevelFilter::Debug, + _ => log::LevelFilter::Trace, + }); + let cfg_path = matches.value_of("config"); let rt = tokio::runtime::Runtime::new().expect("failed to initialize runtime"); - match rt.block_on(inner_main()) { + match rt.block_on(inner_main(cfg_path)) { Ok(_) => (), Err(e) => { drop(rt); diff --git a/appmgr/src/bin/embassyd.rs b/appmgr/src/bin/embassyd.rs index ccc7e28f4..e4911b239 100644 --- a/appmgr/src/bin/embassyd.rs +++ b/appmgr/src/bin/embassyd.rs @@ -1,4 +1,3 @@ -use std::path::Path; use std::time::Duration; use anyhow::anyhow; @@ -8,14 +7,16 @@ use embassy::db::subscribe; use embassy::hostname::{get_hostname, get_id}; use embassy::middleware::auth::auth; use embassy::middleware::cors::cors; -use embassy::net::tor::os_key; +use embassy::net::tor::{os_key, tor_health_check}; use embassy::status::{check_all, synchronize_all}; use embassy::util::daemon; use embassy::{Error, ErrorKind, ResultExt}; -use futures::TryFutureExt; +use futures::{FutureExt, TryFutureExt}; use patch_db::json_ptr::JsonPointer; +use reqwest::{Client, Proxy}; use rpc_toolkit::hyper::{Body, Response, Server, StatusCode}; -use rpc_toolkit::rpc_server; +use rpc_toolkit::{rpc_server, Context}; +use tokio::signal::unix::signal; fn status_fn(_: i32) -> StatusCode { StatusCode::OK @@ -30,7 +31,37 @@ fn err_to_500(e: Error) -> Response { } async fn inner_main(cfg_path: Option<&str>) -> Result<(), Error> { - let rpc_ctx = RpcContext::init(cfg_path).await?; + let (shutdown, _) = tokio::sync::broadcast::channel(1); + + let rpc_ctx = RpcContext::init(cfg_path, shutdown).await?; + + let sig_handler_ctx = rpc_ctx.clone(); + let sig_handler = tokio::spawn(async move { + use tokio::signal::unix::SignalKind; + futures::future::select_all( + [ + SignalKind::interrupt(), + SignalKind::quit(), + SignalKind::terminate(), + ] + .iter() + .map(|s| { + async move { + signal(*s) + .expect(&format!("register {:?} handler", s)) + .recv() + .await + } + .boxed() + }), + ) + .await; + sig_handler_ctx + .shutdown + .send(None) + .expect("send shutdown signal"); + }); + if !rpc_ctx.db.exists(&::default()).await? { rpc_ctx .db @@ -55,12 +86,22 @@ async fn inner_main(cfg_path: Option<&str>) -> Result<(), Error> { cors, auth, ] + }) + .with_graceful_shutdown({ + let mut shutdown = rpc_ctx.shutdown.subscribe(); + async move { + shutdown.recv().await.expect("context dropped"); + } }); let rev_cache_ctx = rpc_ctx.clone(); let revision_cache_task = tokio::spawn(async move { let mut sub = rev_cache_ctx.db.subscribe(); - loop { + let mut shutdown = rev_cache_ctx.shutdown.subscribe(); + while matches!( + shutdown.try_recv(), + Err(tokio::sync::broadcast::error::TryRecvError::Empty) + ) { let rev = match sub.recv().await { Ok(a) => a, Err(_) => { @@ -76,8 +117,8 @@ async fn inner_main(cfg_path: Option<&str>) -> Result<(), Error> { } }); - let tor_health_check_task = - embassy::daemon::tor_health_check::tor_health_check_daemon(&rpc_ctx.net_controller.tor); + // let tor_health_check_task = + // embassy::daemon::tor_health_check::tor_health_check_daemon(&rpc_ctx.net_controller.tor); let ws_ctx = rpc_ctx.clone(); let ws_server = { let builder = Server::bind(&ws_ctx.bind_ws); @@ -101,7 +142,13 @@ async fn inner_main(cfg_path: Option<&str>) -> Result<(), Error> { } }); builder.serve(make_svc) - }; + } + .with_graceful_shutdown({ + let mut shutdown = rpc_ctx.shutdown.subscribe(); + async move { + shutdown.recv().await.expect("context dropped"); + } + }); let status_ctx = rpc_ctx.clone(); let status_daemon = daemon( @@ -117,6 +164,7 @@ async fn inner_main(cfg_path: Option<&str>) -> Result<(), Error> { } }, Duration::from_millis(500), + rpc_ctx.shutdown.subscribe(), ); let health_ctx = rpc_ctx.clone(); let health_daemon = daemon( @@ -132,7 +180,26 @@ async fn inner_main(cfg_path: Option<&str>) -> Result<(), Error> { } }, Duration::from_millis(500), + rpc_ctx.shutdown.subscribe(), ); + let tor_health_ctx = rpc_ctx.clone(); + let tor_client = Client::builder() + .proxy( + Proxy::all(format!("socks5h://{}:{}", rpc_ctx.host(), rpc_ctx.port())) + .with_kind(crate::ErrorKind::Network)?, + ) + .build() + .with_kind(crate::ErrorKind::Network)?; + let tor_health_daemon = daemon( + move || { + let ctx = tor_health_ctx.clone(); + let client = tor_client.clone(); + async move { tor_health_check(&client, &ctx.net_controller.tor).await } + }, + Duration::from_secs(300), + rpc_ctx.shutdown.subscribe(), + ); + futures::try_join!( server.map_err(|e| Error::new(e, ErrorKind::Network)), revision_cache_task.map_err(|e| Error::new( @@ -148,8 +215,14 @@ async fn inner_main(cfg_path: Option<&str>) -> Result<(), Error> { e.context("Health Check daemon panicked!"), ErrorKind::Unknown )), - futures::FutureExt::map(tor_health_check_task, Ok) + tor_health_daemon + .map_err(|e| Error::new(e.context("Tor Health daemon panicked!"), ErrorKind::Unknown)), )?; + + rpc_ctx.managers.empty().await?; + + sig_handler.abort(); + Ok(()) } diff --git a/appmgr/src/config/action.rs b/appmgr/src/config/action.rs index 63d2401ee..bd08fb974 100644 --- a/appmgr/src/config/action.rs +++ b/appmgr/src/config/action.rs @@ -6,6 +6,7 @@ use serde::{Deserialize, Serialize}; use super::{Config, ConfigSpec}; use crate::action::ActionImplementation; +use crate::context::RpcContext; use crate::dependencies::Dependencies; use crate::s9pk::manifest::PackageId; use crate::status::health_check::HealthCheckId; @@ -28,12 +29,14 @@ pub struct ConfigActions { impl ConfigActions { pub async fn get( &self, + ctx: &RpcContext, pkg_id: &PackageId, pkg_version: &Version, volumes: &Volumes, ) -> Result { self.get .execute( + ctx, pkg_id, pkg_version, Some("GetConfig"), @@ -49,6 +52,7 @@ impl ConfigActions { pub async fn set( &self, + ctx: &RpcContext, pkg_id: &PackageId, pkg_version: &Version, dependencies: &Dependencies, @@ -58,6 +62,7 @@ impl ConfigActions { let res: SetResult = self .set .execute( + ctx, pkg_id, pkg_version, Some("SetConfig"), diff --git a/appmgr/src/config/mod.rs b/appmgr/src/config/mod.rs index 1b2d4b362..19927ca98 100644 --- a/appmgr/src/config/mod.rs +++ b/appmgr/src/config/mod.rs @@ -2,7 +2,6 @@ use std::time::Duration; use anyhow::anyhow; use bollard::container::KillContainerOptions; -use bollard::Docker; use futures::future::{BoxFuture, FutureExt}; use indexmap::{IndexMap, IndexSet}; use itertools::Itertools; @@ -172,7 +171,7 @@ pub async fn get( .get(&mut db, true) .await?; let volumes = pkg_model.manifest().volumes().get(&mut db, true).await?; - action.get(&id, &*version, &*volumes).await + action.get(&ctx, &id, &*version, &*volumes).await } #[command( @@ -205,8 +204,8 @@ pub async fn set_dry( let mut tx = db.begin().await?; let mut breakages = IndexMap::new(); configure( + &ctx, &mut tx, - &ctx.docker, &id, config, &timeout, @@ -241,8 +240,8 @@ pub async fn set_impl( let mut tx = db.begin().await?; let mut breakages = IndexMap::new(); configure( + &ctx, &mut tx, - &ctx.docker, &id, config, &timeout, @@ -270,8 +269,8 @@ pub async fn set_impl( } pub fn configure<'a, Db: DbHandle>( + ctx: &'a RpcContext, db: &'a mut Db, - docker: &'a Docker, id: &'a PackageId, config: Option, timeout: &'a Option, @@ -311,7 +310,7 @@ pub fn configure<'a, Db: DbHandle>( let ConfigRes { config: old_config, spec, - } = action.get(id, &*version, &*volumes).await?; + } = action.get(ctx, id, &*version, &*volumes).await?; // determine new config to use let mut config = if let Some(config) = config.or_else(|| old_config.clone()) { @@ -321,7 +320,7 @@ pub fn configure<'a, Db: DbHandle>( }; spec.matches(&config)?; // check that new config matches spec - spec.update(db, &*overrides, &mut config).await?; // dereference pointers in the new config + spec.update(ctx, db, &*overrides, &mut config).await?; // dereference pointers in the new config // create backreferences to pointers let mut sys = pkg_model.clone().system_pointers().get_mut(db).await?; @@ -360,7 +359,7 @@ pub fn configure<'a, Db: DbHandle>( let signal = if !dry_run { // run config action let res = action - .set(id, &*version, &*dependencies, &*volumes, &config) + .set(ctx, id, &*version, &*dependencies, &*volumes, &config) .await?; // track dependencies with no pointers @@ -489,7 +488,13 @@ pub fn configure<'a, Db: DbHandle>( { let manifest = dependent_model.clone().manifest().get(db, true).await?; if let Err(error) = cfg - .check(dependent, &manifest.version, &manifest.volumes, &config) + .check( + ctx, + dependent, + &manifest.version, + &manifest.volumes, + &config, + ) .await? { let dep_err = DependencyError::ConfigUnsatisfied { error }; @@ -509,7 +514,7 @@ pub fn configure<'a, Db: DbHandle>( if let PackagePointerSpecVariant::Config { selector, multi } = ptr { if selector.select(*multi, &next) != selector.select(*multi, &prev) { if let Err(e) = configure( - db, docker, dependent, None, timeout, dry_run, overrides, breakages, + ctx, db, dependent, None, timeout, dry_run, overrides, breakages, ) .await { @@ -542,7 +547,7 @@ pub fn configure<'a, Db: DbHandle>( } if let Some(signal) = signal { - docker + ctx.docker .kill_container( &DockerAction::container_name(id, None), Some(KillContainerOptions { diff --git a/appmgr/src/config/spec.rs b/appmgr/src/config/spec.rs index 278800abd..482438740 100644 --- a/appmgr/src/config/spec.rs +++ b/appmgr/src/config/spec.rs @@ -18,6 +18,7 @@ use serde_json::{Number, Value}; use super::util::{self, CharSet, NumRange, UniqueBy, STATIC_NULL}; use super::{Config, MatchError, NoMatchWithPath, TimeoutError, TypeOf}; use crate::config::ConfigurationError; +use crate::context::RpcContext; use crate::net::interface::InterfaceId; use crate::s9pk::manifest::{Manifest, PackageId}; use crate::Error; @@ -34,6 +35,7 @@ pub trait ValueSpec { // update is to fill in values for environment pointers recursively async fn update( &self, + ctx: &RpcContext, db: &mut Db, config_overrides: &IndexMap, value: &mut Value, @@ -148,11 +150,12 @@ where } async fn update( &self, + ctx: &RpcContext, db: &mut Db, config_overrides: &IndexMap, value: &mut Value, ) -> Result<(), ConfigurationError> { - self.inner.update(db, config_overrides, value).await + self.inner.update(ctx, db, config_overrides, value).await } fn pointers(&self, value: &Value) -> Result, NoMatchWithPath> { self.inner.pointers(value) @@ -188,11 +191,12 @@ where } async fn update( &self, + ctx: &RpcContext, db: &mut Db, config_overrides: &IndexMap, value: &mut Value, ) -> Result<(), ConfigurationError> { - self.inner.update(db, config_overrides, value).await + self.inner.update(ctx, db, config_overrides, value).await } fn pointers(&self, value: &Value) -> Result, NoMatchWithPath> { self.inner.pointers(value) @@ -261,11 +265,12 @@ where } async fn update( &self, + ctx: &RpcContext, db: &mut Db, config_overrides: &IndexMap, value: &mut Value, ) -> Result<(), ConfigurationError> { - self.inner.update(db, config_overrides, value).await + self.inner.update(ctx, db, config_overrides, value).await } fn pointers(&self, value: &Value) -> Result, NoMatchWithPath> { self.inner.pointers(value) @@ -371,19 +376,20 @@ impl ValueSpec for ValueSpecAny { } async fn update( &self, + ctx: &RpcContext, db: &mut Db, config_overrides: &IndexMap, value: &mut Value, ) -> Result<(), ConfigurationError> { match self { - ValueSpecAny::Boolean(a) => a.update(db, config_overrides, value).await, - ValueSpecAny::Enum(a) => a.update(db, config_overrides, value).await, - ValueSpecAny::List(a) => a.update(db, config_overrides, value).await, - ValueSpecAny::Number(a) => a.update(db, config_overrides, value).await, - ValueSpecAny::Object(a) => a.update(db, config_overrides, value).await, - ValueSpecAny::String(a) => a.update(db, config_overrides, value).await, - ValueSpecAny::Union(a) => a.update(db, config_overrides, value).await, - ValueSpecAny::Pointer(a) => a.update(db, config_overrides, value).await, + ValueSpecAny::Boolean(a) => a.update(ctx, db, config_overrides, value).await, + ValueSpecAny::Enum(a) => a.update(ctx, db, config_overrides, value).await, + ValueSpecAny::List(a) => a.update(ctx, db, config_overrides, value).await, + ValueSpecAny::Number(a) => a.update(ctx, db, config_overrides, value).await, + ValueSpecAny::Object(a) => a.update(ctx, db, config_overrides, value).await, + ValueSpecAny::String(a) => a.update(ctx, db, config_overrides, value).await, + ValueSpecAny::Union(a) => a.update(ctx, db, config_overrides, value).await, + ValueSpecAny::Pointer(a) => a.update(ctx, db, config_overrides, value).await, } } fn pointers(&self, value: &Value) -> Result, NoMatchWithPath> { @@ -463,6 +469,7 @@ impl ValueSpec for ValueSpecBoolean { } async fn update( &self, + ctx: &RpcContext, _db: &mut Db, _config_overrides: &IndexMap, _value: &mut Value, @@ -550,6 +557,7 @@ impl ValueSpec for ValueSpecEnum { } async fn update( &self, + ctx: &RpcContext, _db: &mut Db, _config_overrides: &IndexMap, _value: &mut Value, @@ -634,13 +642,14 @@ where } async fn update( &self, + ctx: &RpcContext, db: &mut Db, config_overrides: &IndexMap, value: &mut Value, ) -> Result<(), ConfigurationError> { if let Value::Array(ref mut ls) = value { for (i, val) in ls.into_iter().enumerate() { - match self.spec.update(db, config_overrides, val).await { + match self.spec.update(ctx, db, config_overrides, val).await { Err(ConfigurationError::NoMatch(e)) => { Err(ConfigurationError::NoMatch(e.prepend(format!("{}", i)))) } @@ -735,16 +744,17 @@ impl ValueSpec for ValueSpecList { } async fn update( &self, + ctx: &RpcContext, db: &mut Db, config_overrides: &IndexMap, value: &mut Value, ) -> Result<(), ConfigurationError> { match self { - ValueSpecList::Enum(a) => a.update(db, config_overrides, value).await, - ValueSpecList::Number(a) => a.update(db, config_overrides, value).await, - ValueSpecList::Object(a) => a.update(db, config_overrides, value).await, - ValueSpecList::String(a) => a.update(db, config_overrides, value).await, - ValueSpecList::Union(a) => a.update(db, config_overrides, value).await, + ValueSpecList::Enum(a) => a.update(ctx, db, config_overrides, value).await, + ValueSpecList::Number(a) => a.update(ctx, db, config_overrides, value).await, + ValueSpecList::Object(a) => a.update(ctx, db, config_overrides, value).await, + ValueSpecList::String(a) => a.update(ctx, db, config_overrides, value).await, + ValueSpecList::Union(a) => a.update(ctx, db, config_overrides, value).await, } } fn pointers(&self, value: &Value) -> Result, NoMatchWithPath> { @@ -857,6 +867,7 @@ impl ValueSpec for ValueSpecNumber { } async fn update( &self, + ctx: &RpcContext, _db: &mut Db, _config_overrides: &IndexMap, _value: &mut Value, @@ -968,12 +979,13 @@ impl ValueSpec for ValueSpecObject { } async fn update( &self, + ctx: &RpcContext, db: &mut Db, config_overrides: &IndexMap, value: &mut Value, ) -> Result<(), ConfigurationError> { if let Value::Object(o) = value { - self.spec.update(db, config_overrides, o).await + self.spec.update(ctx, db, config_overrides, o).await } else { Err(ConfigurationError::NoMatch(NoMatchWithPath::new( MatchError::InvalidType("object", value.type_of()), @@ -1066,6 +1078,7 @@ impl ConfigSpec { pub async fn update( &self, + ctx: &RpcContext, db: &mut Db, config_overrides: &IndexMap, cfg: &mut Config, @@ -1074,10 +1087,10 @@ impl ConfigSpec { match cfg.get_mut(k) { None => { let mut v = Value::Null; - vs.update(db, config_overrides, &mut v).await?; + vs.update(ctx, db, config_overrides, &mut v).await?; cfg.insert(k.clone(), v); } - Some(v) => match vs.update(db, config_overrides, v).await { + Some(v) => match vs.update(ctx, db, config_overrides, v).await { Err(ConfigurationError::NoMatch(e)) => { Err(ConfigurationError::NoMatch(e.prepend(k.clone()))) } @@ -1156,6 +1169,7 @@ impl ValueSpec for ValueSpecString { } async fn update( &self, + ctx: &RpcContext, _db: &mut Db, _config_overrides: &IndexMap, _value: &mut Value, @@ -1365,6 +1379,7 @@ impl ValueSpec for ValueSpecUnion { } async fn update( &self, + ctx: &RpcContext, db: &mut Db, config_overrides: &IndexMap, value: &mut Value, @@ -1378,7 +1393,7 @@ impl ValueSpec for ValueSpecUnion { None => Err(ConfigurationError::NoMatch(NoMatchWithPath::new( MatchError::Union(tag.clone(), self.variants.keys().cloned().collect()), ))), - Some(spec) => spec.update(db, config_overrides, o).await, + Some(spec) => spec.update(ctx, db, config_overrides, o).await, }, Some(other) => Err(ConfigurationError::NoMatch( NoMatchWithPath::new(MatchError::InvalidType("string", other.type_of())) @@ -1505,13 +1520,14 @@ impl ValueSpec for ValueSpecPointer { } async fn update( &self, + ctx: &RpcContext, db: &mut Db, config_overrides: &IndexMap, value: &mut Value, ) -> Result<(), ConfigurationError> { match self { - ValueSpecPointer::Package(a) => a.update(db, config_overrides, value).await, - ValueSpecPointer::System(a) => a.update(db, config_overrides, value).await, + ValueSpecPointer::Package(a) => a.update(ctx, db, config_overrides, value).await, + ValueSpecPointer::System(a) => a.update(ctx, db, config_overrides, value).await, } } fn pointers(&self, _value: &Value) -> Result, NoMatchWithPath> { @@ -1543,6 +1559,7 @@ impl fmt::Display for PackagePointerSpec { impl PackagePointerSpec { async fn deref( &self, + ctx: &RpcContext, db: &mut Db, config_overrides: &IndexMap, ) -> Result { @@ -1602,7 +1619,7 @@ impl PackagePointerSpec { (&*version, &*cfg_actions, &*volumes) { let cfg_res = cfg_actions - .get(&self.package_id, version, volumes) + .get(&ctx, &self.package_id, version, volumes) .await .map_err(|e| ConfigurationError::SystemError(Error::from(e)))?; if let Some(cfg) = cfg_res.config { @@ -1646,11 +1663,12 @@ impl ValueSpec for PackagePointerSpec { } async fn update( &self, + ctx: &RpcContext, db: &mut Db, config_overrides: &IndexMap, value: &mut Value, ) -> Result<(), ConfigurationError> { - *value = self.deref(db, config_overrides).await?; + *value = self.deref(ctx, db, config_overrides).await?; Ok(()) } fn pointers(&self, _value: &Value) -> Result, NoMatchWithPath> { @@ -1762,6 +1780,7 @@ impl ValueSpec for SystemPointerSpec { } async fn update( &self, + ctx: &RpcContext, db: &mut Db, _config_overrides: &IndexMap, value: &mut Value, diff --git a/appmgr/src/context/mod.rs b/appmgr/src/context/mod.rs index d2a6ba3da..a3417a634 100644 --- a/appmgr/src/context/mod.rs +++ b/appmgr/src/context/mod.rs @@ -1,5 +1,5 @@ -mod cli; -mod rpc; +pub mod cli; +pub mod rpc; pub use cli::CliContext; pub use rpc::RpcContext; diff --git a/appmgr/src/context/rpc.rs b/appmgr/src/context/rpc.rs index 83a61fb89..9f2b69b07 100644 --- a/appmgr/src/context/rpc.rs +++ b/appmgr/src/context/rpc.rs @@ -13,10 +13,12 @@ use serde::Deserialize; use sqlx::migrate::MigrateDatabase; use sqlx::{Sqlite, SqlitePool}; use tokio::fs::File; +use tokio::sync::broadcast::Sender; use tokio::sync::RwLock; use crate::manager::ManagerMap; use crate::net::NetController; +use crate::shutdown::Shutdown; use crate::util::{from_toml_async_reader, AsyncFileExt}; use crate::{Error, ResultExt}; @@ -26,50 +28,39 @@ pub struct RpcContextConfig { pub bind_rpc: Option, pub bind_ws: Option, pub tor_control: Option, - pub db: Option, - pub secret_store: Option, pub revision_cache_size: Option, + pub datadir: Option, } - -pub struct RpcContextSeed { - pub bind_rpc: SocketAddr, - pub bind_ws: SocketAddr, - pub db: PatchDb, - pub secret_store: SqlitePool, - pub docker: Docker, - pub net_controller: Arc, - pub managers: ManagerMap, - pub revision_cache_size: usize, - pub revision_cache: RwLock>>, - pub metrics_cache: RwLock>, -} - -#[derive(Clone)] -pub struct RpcContext(Arc); -impl RpcContext { - pub async fn init>(cfg_path: Option

) -> Result { - let cfg_path = cfg_path +impl RpcContextConfig { + pub async fn load>(path: Option

) -> Result { + let cfg_path = path .as_ref() .map(|p| p.as_ref()) .unwrap_or(Path::new(crate::CONFIG_PATH)); - let base = if let Some(f) = File::maybe_open(cfg_path) + if let Some(f) = File::maybe_open(cfg_path) .await .with_ctx(|_| (crate::ErrorKind::Filesystem, cfg_path.display().to_string()))? { - from_toml_async_reader(f).await? + from_toml_async_reader(f).await } else { - RpcContextConfig::default() - }; - let db = PatchDb::open( - base.db - .unwrap_or_else(|| Path::new("/mnt/embassy-os/embassy.db").to_owned()), - ) - .await?; + Ok(RpcContextConfig::default()) + } + } + pub fn datadir(&self) -> &Path { + self.datadir + .as_ref() + .map(|a| a.as_path()) + .unwrap_or_else(|| Path::new("/embassy-data")) + } + pub async fn db(&self) -> Result { + PatchDb::open(self.datadir().join("main").join("embassy.db")) + .await + .map_err(Error::from) + } + pub async fn secret_store(&self) -> Result { let secret_store_url = format!( "sqlite://{}", - base.secret_store - .unwrap_or_else(|| Path::new("/mnt/embassy-os/secrets.db").to_owned()) - .display() + self.datadir().join("main").join("secrets.db").display() ); if !Sqlite::database_exists(&secret_store_url).await? { Sqlite::create_database(&secret_store_url).await?; @@ -79,26 +70,48 @@ impl RpcContext { .run(&secret_store) .await .with_kind(crate::ErrorKind::Database)?; + Ok(secret_store) + } +} + +pub struct RpcContextSeed { + pub bind_rpc: SocketAddr, + pub bind_ws: SocketAddr, + pub datadir: PathBuf, + pub db: PatchDb, + pub secret_store: SqlitePool, + pub docker: Docker, + pub net_controller: NetController, + pub managers: ManagerMap, + pub revision_cache_size: usize, + pub revision_cache: RwLock>>, + pub metrics_cache: RwLock>, + pub shutdown: Sender>, +} + +#[derive(Clone)] +pub struct RpcContext(Arc); +impl RpcContext { + pub async fn init>( + cfg_path: Option

, + shutdown: Sender>, + ) -> Result { + let base = RpcContextConfig::load(cfg_path).await?; + let db = base.db().await?; + let secret_store = base.secret_store().await?; let docker = Docker::connect_with_unix_defaults()?; - let net_controller = Arc::new( - NetController::init( - ([127, 0, 0, 1], 80).into(), - crate::net::tor::os_key(&mut secret_store.acquire().await?).await?, - base.tor_control - .unwrap_or(SocketAddr::from(([127, 0, 0, 1], 9051))), - ) - .await?, - ); - let managers = ManagerMap::init( - &mut db.handle(), - &mut secret_store.acquire().await?, - docker.clone(), - net_controller.clone(), + let net_controller = NetController::init( + ([127, 0, 0, 1], 80).into(), + crate::net::tor::os_key(&mut secret_store.acquire().await?).await?, + base.tor_control + .unwrap_or(SocketAddr::from(([127, 0, 0, 1], 9051))), ) .await?; + let managers = ManagerMap::default(); let seed = Arc::new(RpcContextSeed { bind_rpc: base.bind_rpc.unwrap_or(([127, 0, 0, 1], 5959).into()), bind_ws: base.bind_ws.unwrap_or(([127, 0, 0, 1], 5960).into()), + datadir: base.datadir().to_path_buf(), db, secret_store, docker, @@ -107,9 +120,18 @@ impl RpcContext { revision_cache_size: base.revision_cache_size.unwrap_or(512), revision_cache: RwLock::new(VecDeque::new()), metrics_cache: RwLock::new(None), + shutdown, }); + let res = Self(seed); + res.managers + .init( + &res, + &mut res.db.handle(), + &mut res.secret_store.acquire().await?, + ) + .await?; // TODO: handle apps in bad / transient state - Ok(Self(seed)) + Ok(res) } pub async fn package_registry_url(&self) -> Result { Ok( diff --git a/appmgr/src/daemon/mod.rs b/appmgr/src/daemon/mod.rs deleted file mode 100644 index bf47aee65..000000000 --- a/appmgr/src/daemon/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod tor_health_check; diff --git a/appmgr/src/daemon/tor_health_check.rs b/appmgr/src/daemon/tor_health_check.rs deleted file mode 100644 index 239ac7d41..000000000 --- a/appmgr/src/daemon/tor_health_check.rs +++ /dev/null @@ -1,52 +0,0 @@ -use std::time::Duration; - -use serde_json::json; - -use crate::net::tor::TorController; - -lazy_static::lazy_static! { - static ref PROXY: reqwest::Proxy = reqwest::Proxy::http("socks5h://localhost:9050").expect("PROXY"); - static ref CLIENT: reqwest::Client = reqwest::Client::builder().proxy(PROXY.clone()).build().expect("CLIENT"); -} - -pub async fn tor_health_check_daemon(tor_controller: &TorController) { - loop { - // call out to tor address - let onion = tor_controller.embassyd_onion().await; - let result = CLIENT - .post(format!("http://{}/rpc/v1", onion)) - .body( - json!({ - "jsonrpc": "2.0", - "method": "echo", - "params": { "message": "Follow the orange rabbit" }, - }) - .to_string() - .into_bytes(), - ) - .send() - .await; - match result { - // if success, do nothing - Ok(_) => {} - // if failure, disconnect tor control port, and restart tor controller - Err(e) => { - log::error!("Unable to reach self over tor: {}", e); - loop { - match tor_controller.replace().await { - Ok(restarted) => { - if restarted { - log::error!("Tor has been recently restarted, refusing to restart"); - } - break; - } - Err(e) => { - log::error!("Unable to restart tor: {}", e); - } - } - } - } - } - tokio::time::sleep(Duration::from_secs(300)).await; - } -} diff --git a/appmgr/src/db/mod.rs b/appmgr/src/db/mod.rs index dca78fbdf..ca9c0b37c 100644 --- a/appmgr/src/db/mod.rs +++ b/appmgr/src/db/mod.rs @@ -3,6 +3,7 @@ pub mod util; use std::future::Future; use std::sync::Arc; +use std::time::Duration; use futures::{FutureExt, SinkExt, StreamExt}; use patch_db::json_ptr::JsonPointer; @@ -85,6 +86,12 @@ async fn ws_handler< _ => (), } } + _ = tokio::time::sleep(Duration::from_secs(10)).fuse() => { + stream + .send(Message::Ping(Vec::new())) + .await + .with_kind(crate::ErrorKind::Network)?; + } } } } diff --git a/appmgr/src/dependencies.rs b/appmgr/src/dependencies.rs index eda2fda6e..110aa140f 100644 --- a/appmgr/src/dependencies.rs +++ b/appmgr/src/dependencies.rs @@ -6,6 +6,7 @@ use serde::{Deserialize, Serialize}; use crate::action::ActionImplementation; use crate::config::Config; +use crate::context::RpcContext; use crate::db::model::CurrentDependencyInfo; use crate::s9pk::manifest::PackageId; use crate::status::health_check::{HealthCheckId, HealthCheckResult, HealthCheckResultVariant}; @@ -130,6 +131,7 @@ pub struct DepInfo { impl DepInfo { pub async fn satisfied( &self, + ctx: &RpcContext, db: &mut Db, dependency_id: &PackageId, dependency_config: Option, // fetch if none @@ -161,7 +163,7 @@ impl DepInfo { cfg } else if let Some(cfg_info) = &manifest.config { cfg_info - .get(dependency_id, &manifest.version, &manifest.volumes) + .get(ctx, dependency_id, &manifest.version, &manifest.volumes) .await? .config .unwrap_or_default() @@ -171,6 +173,7 @@ impl DepInfo { if let Some(cfg_req) = &self.config { if let Err(e) = cfg_req .check( + ctx, dependent_id, dependent_version, dependent_volumes, @@ -218,6 +221,7 @@ pub struct DependencyConfig { impl DependencyConfig { pub async fn check( &self, + ctx: &RpcContext, dependent_id: &PackageId, dependent_version: &Version, dependent_volumes: &Volumes, @@ -226,6 +230,7 @@ impl DependencyConfig { Ok(self .check .sandboxed( + ctx, dependent_id, dependent_version, dependent_volumes, @@ -236,6 +241,7 @@ impl DependencyConfig { } pub async fn auto_configure( &self, + ctx: &RpcContext, dependent_id: &PackageId, dependent_version: &Version, dependent_volumes: &Volumes, @@ -243,6 +249,7 @@ impl DependencyConfig { ) -> Result { self.auto_configure .sandboxed( + ctx, dependent_id, dependent_version, dependent_volumes, diff --git a/appmgr/src/disk/main.rs b/appmgr/src/disk/main.rs new file mode 100644 index 000000000..669590d3c --- /dev/null +++ b/appmgr/src/disk/main.rs @@ -0,0 +1,45 @@ +use tokio::process::Command; + +use crate::util::Invoke; +use crate::Error; + +pub async fn importable() -> Result { + todo!() +} + +pub async fn create(disks: &[&str]) -> Result<(), Error> { + todo!() +} + +pub async fn load(password: &str) -> Result<(), Error> { + todo!() +} + +pub async fn create_pool(disks: &[&str]) -> Result<(), Error> { + Command::new("zpool") + .arg("create") + .arg("embassy-data") + .args(disks) + .invoke(crate::ErrorKind::Zfs) + .await?; + Ok(()) +} + +pub async fn create_fs() -> Result<(), Error> { + todo!() +} + +pub async fn import() -> Result<(), Error> { + Command::new("zpool") + .arg("import") + .arg("-f") + .arg("embassy-data") + .invoke(crate::ErrorKind::Zfs) + .await?; + Ok(()) +} + +pub async fn mount(password: &str) -> Result<(), Error> { + // zfs get -H -ovalue mountpoint embassy-data + todo!() +} diff --git a/appmgr/src/disk/mod.rs b/appmgr/src/disk/mod.rs new file mode 100644 index 000000000..ad7722d56 --- /dev/null +++ b/appmgr/src/disk/mod.rs @@ -0,0 +1,2 @@ +pub mod main; +pub mod util; diff --git a/appmgr/src/volume/disk.rs b/appmgr/src/disk/util.rs similarity index 88% rename from appmgr/src/volume/disk.rs rename to appmgr/src/disk/util.rs index 97e6af3a2..225e81348 100644 --- a/appmgr/src/volume/disk.rs +++ b/appmgr/src/disk/util.rs @@ -9,9 +9,6 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt}; use crate::util::Invoke; use crate::{Error, ResultExt as _}; -pub const ROOT_DISK: &'static str = "/dev/mmcblk0"; -pub const MAIN_DISK: &'static str = "/dev/sda"; - pub struct Disks(IndexMap); #[derive(Clone, Debug, Deserialize, Serialize)] @@ -190,6 +187,49 @@ pub async fn mount_encfs, P1: AsRef>( } } +pub async fn bind, P1: AsRef>( + src: P0, + dst: P1, + read_only: bool, +) -> Result<(), Error> { + log::info!( + "Binding {} to {}", + src.as_ref().display(), + dst.as_ref().display() + ); + let is_mountpoint = tokio::process::Command::new("mountpoint") + .arg(dst.as_ref()) + .stdout(std::process::Stdio::null()) + .stderr(std::process::Stdio::null()) + .status() + .await?; + if is_mountpoint.success() { + unmount(dst.as_ref()).await?; + } + tokio::fs::create_dir_all(&dst).await?; + let mut mount_cmd = tokio::process::Command::new("mount"); + mount_cmd.arg("--bind"); + if read_only { + mount_cmd.arg("-o").arg("ro"); + } + mount_cmd + .arg(src.as_ref()) + .arg(dst.as_ref()) + .invoke(crate::ErrorKind::Filesystem) + .await + .map_err(|e| { + Error::new( + e.source.context(format!( + "Binding {} to {}", + src.as_ref().display(), + dst.as_ref().display(), + )), + e.kind, + ) + })?; + Ok(()) +} + pub async fn unmount>(mount_point: P) -> Result<(), Error> { log::info!("Unmounting {}.", mount_point.as_ref().display()); let umount_output = tokio::process::Command::new("umount") diff --git a/appmgr/src/error.rs b/appmgr/src/error.rs index c7c4be34e..31a6b6d9f 100644 --- a/appmgr/src/error.rs +++ b/appmgr/src/error.rs @@ -51,8 +51,9 @@ pub enum ErrorKind { SoundError = 43, ParseTimestamp = 44, ParseSysInfo = 45, - WifiError = 46, + Wifi = 46, Journald = 47, + Zfs = 48, } impl ErrorKind { pub fn as_str(&self) -> &'static str { @@ -103,8 +104,9 @@ impl ErrorKind { SoundError => "Sound Interface Error", ParseTimestamp => "Timestamp Parsing Error", ParseSysInfo => "System Info Parsing Error", - WifiError => "Wifi Internal Error", + Wifi => "WiFi Internal Error", Journald => "Journald Error", + Zfs => "ZFS Error", } } } diff --git a/appmgr/src/hostname.rs b/appmgr/src/hostname.rs index 1ca923332..8e4b9e1c7 100644 --- a/appmgr/src/hostname.rs +++ b/appmgr/src/hostname.rs @@ -22,7 +22,7 @@ pub async fn set_hostname(hostname: &str) -> Result<(), Error> { } pub async fn get_product_key() -> Result { - let out = tokio::fs::read_to_string("/boot/product_key.txt").await?; + let out = tokio::fs::read_to_string("/boot/embassy-os/product_key.txt").await?; Ok(out.trim().to_owned()) } @@ -34,7 +34,7 @@ pub async fn get_id() -> Result { Ok(hex::encode(&res[0..4])) } -// cat /boot/product_key.txt | shasum -a 256 | head -c 8 | awk '{print "start9-"$1}' | xargs hostnamectl set-hostname +// cat /boot/embassy-os/product_key.txt | shasum -a 256 | head -c 8 | awk '{print "start9-"$1}' | xargs hostnamectl set-hostname pub async fn sync_hostname() -> Result<(), Error> { set_hostname(&format!("start9-{}", get_id().await?)).await?; Ok(()) diff --git a/appmgr/src/install/cleanup.rs b/appmgr/src/install/cleanup.rs index 9303f7c0b..c8166d246 100644 --- a/appmgr/src/install/cleanup.rs +++ b/appmgr/src/install/cleanup.rs @@ -11,6 +11,7 @@ use crate::util::Version; use crate::Error; pub async fn update_dependents<'a, Db: DbHandle, I: IntoIterator>( + ctx: &RpcContext, db: &mut Db, id: &PackageId, deps: I, @@ -37,7 +38,7 @@ pub async fn update_dependents<'a, Db: DbHandle, I: IntoIterator::as_ref(pkg_id).with_extension("s9pk"); @@ -368,7 +372,9 @@ pub async fn install_s9pk( } .with_kind(crate::ErrorKind::Registry)?; if let Some(manifest) = manifest { - let dir = Path::new(PKG_PUBLIC_DIR) + let dir = ctx + .datadir + .join(PKG_PUBLIC_DIR) .join(&manifest.id) .join(manifest.version.as_str()); let icon_path = dir.join(format!("icon.{}", manifest.assets.icon_type())); @@ -416,7 +422,9 @@ pub async fn install_s9pk( } log::info!("Install {}@{}: Fetched Dependency Info", pkg_id, version); - let public_dir_path = Path::new(PKG_PUBLIC_DIR) + let public_dir_path = ctx + .datadir + .join(PKG_PUBLIC_DIR) .join(pkg_id) .join(version.as_str()); tokio::fs::create_dir_all(&public_dir_path).await?; @@ -512,7 +520,7 @@ pub async fn install_s9pk( let mut sql_tx = ctx.secret_store.begin().await?; log::info!("Install {}@{}: Creating volumes", pkg_id, version); - manifest.volumes.install(pkg_id, version).await?; + manifest.volumes.install(ctx, pkg_id, version).await?; log::info!("Install {}@{}: Created volumes", pkg_id, version); log::info!("Install {}@{}: Installing interfaces", pkg_id, version); @@ -522,8 +530,7 @@ pub async fn install_s9pk( log::info!("Install {}@{}: Creating manager", pkg_id, version); ctx.managers .add( - ctx.docker.clone(), - ctx.net_controller.clone(), + ctx.clone(), manifest.clone(), manifest.interfaces.tor_keys(&mut sql_tx, pkg_id).await?, ) @@ -572,8 +579,13 @@ pub async fn install_s9pk( status: Status { configured: manifest.config.is_none(), main: MainStatus::Stopped, - dependency_errors: DependencyErrors::init(&mut tx, &manifest, ¤t_dependencies) - .await?, + dependency_errors: DependencyErrors::init( + ctx, + &mut tx, + &manifest, + ¤t_dependencies, + ) + .await?, }, manifest: manifest.clone(), system_pointers: Vec::new(), @@ -600,6 +612,7 @@ pub async fn install_s9pk( } = prev { update_dependents( + ctx, &mut tx, pkg_id, current_dependents @@ -612,6 +625,7 @@ pub async fn install_s9pk( if let Some(res) = prev_manifest .migrations .to( + ctx, version, pkg_id, &prev_manifest.version, @@ -626,15 +640,21 @@ pub async fn install_s9pk( } if let Some(res) = manifest .migrations - .from(&prev_manifest.version, pkg_id, version, &manifest.volumes) + .from( + ctx, + &prev_manifest.version, + pkg_id, + version, + &manifest.volumes, + ) .await? { configured &= res.configured; } if configured { crate::config::configure( + ctx, &mut tx, - &ctx.docker, pkg_id, None, &None, @@ -646,7 +666,7 @@ pub async fn install_s9pk( todo!("set as running if viable"); } } else { - update_dependents(&mut tx, pkg_id, current_dependents.keys()).await?; + update_dependents(ctx, &mut tx, pkg_id, current_dependents.keys()).await?; } sql_tx.commit().await?; diff --git a/appmgr/src/lib.rs b/appmgr/src/lib.rs index 02c8b86f0..898c51b6b 100644 --- a/appmgr/src/lib.rs +++ b/appmgr/src/lib.rs @@ -22,10 +22,10 @@ pub mod backup; pub mod config; pub mod context; pub mod control; -pub mod daemon; pub mod db; pub mod dependencies; pub mod developer; +pub mod disk; pub mod error; pub mod hostname; pub mod id; @@ -38,6 +38,7 @@ pub mod migration; pub mod net; pub mod registry; pub mod s9pk; +pub mod shutdown; pub mod sound; pub mod ssh; pub mod status; diff --git a/appmgr/src/manager/mod.rs b/appmgr/src/manager/mod.rs index da2341679..fcb1cfbca 100644 --- a/appmgr/src/manager/mod.rs +++ b/appmgr/src/manager/mod.rs @@ -6,30 +6,29 @@ use std::task::Poll; use anyhow::anyhow; use bollard::container::StopContainerOptions; -use bollard::Docker; use patch_db::DbHandle; use sqlx::{Executor, Sqlite}; use tokio::sync::watch::error::RecvError; use tokio::sync::watch::{channel, Receiver, Sender}; use tokio::sync::RwLock; -use tokio::task::JoinHandle; use torut::onion::TorSecretKeyV3; use crate::action::docker::DockerAction; +use crate::context::RpcContext; use crate::net::interface::InterfaceId; -use crate::net::NetController; use crate::s9pk::manifest::{Manifest, PackageId}; -use crate::util::{Container, Version}; +use crate::util::{Container, NonDetachingJoinHandle, Version}; use crate::Error; +#[derive(Default)] pub struct ManagerMap(RwLock>>); impl ManagerMap { pub async fn init( + &self, + ctx: &RpcContext, db: &mut Db, secrets: &mut Ex, - docker: Docker, - net_ctl: Arc, - ) -> Result + ) -> Result<(), Error> where for<'a> &'a mut Ex: Executor<'a, Database = Sqlite>, { @@ -53,16 +52,16 @@ impl ManagerMap { let tor_keys = man.interfaces.tor_keys(secrets, &package).await?; res.insert( (package, man.version.clone()), - Arc::new(Manager::create(docker.clone(), net_ctl.clone(), man, tor_keys).await?), + Arc::new(Manager::create(ctx.clone(), man, tor_keys).await?), ); } - Ok(ManagerMap(RwLock::new(res))) + *self.0.write().await = res; + Ok(()) } pub async fn add( &self, - docker: Docker, - net_ctl: Arc, + ctx: RpcContext, manifest: Manifest, tor_keys: HashMap, ) -> Result<(), Error> { @@ -75,7 +74,7 @@ impl ManagerMap { } lock.insert( id, - Arc::new(Manager::create(docker, net_ctl, manifest, tor_keys).await?), + Arc::new(Manager::create(ctx, manifest, tor_keys).await?), ); Ok(()) } @@ -88,6 +87,20 @@ impl ManagerMap { } } + pub async fn empty(&self) -> Result<(), Error> { + let res = futures::future::join_all( + std::mem::take(&mut *self.0.write().await) + .into_iter() + .map(|(_, man)| async move { man.exit().await }), + ) + .await; + res.into_iter().fold(Ok(()), |res, x| match (res, x) { + (Ok(()), x) => x, + (Err(e), Ok(())) => Err(e), + (Err(e1), Err(e2)) => Err(Error::new(anyhow!("{}, {}", e1.source, e2.source), e1.kind)), + }) + } + pub async fn get(&self, id: &(PackageId, Version)) -> Option> { self.0.read().await.get(id).cloned() } @@ -95,7 +108,7 @@ impl ManagerMap { pub struct Manager { shared: Arc, - thread: Container>, + thread: Container>, } pub enum Status { @@ -105,10 +118,9 @@ pub enum Status { } struct ManagerSharedState { + ctx: RpcContext, status: AtomicUsize, on_stop: Sender, - docker: Docker, - net_ctl: Arc, manifest: Manifest, container_name: String, tor_keys: HashMap, @@ -128,6 +140,7 @@ async fn run_main(state: &Arc) -> Result( + &rt_state.ctx, &rt_state.manifest.id, &rt_state.manifest.version, None, @@ -140,6 +153,7 @@ async fn run_main(state: &Arc) -> Result) -> Result) -> Result) -> Result, + ctx: RpcContext, manifest: Manifest, tor_keys: HashMap, ) -> Result { let (on_stop, mut recv) = channel(OnStop::Sleep); let shared = Arc::new(ManagerSharedState { + ctx, status: AtomicUsize::new(Status::Stopped as usize), on_stop, - docker, - net_ctl, container_name: DockerAction::container_name(&manifest.id, None), manifest, tor_keys, @@ -301,7 +315,7 @@ impl Manager { }); Ok(Manager { shared, - thread: Container::new(Some(thread)), + thread: Container::new(Some(thread.into())), }) } @@ -326,6 +340,7 @@ impl Manager { } match self .shared + .ctx .docker .stop_container( &self.shared.container_name, @@ -360,6 +375,7 @@ impl Manager { pub async fn pause(&self) -> Result<(), Error> { self.shared + .ctx .docker .pause_container(&self.shared.container_name) .await?; @@ -371,6 +387,7 @@ impl Manager { pub async fn resume(&self) -> Result<(), Error> { self.shared + .ctx .docker .unpause_container(&self.shared.container_name) .await?; @@ -385,6 +402,7 @@ impl Manager { let _ = self.shared.on_stop.send(OnStop::Exit); match self .shared + .ctx .docker .stop_container( &self.shared.container_name, diff --git a/appmgr/src/migration.rs b/appmgr/src/migration.rs index 16f1b0dfa..b8e3daa26 100644 --- a/appmgr/src/migration.rs +++ b/appmgr/src/migration.rs @@ -5,6 +5,7 @@ use patch_db::HasModel; use serde::{Deserialize, Serialize}; use crate::action::ActionImplementation; +use crate::context::RpcContext; use crate::s9pk::manifest::PackageId; use crate::util::Version; use crate::volume::Volumes; @@ -19,6 +20,7 @@ pub struct Migrations { impl Migrations { pub async fn from( &self, + ctx: &RpcContext, version: &Version, pkg_id: &PackageId, pkg_version: &Version, @@ -33,6 +35,7 @@ impl Migrations { Some( migration .execute( + ctx, pkg_id, pkg_version, Some("Migration"), // Migrations cannot be executed concurrently @@ -52,6 +55,7 @@ impl Migrations { } pub async fn to( &self, + ctx: &RpcContext, version: &Version, pkg_id: &PackageId, pkg_version: &Version, @@ -64,6 +68,7 @@ impl Migrations { Some( migration .execute( + ctx, pkg_id, pkg_version, Some("Migration"), diff --git a/appmgr/src/net/tor.rs b/appmgr/src/net/tor.rs index 8f5eac5ae..0b0789065 100644 --- a/appmgr/src/net/tor.rs +++ b/appmgr/src/net/tor.rs @@ -6,7 +6,9 @@ use anyhow::anyhow; use clap::ArgMatches; use futures::future::BoxFuture; use futures::FutureExt; +use reqwest::Client; use rpc_toolkit::command; +use serde_json::json; use sqlx::{Executor, Sqlite}; use tokio::net::TcpStream; use tokio::sync::Mutex; @@ -346,6 +348,44 @@ impl TorControllerInner { } } +pub async fn tor_health_check(client: &Client, tor_controller: &TorController) { + let onion = tor_controller.embassyd_onion().await; + let result = client + .post(format!("http://{}/rpc/v1", onion)) + .body( + json!({ + "jsonrpc": "2.0", + "method": "echo", + "params": { "message": "Follow the orange rabbit" }, + }) + .to_string() + .into_bytes(), + ) + .send() + .await; + match result { + // if success, do nothing + Ok(_) => {} + // if failure, disconnect tor control port, and restart tor controller + Err(e) => { + log::error!("Unable to reach self over tor: {}", e); + loop { + match tor_controller.replace().await { + Ok(restarted) => { + if restarted { + log::error!("Tor has been recently restarted, refusing to restart"); + } + break; + } + Err(e) => { + log::error!("Unable to restart tor: {}", e); + } + } + } + } + } +} + #[tokio::test] async fn test() { let mut conn = torut::control::UnauthenticatedConn::new( diff --git a/appmgr/src/net/wifi.rs b/appmgr/src/net/wifi.rs index ab5d90cb5..be8574760 100644 --- a/appmgr/src/net/wifi.rs +++ b/appmgr/src/net/wifi.rs @@ -25,13 +25,13 @@ pub async fn add( if !ssid.is_ascii() { return Err(Error::new( anyhow::anyhow!("SSID may not have special characters"), - ErrorKind::WifiError, + ErrorKind::Wifi, )); } if !password.is_ascii() { return Err(Error::new( anyhow::anyhow!("WiFi Password may not have special characters"), - ErrorKind::WifiError, + ErrorKind::Wifi, )); } async fn add_procedure<'a>( @@ -75,7 +75,7 @@ pub async fn connect(#[arg] ssid: String) -> Result<(), Error> { if !ssid.is_ascii() { return Err(Error::new( anyhow::anyhow!("SSID may not have special characters"), - ErrorKind::WifiError, + ErrorKind::Wifi, )); } async fn connect_procedure<'a>(wpa_supplicant: WpaCli<'a>, ssid: &String) -> Result<(), Error> { @@ -113,7 +113,7 @@ pub async fn delete(#[arg] ssid: String) -> Result<(), Error> { if !ssid.is_ascii() { return Err(Error::new( anyhow::anyhow!("SSID may not have special characters"), - ErrorKind::WifiError, + ErrorKind::Wifi, )); } let wpa_supplicant = WpaCli { interface: "wlan0" }; @@ -127,7 +127,7 @@ pub async fn delete(#[arg] ssid: String) -> Result<(), Error> { if interface_connected("eth0").await? { wpa_supplicant.remove_network(&ssid).await?; } else { - return Err(Error::new(anyhow::anyhow!("Forbidden: Deleting this Network would make your Embassy Unreachable. Either connect to ethernet or connect to a different WiFi network to remedy this."), ErrorKind::WifiError)); + return Err(Error::new(anyhow::anyhow!("Forbidden: Deleting this Network would make your Embassy Unreachable. Either connect to ethernet or connect to a different WiFi network to remedy this."), ErrorKind::Wifi)); } } } @@ -277,7 +277,7 @@ impl<'a> WpaCli<'a> { .arg("-i") .arg(self.interface) .arg("add_network") - .invoke(ErrorKind::WifiError) + .invoke(ErrorKind::Wifi) .await?; let s = std::str::from_utf8(&r)?; Ok(NetworkId(s.trim().to_owned())) @@ -289,7 +289,7 @@ impl<'a> WpaCli<'a> { .arg("set_network") .arg(&id.0) .arg(format!("{}", attr)) - .invoke(ErrorKind::WifiError) + .invoke(ErrorKind::Wifi) .await?; Ok(()) } @@ -300,7 +300,7 @@ impl<'a> WpaCli<'a> { .arg("set") .arg("country") .arg(country_code) - .invoke(ErrorKind::WifiError) + .invoke(ErrorKind::Wifi) .await?; Ok(()) } @@ -310,7 +310,7 @@ impl<'a> WpaCli<'a> { .arg(self.interface) .arg("get") .arg("country") - .invoke(ErrorKind::WifiError) + .invoke(ErrorKind::Wifi) .await?; Ok(CountryCode::for_alpha2(&String::from_utf8(r)?).unwrap()) } @@ -320,7 +320,7 @@ impl<'a> WpaCli<'a> { .arg(self.interface) .arg("enable_network") .arg(&id.0) - .invoke(ErrorKind::WifiError) + .invoke(ErrorKind::Wifi) .await?; Ok(()) } @@ -329,7 +329,7 @@ impl<'a> WpaCli<'a> { .arg("-i") .arg(self.interface) .arg("save_config") - .invoke(ErrorKind::WifiError) + .invoke(ErrorKind::Wifi) .await?; Ok(()) } @@ -339,7 +339,7 @@ impl<'a> WpaCli<'a> { .arg(self.interface) .arg("remove_network") .arg(&id.0) - .invoke(ErrorKind::WifiError) + .invoke(ErrorKind::Wifi) .await?; Ok(()) } @@ -348,7 +348,7 @@ impl<'a> WpaCli<'a> { .arg("-i") .arg(self.interface) .arg("reconfigure") - .invoke(ErrorKind::WifiError) + .invoke(ErrorKind::Wifi) .await?; Ok(()) } @@ -357,7 +357,7 @@ impl<'a> WpaCli<'a> { .arg("-i") .arg(self.interface) .arg("list_networks") - .invoke(ErrorKind::WifiError) + .invoke(ErrorKind::Wifi) .await?; Ok(String::from_utf8(r)? .lines() @@ -376,7 +376,7 @@ impl<'a> WpaCli<'a> { .arg(self.interface) .arg("select_network") .arg(&id.0) - .invoke(ErrorKind::WifiError) + .invoke(ErrorKind::Wifi) .await?; Ok(()) } @@ -387,7 +387,7 @@ impl<'a> WpaCli<'a> { .arg("new_password") .arg(&id.0) .arg(pass) - .invoke(ErrorKind::WifiError) + .invoke(ErrorKind::Wifi) .await?; Ok(()) } @@ -396,12 +396,12 @@ impl<'a> WpaCli<'a> { .arg("-i") .arg(self.interface) .arg("signal_poll") - .invoke(ErrorKind::WifiError) + .invoke(ErrorKind::Wifi) .await?; let e = || { Error::new( anyhow::anyhow!("Invalid output from wpa_cli signal_poll"), - ErrorKind::WifiError, + ErrorKind::Wifi, ) }; let output = String::from_utf8(r)?; @@ -423,7 +423,7 @@ impl<'a> WpaCli<'a> { match m_id { None => Err(Error::new( anyhow::anyhow!("SSID Not Found"), - ErrorKind::WifiError, + ErrorKind::Wifi, )), Some(x) => { self.select_network_low(&x).await?; @@ -456,7 +456,7 @@ impl<'a> WpaCli<'a> { let r = Command::new("iwgetid") .arg(self.interface) .arg("--raw") - .invoke(ErrorKind::WifiError) + .invoke(ErrorKind::Wifi) .await?; let output = String::from_utf8(r)?; if output.trim().is_empty() { @@ -501,7 +501,7 @@ impl<'a> WpaCli<'a> { pub async fn interface_connected(interface: &str) -> Result { let out = Command::new("ifconfig") .arg(interface) - .invoke(ErrorKind::WifiError) + .invoke(ErrorKind::Wifi) .await?; let v = std::str::from_utf8(&out)? .lines() @@ -513,7 +513,7 @@ pub async fn interface_connected(interface: &str) -> Result { pub fn country_code_parse(code: &str, _matches: &ArgMatches<'_>) -> Result { CountryCode::for_alpha2(code).or(Err(Error::new( anyhow::anyhow!("Invalid Country Code: {}", code), - ErrorKind::WifiError, + ErrorKind::Wifi, ))) } diff --git a/appmgr/src/shutdown.rs b/appmgr/src/shutdown.rs new file mode 100644 index 000000000..3392edf9a --- /dev/null +++ b/appmgr/src/shutdown.rs @@ -0,0 +1,4 @@ +#[derive(Debug, Clone)] +pub struct Shutdown { + restart: bool, +} diff --git a/appmgr/src/ssh.rs b/appmgr/src/ssh.rs index 04319ce4e..dee7c30e4 100644 --- a/appmgr/src/ssh.rs +++ b/appmgr/src/ssh.rs @@ -153,7 +153,8 @@ pub async fn list( .collect()) } -pub async fn sync_keys_from_db(pool: &Pool, dest: &Path) -> Result<(), Error> { +pub async fn sync_keys_from_db>(pool: &Pool, dest: P) -> Result<(), Error> { + let dest = dest.as_ref(); let keys = sqlx::query!("SELECT openssh_pubkey FROM ssh_keys") .fetch_all(pool) .await?; diff --git a/appmgr/src/status/health_check.rs b/appmgr/src/status/health_check.rs index c9345fbd2..243d07cbd 100644 --- a/appmgr/src/status/health_check.rs +++ b/appmgr/src/status/health_check.rs @@ -5,6 +5,7 @@ use indexmap::IndexMap; use serde::{Deserialize, Deserializer, Serialize}; use crate::action::ActionImplementation; +use crate::context::RpcContext; use crate::id::Id; use crate::s9pk::manifest::PackageId; use crate::util::Version; @@ -46,6 +47,7 @@ pub struct HealthChecks(pub IndexMap); impl HealthChecks { pub async fn check_all( &self, + ctx: &RpcContext, started: &DateTime, pkg_id: &PackageId, pkg_version: &Version, @@ -55,7 +57,7 @@ impl HealthChecks { Ok::<_, Error>(( id.clone(), check - .check(id, started, pkg_id, pkg_version, volumes) + .check(ctx, id, started, pkg_id, pkg_version, volumes) .await?, )) })) @@ -73,6 +75,7 @@ pub struct HealthCheck { impl HealthCheck { pub async fn check( &self, + ctx: &RpcContext, id: &HealthCheckId, started: &DateTime, pkg_id: &PackageId, @@ -82,6 +85,7 @@ impl HealthCheck { let res = self .implementation .execute( + ctx, pkg_id, pkg_version, Some(&format!("{}Health", id)), diff --git a/appmgr/src/status/mod.rs b/appmgr/src/status/mod.rs index 3d3a08694..30578b334 100644 --- a/appmgr/src/status/mod.rs +++ b/appmgr/src/status/mod.rs @@ -109,13 +109,14 @@ pub async fn check_all(ctx: &RpcContext) -> Result<(), Error> { } drop(db); async fn main_status( + ctx: RpcContext, status_model: StatusModel, manifest: Arc>, mut db: Db, ) -> Result { let mut status = status_model.get_mut(&mut db).await?; - status.main.check(&*manifest).await?; + status.main.check(&ctx, &*manifest).await?; let res = status.main.clone(); @@ -125,25 +126,30 @@ pub async fn check_all(ctx: &RpcContext) -> Result<(), Error> { } let (status_sender, mut statuses_recv) = tokio::sync::mpsc::channel(status_manifest.len() + 1); let mut statuses = HashMap::with_capacity(status_manifest.len()); - futures::stream::iter(status_manifest.into_iter().zip(pkg_ids.clone())) - .for_each_concurrent(None, move |((status, manifest), id)| { - let status_sender = status_sender.clone(); - async move { - match tokio::spawn(main_status(status, manifest, ctx.db.handle())) - .await - .unwrap() - { - Err(e) => { - log::error!("Error running main health check for {}: {}", id, e); - log::debug!("{:?}", e); - } - Ok(status) => { - status_sender.send((id, status)).await.expect("unreachable"); - } + futures::stream::iter( + status_manifest + .into_iter() + .zip(pkg_ids.clone()) + .zip(std::iter::repeat(ctx)), + ) + .for_each_concurrent(None, move |(((status, manifest), id), ctx)| { + let status_sender = status_sender.clone(); + async move { + match tokio::spawn(main_status(ctx.clone(), status, manifest, ctx.db.handle())) + .await + .unwrap() + { + Err(e) => { + log::error!("Error running main health check for {}: {}", id, e); + log::debug!("{:?}", e); + } + Ok(status) => { + status_sender.send((id, status)).await.expect("unreachable"); } } - }) - .await; + } + }) + .await; while let Some((id, status)) = statuses_recv.recv().await { statuses.insert(id, status); } @@ -246,12 +252,18 @@ impl MainStatus { } Ok(()) } - pub async fn check(&mut self, manifest: &Manifest) -> Result<(), Error> { + pub async fn check(&mut self, ctx: &RpcContext, manifest: &Manifest) -> Result<(), Error> { match self { MainStatus::Running { started, health } => { *health = manifest .health_checks - .check_all(started, &manifest.id, &manifest.version, &manifest.volumes) + .check_all( + ctx, + started, + &manifest.id, + &manifest.version, + &manifest.volumes, + ) .await?; for (check, res) in health { if matches!( @@ -313,6 +325,7 @@ impl HasModel for DependencyErrors { } impl DependencyErrors { pub async fn init( + ctx: &RpcContext, db: &mut Db, manifest: &Manifest, current_dependencies: &IndexMap, @@ -330,6 +343,7 @@ impl DependencyErrors { ) })? .satisfied( + ctx, db, dep_id, None, diff --git a/appmgr/src/util/mod.rs b/appmgr/src/util/mod.rs index 10532e16d..65f209146 100644 --- a/appmgr/src/util/mod.rs +++ b/appmgr/src/util/mod.rs @@ -16,7 +16,9 @@ use serde_json::Value; use tokio::fs::File; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf}; use tokio::sync::RwLock; +use tokio::task::{JoinError, JoinHandle}; +use crate::shutdown::Shutdown; use crate::{Error, ResultExt as _}; #[derive(Clone, Copy, Debug)] @@ -339,17 +341,22 @@ pub fn serialize_display_opt( Option::::serialize(&t.as_ref().map(|t| t.to_string()), serializer) } -pub async fn daemon Fut, Fut: Future + Send + 'static>( - f: F, +pub async fn daemon Fut, Fut: Future + Send + 'static>( + mut f: F, cooldown: std::time::Duration, -) -> Result { - loop { + mut shutdown: tokio::sync::broadcast::Receiver>, +) -> Result<(), anyhow::Error> { + while matches!( + shutdown.try_recv(), + Err(tokio::sync::broadcast::error::TryRecvError::Empty) + ) { match tokio::spawn(f()).await { Err(e) if e.is_panic() => return Err(anyhow!("daemon panicked!")), _ => (), } tokio::time::sleep(cooldown).await } + Ok(()) } pub trait SOption {} @@ -999,3 +1006,28 @@ where Deserialize::deserialize_in_place(deserializer, &mut place.data) } } + +#[pin_project::pin_project(PinnedDrop)] +pub struct NonDetachingJoinHandle(#[pin] JoinHandle); +impl From> for NonDetachingJoinHandle { + fn from(t: JoinHandle) -> Self { + NonDetachingJoinHandle(t) + } +} +#[pin_project::pinned_drop] +impl PinnedDrop for NonDetachingJoinHandle { + fn drop(self: std::pin::Pin<&mut Self>) { + let this = self.project(); + this.0.into_ref().get_ref().abort() + } +} +impl Future for NonDetachingJoinHandle { + type Output = Result; + fn poll( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + let this = self.project(); + this.0.poll(cx) + } +} diff --git a/appmgr/src/volume/mod.rs b/appmgr/src/volume.rs similarity index 83% rename from appmgr/src/volume/mod.rs rename to appmgr/src/volume.rs index ddd24d970..06f934057 100644 --- a/appmgr/src/volume/mod.rs +++ b/appmgr/src/volume.rs @@ -6,15 +6,14 @@ use indexmap::IndexMap; use patch_db::{HasModel, Map, MapModel}; use serde::{Deserialize, Deserializer, Serialize}; +use crate::context::RpcContext; use crate::id::{Id, IdUnchecked}; use crate::net::interface::InterfaceId; use crate::s9pk::manifest::PackageId; use crate::util::Version; use crate::Error; -pub mod disk; - -pub const PKG_VOLUME_DIR: &'static str = "/mnt/embassy-os/volumes/package-data"; +pub const PKG_VOLUME_DIR: &'static str = "main/volumes/package-data"; pub const BACKUP_DIR: &'static str = "/mnt/embassy-os-backups/EmbassyBackups"; #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] @@ -76,21 +75,27 @@ impl> Serialize for VolumeId { #[derive(Clone, Debug, Default, Deserialize, Serialize)] pub struct Volumes(IndexMap); impl Volumes { - pub async fn install(&self, pkg_id: &PackageId, version: &Version) -> Result<(), Error> { + pub async fn install( + &self, + ctx: &RpcContext, + pkg_id: &PackageId, + version: &Version, + ) -> Result<(), Error> { for (volume_id, volume) in &self.0 { - volume.install(pkg_id, version, volume_id).await?; // TODO: concurrent? + volume.install(ctx, pkg_id, version, volume_id).await?; // TODO: concurrent? } Ok(()) } pub fn get_path_for( &self, + ctx: &RpcContext, pkg_id: &PackageId, version: &Version, volume_id: &VolumeId, ) -> Option { self.0 .get(volume_id) - .map(|volume| volume.path_for(pkg_id, version, volume_id)) + .map(|volume| volume.path_for(ctx, pkg_id, version, volume_id)) } pub fn to_readonly(&self) -> Self { Volumes( @@ -154,25 +159,36 @@ pub enum Volume { impl Volume { pub async fn install( &self, + ctx: &RpcContext, pkg_id: &PackageId, version: &Version, volume_id: &VolumeId, ) -> Result<(), Error> { match self { Volume::Data { .. } => { - tokio::fs::create_dir_all(self.path_for(pkg_id, version, volume_id)).await?; + tokio::fs::create_dir_all(self.path_for(ctx, pkg_id, version, volume_id)).await?; } _ => (), } Ok(()) } - pub fn path_for(&self, pkg_id: &PackageId, version: &Version, volume_id: &VolumeId) -> PathBuf { + pub fn path_for( + &self, + ctx: &RpcContext, + pkg_id: &PackageId, + version: &Version, + volume_id: &VolumeId, + ) -> PathBuf { match self { - Volume::Data { .. } => Path::new(PKG_VOLUME_DIR) + Volume::Data { .. } => ctx + .datadir + .join(PKG_VOLUME_DIR) .join(pkg_id) .join("volumes") .join(volume_id), - Volume::Assets {} => Path::new(PKG_VOLUME_DIR) + Volume::Assets {} => ctx + .datadir + .join(PKG_VOLUME_DIR) .join(pkg_id) .join("assets") .join(version.as_str()) @@ -182,7 +198,9 @@ impl Volume { volume_id, path, .. - } => dbg!(Path::new(PKG_VOLUME_DIR) + } => dbg!(ctx + .datadir + .join(PKG_VOLUME_DIR) .join(package_id) .join("volumes") .join(volume_id) @@ -191,7 +209,9 @@ impl Volume { } else { path.as_ref() })), - Volume::Certificate { interface_id } => Path::new(PKG_VOLUME_DIR) + Volume::Certificate { interface_id } => ctx + .datadir + .join(PKG_VOLUME_DIR) .join(pkg_id) .join("certificates") .join(interface_id),