From b807323fa4d77a92e1c39291898cb72e1c41f649 Mon Sep 17 00:00:00 2001 From: Aiden McClelland Date: Mon, 26 Jul 2021 10:22:29 -0600 Subject: [PATCH] installs working --- appmgr/sqlx-data.json | 14 +++++-- appmgr/src/action/docker.rs | 19 +++++---- appmgr/src/control.rs | 78 ++++++++++++++++++++++++++++++++++ appmgr/src/install/mod.rs | 30 +++++++++---- appmgr/src/lib.rs | 9 +++- appmgr/src/manager/mod.rs | 57 +++++++++++++++---------- appmgr/src/net/interface.rs | 41 ++++++++++++++++++ appmgr/src/s9pk/manifest.rs | 5 +++ appmgr/src/status/mod.rs | 84 ++++++++++++------------------------- appmgr/src/volume/mod.rs | 21 ++++++++++ 10 files changed, 257 insertions(+), 101 deletions(-) create mode 100644 appmgr/src/control.rs diff --git a/appmgr/sqlx-data.json b/appmgr/sqlx-data.json index 5dbb5bf18..ec08040b3 100644 --- a/appmgr/sqlx-data.json +++ b/appmgr/sqlx-data.json @@ -10,20 +10,26 @@ "nullable": [] } }, - "3e57a0e52b69f33e9411c13b03a5d82c5856d63f0375eb4c23b255a09c54f8b1": { - "query": "SELECT key FROM tor WHERE package = ? AND interface = ?", + "8595651866e7db772260bd79e19d55b7271fd795b82a99821c935a9237c1aa16": { + "query": "SELECT interface, key FROM tor WHERE package = ?", "describe": { "columns": [ { - "name": "key", + "name": "interface", "ordinal": 0, + "type_info": "Text" + }, + { + "name": "key", + "ordinal": 1, "type_info": "Blob" } ], "parameters": { - "Right": 2 + "Right": 1 }, "nullable": [ + false, false ] } diff --git a/appmgr/src/action/docker.rs b/appmgr/src/action/docker.rs index 6ec80f9ce..a3a1d06ad 100644 --- a/appmgr/src/action/docker.rs +++ b/appmgr/src/action/docker.rs @@ -54,7 +54,10 @@ impl DockerAction { .arg("--name") .arg(Self::container_name(pkg_id, name)); } - cmd.args(self.docker_args(pkg_id, pkg_version, volumes, allow_inject)); + cmd.args( + self.docker_args(pkg_id, pkg_version, volumes, allow_inject) + .await, + ); let input_buf = if let (Some(input), Some(format)) = (&input, &self.io_format) { cmd.stdin(std::process::Stdio::piped()); Some(format.to_vec(input)?) @@ -63,7 +66,7 @@ impl DockerAction { }; cmd.stdout(std::process::Stdio::piped()); cmd.stderr(std::process::Stdio::piped()); - let mut handle = cmd.spawn().with_kind(crate::ErrorKind::Docker)?; + let mut handle = dbg!(cmd).spawn().with_kind(crate::ErrorKind::Docker)?; if let (Some(input), Some(stdin)) = (&input_buf, &mut handle.stdin) { use tokio::io::AsyncWriteExt; stdin @@ -111,7 +114,10 @@ impl DockerAction { ) -> Result, Error> { let mut cmd = tokio::process::Command::new("docker"); cmd.arg("run").arg("--rm").arg("--network=none"); - cmd.args(self.docker_args(pkg_id, pkg_version, &Volumes::default(), false)); + cmd.args( + self.docker_args(pkg_id, pkg_version, &Volumes::default(), false) + .await, + ); let input_buf = if let (Some(input), Some(format)) = (&input, &self.io_format) { cmd.stdin(std::process::Stdio::piped()); Some(format.to_vec(input)?) @@ -178,7 +184,7 @@ impl DockerAction { } } - fn docker_args<'a>( + async fn docker_args<'a>( &'a self, pkg_id: &PackageId, pkg_version: &Version, @@ -197,9 +203,8 @@ impl DockerAction { } else { continue; }; - let src = volume.path_for(pkg_id, pkg_version, volume_id); - if !src.exists() { - // TODO: this is a blocking call, make this async? + let src = dbg!(volume.path_for(pkg_id, pkg_version, volume_id)); + if tokio::fs::metadata(&src).await.is_err() { continue; } res.push(OsStr::new("--mount").into()); diff --git a/appmgr/src/control.rs b/appmgr/src/control.rs new file mode 100644 index 000000000..3279e1429 --- /dev/null +++ b/appmgr/src/control.rs @@ -0,0 +1,78 @@ +use anyhow::anyhow; +use chrono::{DateTime, Utc}; +use indexmap::IndexMap; +use rpc_toolkit::command; + +use crate::context::EitherContext; +use crate::s9pk::manifest::PackageId; +use crate::status::MainStatus; +use crate::util::display_none; +use crate::{Error, ResultExt}; + +#[command(display(display_none))] +pub async fn start(#[context] ctx: EitherContext, #[arg] id: PackageId) -> Result<(), Error> { + let rpc_ctx = ctx.as_rpc().unwrap(); + let mut db = rpc_ctx.db.handle(); + let installed = crate::db::DatabaseModel::new() + .package_data() + .idx_model(&id) + .and_then(|pkg| pkg.installed()) + .expect(&mut db) + .await + .with_ctx(|_| { + ( + crate::ErrorKind::NotFound, + format!("{} is not installed", id), + ) + })?; + let version = installed + .clone() + .manifest() + .version() + .get(&mut db, true) + .await? + .to_owned(); + let mut status = installed.status().main().get_mut(&mut db).await?; + + *status = MainStatus::Running { + started: Utc::now(), + health: IndexMap::new(), + }; + status + .synchronize( + &*rpc_ctx.managers.get(&(id, version)).await.ok_or_else(|| { + Error::new(anyhow!("Manager not found"), crate::ErrorKind::Docker) + })?, + ) + .await?; + status.save(&mut db).await?; + + Ok(()) +} + +#[command(display(display_none))] +pub async fn stop(#[context] ctx: EitherContext, #[arg] id: PackageId) -> Result<(), Error> { + let rpc_ctx = ctx.as_rpc().unwrap(); + let mut db = rpc_ctx.db.handle(); + let mut status = crate::db::DatabaseModel::new() + .package_data() + .idx_model(&id) + .and_then(|pkg| pkg.installed()) + .expect(&mut db) + .await + .with_ctx(|_| { + ( + crate::ErrorKind::NotFound, + format!("{} is not installed", id), + ) + })? + .status() + .main() + .get_mut(&mut db) + .await?; + + *status = MainStatus::Stopping; + status.save(&mut db).await?; + + Ok(()) +} diff --git a/appmgr/src/install/mod.rs b/appmgr/src/install/mod.rs index eb1e8e12a..a430dcb27 100644 --- a/appmgr/src/install/mod.rs +++ b/appmgr/src/install/mod.rs @@ -43,7 +43,7 @@ pub const PKG_PUBLIC_DIR: &'static str = "/mnt/embassy-os/public/package-data"; #[command(display(display_none))] pub async fn install(#[context] ctx: EitherContext, #[arg] id: String) -> Result<(), Error> { - let rpc_ctx = ctx.as_rpc().unwrap(); + let rpc_ctx = ctx.to_rpc().unwrap(); let (pkg_id, version_str) = if let Some(split) = id.split_once("@") { split } else { @@ -63,7 +63,13 @@ pub async fn install(#[context] ctx: EitherContext, #[arg] id: String) -> Result ) .with_kind(crate::ErrorKind::Registry)?; let man = man_res.json().await.with_kind(crate::ErrorKind::Registry)?; - download_install_s9pk(rpc_ctx, &man, s9pk).await + tokio::spawn(async move { + if let Err(e) = download_install_s9pk(&rpc_ctx, &man, s9pk).await { + log::error!("Install of {}@{} Failed: {}", man.id, man.version, e); + } + }); + + Ok(()) } pub async fn download_install_s9pk( @@ -126,7 +132,6 @@ pub async fn download_install_s9pk( progress: &Arc, model: OptionModel, ctx: &RpcContext, - db: &mut PatchDbHandle, ) -> Option>> { fn warn_ok( pkg_id: &PackageId, @@ -170,7 +175,6 @@ pub async fn download_install_s9pk( &progress, progress_model.clone(), &ctx, - &mut ctx.db.handle(), ) .await; @@ -363,14 +367,25 @@ pub async fn install_s9pk( let mut tx = handle.begin().await?; let mut sql_tx = ctx.secret_store.begin().await?; - log::info!("Install {}@{}: Creating manager", pkg_id, version); - todo!("create manager"); - log::info!("Install {}@{}: Created manager", pkg_id, version); + log::info!("Install {}@{}: Creating volumes", pkg_id, version); + manifest.volumes.install(pkg_id, version).await?; + log::info!("Install {}@{}: Created volumes", pkg_id, version); log::info!("Install {}@{}: Installing interfaces", pkg_id, version); let interface_addresses = manifest.interfaces.install(&mut sql_tx, pkg_id).await?; log::info!("Install {}@{}: Installed interfaces", pkg_id, version); + log::info!("Install {}@{}: Creating manager", pkg_id, version); + ctx.managers + .add( + ctx.docker.clone(), + ctx.net_controller.clone(), + manifest.clone(), + manifest.interfaces.tor_keys(&mut sql_tx, pkg_id).await?, + ) + .await?; + log::info!("Install {}@{}: Created manager", pkg_id, version); + let static_files = StaticFiles::local(pkg_id, version, manifest.assets.icon_type()); let current_dependencies = manifest .dependencies @@ -472,6 +487,7 @@ pub async fn install_s9pk( } } + sql_tx.commit().await?; tx.commit(None).await?; log::info!("Install {}@{}: Complete", pkg_id, version); diff --git a/appmgr/src/lib.rs b/appmgr/src/lib.rs index 8ca2e733d..c46d4b1ef 100644 --- a/appmgr/src/lib.rs +++ b/appmgr/src/lib.rs @@ -20,6 +20,7 @@ pub mod action; pub mod backup; pub mod config; pub mod context; +pub mod control; pub mod db; pub mod dependencies; pub mod developer; @@ -50,19 +51,23 @@ pub fn echo(#[context] _ctx: EitherContext, #[arg] message: String) -> Result Result { Ok(ctx) } +#[command(subcommands(install::install, config::config, control::start, control::stop))] +pub fn package(#[context] ctx: EitherContext) -> Result { + Ok(ctx) +} + #[command(subcommands( version::git_info, s9pk::pack, diff --git a/appmgr/src/manager/mod.rs b/appmgr/src/manager/mod.rs index ce4177eff..89b8502dc 100644 --- a/appmgr/src/manager/mod.rs +++ b/appmgr/src/manager/mod.rs @@ -1,6 +1,5 @@ use std::collections::HashMap; use std::future::Future; -use std::net::Ipv4Addr; use std::sync::atomic::AtomicUsize; use std::sync::Arc; use std::task::Poll; @@ -8,7 +7,7 @@ use std::task::Poll; use anyhow::anyhow; use bollard::container::StopContainerOptions; use bollard::Docker; -use patch_db::{DbHandle, PatchDbHandle}; +use patch_db::DbHandle; use sqlx::{Executor, Sqlite}; use tokio::sync::watch::error::RecvError; use tokio::sync::watch::{channel, Receiver, Sender}; @@ -34,16 +33,30 @@ impl ManagerMap { where for<'a> &'a mut Ex: Executor<'a, Database = Sqlite>, { - // let mut res = ManagerMap(RwLock::new(HashMap::new())); - // for package in crate::db::DatabaseModel::new() - // .package_data() - // .keys(db, true) - // .await? - // { - // let man = crate::db::DatabaseModel::new().package_data().idx_model(&package). - // res.add(docker.clone(), net_ctl.clone(), manifest, tor_keys) - // } - todo!() + let mut res = HashMap::new(); + for package in crate::db::DatabaseModel::new() + .package_data() + .keys(db, true) + .await? + { + let man = if let Some(installed) = crate::db::DatabaseModel::new() + .package_data() + .idx_model(&package) + .and_then(|pkg| pkg.installed()) + .check(db) + .await? + { + installed.manifest().get(db, true).await?.to_owned() + } else { + continue; + }; + let tor_keys = man.interfaces.tor_keys(secrets, &package).await?; + res.insert( + (package, man.version.clone()), + Arc::new(Manager::create(docker.clone(), net_ctl.clone(), man, tor_keys).await?), + ); + } + Ok(ManagerMap(RwLock::new(res))) } pub async fn add( @@ -124,7 +137,7 @@ async fn run_main(state: &Arc) -> Result) -> Result { - ip = res + if let Some(ip_addr) = res .network_settings .and_then(|ns| ns.networks) .and_then(|mut n| n.remove("start9")) .and_then(|es| es.ip_address) + .filter(|ip| !ip.is_empty()) .map(|ip| ip.parse()) - .transpose()?; - break; + .transpose()? + { + ip = ip_addr; + break; + } } Err(bollard::errors::Error::DockerResponseNotFoundError { .. }) => (), Err(e) => Err(e)?, @@ -158,12 +175,6 @@ async fn run_main(state: &Arc) -> Result (), } } - let ip = ip.ok_or_else(|| { - Error::new( - anyhow!("inspect did not return ip"), - crate::ErrorKind::Docker, - ) - })?; state .net_ctl @@ -283,7 +294,7 @@ impl Manager { todo!("application crashed") } Err(e) => { - todo!("failed to start application") + todo!("failed to start application: {}", e) } } } diff --git a/appmgr/src/net/interface.rs b/appmgr/src/net/interface.rs index 557b5be50..63327b918 100644 --- a/appmgr/src/net/interface.rs +++ b/appmgr/src/net/interface.rs @@ -1,6 +1,10 @@ +use std::collections::HashMap; use std::path::Path; +use anyhow::anyhow; +use futures::TryStreamExt; use indexmap::IndexMap; +use itertools::Either; use serde::{Deserialize, Deserializer, Serialize}; use sqlx::{Executor, Sqlite}; use torut::onion::TorSecretKeyV3; @@ -53,10 +57,47 @@ impl Interfaces { } Ok(interface_addresses) } + + pub async fn tor_keys( + &self, + secrets: &mut Ex, + package_id: &PackageId, + ) -> Result, Error> + where + for<'a> &'a mut Ex: Executor<'a, Database = Sqlite>, + { + Ok(sqlx::query!( + "SELECT interface, key FROM tor WHERE package = ?", + **package_id + ) + .fetch_many(secrets) + .map_err(Error::from) + .try_filter_map(|qr| async move { + Ok(if let Either::Right(r) = qr { + let mut buf = [0; 64]; + buf.clone_from_slice(r.key.get(0..64).ok_or_else(|| { + Error::new( + anyhow!("Invalid Tor Key Length"), + crate::ErrorKind::Database, + ) + })?); + Some((InterfaceId::from(Id::try_from(r.interface)?), buf.into())) + } else { + None + }) + }) + .try_collect() + .await?) + } } #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize)] pub struct InterfaceId = String>(Id); +impl> From> for InterfaceId { + fn from(id: Id) -> Self { + Self(id) + } +} impl> std::fmt::Display for InterfaceId { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", &self.0) diff --git a/appmgr/src/s9pk/manifest.rs b/appmgr/src/s9pk/manifest.rs index 24f92f797..2f0d04b5e 100644 --- a/appmgr/src/s9pk/manifest.rs +++ b/appmgr/src/s9pk/manifest.rs @@ -25,6 +25,11 @@ pub const SYSTEM_PACKAGE_ID: PackageId<&'static str> = PackageId(SYSTEM_ID); #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct PackageId = String>(Id); +impl<'a> PackageId<&'a str> { + pub fn owned(&self) -> PackageId { + PackageId(self.0.owned()) + } +} impl FromStr for PackageId { type Err = InvalidId; fn from_str(s: &str) -> Result { diff --git a/appmgr/src/status/mod.rs b/appmgr/src/status/mod.rs index b39b05879..af15b130c 100644 --- a/appmgr/src/status/mod.rs +++ b/appmgr/src/status/mod.rs @@ -29,58 +29,17 @@ pub mod health_check; // Assume docker for now pub async fn synchronize_all(ctx: &RpcContext) -> Result<(), Error> { - let mut db = ctx.db.handle(); let mut pkg_ids = crate::db::DatabaseModel::new() .package_data() - .keys(&mut db, true) + .keys(&mut ctx.db.handle(), true) .await?; - let mut container_names = Vec::with_capacity(pkg_ids.len()); - for id in pkg_ids.clone().into_iter() { - if let Some(version) = &*crate::db::DatabaseModel::new() - .package_data() - .idx_model(&id) - .expect(&mut db) - .await? - .installed() - .map(|i| i.manifest().version()) - .get(&mut db, true) - .await? - { - container_names.push(DockerAction::container_name(id.as_ref(), None)); - } else { - pkg_ids.remove(&id); - } - } - let mut filters = HashMap::new(); - filters.insert("name".to_owned(), container_names); - let info = ctx - .docker - .list_containers(Some(ListContainersOptions { - all: true, - size: false, - limit: None, - filters, - })) - .await?; - for summary in info { - let id = if let Some(id) = summary.names.iter().flatten().find_map(|s| { - // DockerAction::uncontainer_name(s.as_str()).and_then(|(id, _)| pkg_ids.take(&id)) - todo!() - }) { - id - } else { - continue; - }; - async fn status( - docker: &Docker, - id: &PackageId, - db: &mut Db, - summary: &ContainerSummaryInner, - ) -> Result<(), Error> { + for id in pkg_ids { + async fn status(ctx: &RpcContext, id: PackageId) -> Result<(), Error> { + let mut db = ctx.db.handle(); let pkg_data = crate::db::DatabaseModel::new() .package_data() - .idx_model(id) - .check(db) + .idx_model(&id) + .check(&mut db) .await? .ok_or_else(|| { Error::new( @@ -88,31 +47,40 @@ pub async fn synchronize_all(ctx: &RpcContext) -> Result<(), Error> { crate::ErrorKind::Database, ) })?; - let (mut status, manifest) = - if let Some(installed) = pkg_data.installed().check(db).await? { + let (mut status, manager) = + if let Some(installed) = pkg_data.installed().check(&mut db).await? { ( - installed.clone().status().get_mut(db).await?, - installed.manifest().get(db, true).await?, + installed.clone().status().get_mut(&mut db).await?, + ctx.managers + .get(&( + id, + installed + .manifest() + .version() + .get(&mut db, true) + .await? + .to_owned(), + )) + .await + .ok_or_else(|| { + Error::new(anyhow!("No Manager"), crate::ErrorKind::Docker) + })?, ) } else { return Ok(()); }; - let res = status.main.synchronize(todo!()).await?; + let res = status.main.synchronize(&manager).await?; - status.save(db).await?; + status.save(&mut db).await?; Ok(res) } - if let Err(e) = status(&ctx.docker, &id, &mut db, &summary).await { + if let Err(e) = status(ctx, id.clone()).await { log::error!("Error syncronizing status of {}: {}", id, e); } } - for id in pkg_ids { - log::warn!("No container for {}", id); - } - Ok(()) } diff --git a/appmgr/src/volume/mod.rs b/appmgr/src/volume/mod.rs index e309b60af..373ac17b0 100644 --- a/appmgr/src/volume/mod.rs +++ b/appmgr/src/volume/mod.rs @@ -10,6 +10,7 @@ use crate::id::{Id, IdUnchecked}; use crate::net::interface::InterfaceId; use crate::s9pk::manifest::PackageId; use crate::util::Version; +use crate::Error; pub mod disk; @@ -69,6 +70,12 @@ where #[derive(Clone, Debug, Default, Deserialize, Serialize)] pub struct Volumes(IndexMap); impl Volumes { + pub async fn install(&self, pkg_id: &PackageId, version: &Version) -> Result<(), Error> { + for (volume_id, volume) in &self.0 { + volume.install(pkg_id, version, volume_id).await?; // TODO: concurrent? + } + Ok(()) + } pub fn get_path_for( &self, pkg_id: &PackageId, @@ -139,6 +146,20 @@ pub enum Volume { Backup { readonly: bool }, } impl Volume { + pub async fn install( + &self, + pkg_id: &PackageId, + version: &Version, + volume_id: &VolumeId, + ) -> Result<(), Error> { + match self { + Volume::Data { .. } => { + tokio::fs::create_dir_all(self.path_for(pkg_id, version, volume_id)).await?; + } + _ => (), + } + Ok(()) + } pub fn path_for(&self, pkg_id: &PackageId, version: &Version, volume_id: &VolumeId) -> PathBuf { match self { Volume::Data { .. } => Path::new(PKG_VOLUME_DIR)