installs working

This commit is contained in:
Aiden McClelland
2021-07-26 10:22:29 -06:00
committed by Aiden McClelland
parent 48c33be14c
commit b807323fa4
10 changed files with 257 additions and 101 deletions

View File

@@ -10,20 +10,26 @@
"nullable": []
}
},
"3e57a0e52b69f33e9411c13b03a5d82c5856d63f0375eb4c23b255a09c54f8b1": {
"query": "SELECT key FROM tor WHERE package = ? AND interface = ?",
"8595651866e7db772260bd79e19d55b7271fd795b82a99821c935a9237c1aa16": {
"query": "SELECT interface, key FROM tor WHERE package = ?",
"describe": {
"columns": [
{
"name": "key",
"name": "interface",
"ordinal": 0,
"type_info": "Text"
},
{
"name": "key",
"ordinal": 1,
"type_info": "Blob"
}
],
"parameters": {
"Right": 2
"Right": 1
},
"nullable": [
false,
false
]
}

View File

@@ -54,7 +54,10 @@ impl DockerAction {
.arg("--name")
.arg(Self::container_name(pkg_id, name));
}
cmd.args(self.docker_args(pkg_id, pkg_version, volumes, allow_inject));
cmd.args(
self.docker_args(pkg_id, pkg_version, volumes, allow_inject)
.await,
);
let input_buf = if let (Some(input), Some(format)) = (&input, &self.io_format) {
cmd.stdin(std::process::Stdio::piped());
Some(format.to_vec(input)?)
@@ -63,7 +66,7 @@ impl DockerAction {
};
cmd.stdout(std::process::Stdio::piped());
cmd.stderr(std::process::Stdio::piped());
let mut handle = cmd.spawn().with_kind(crate::ErrorKind::Docker)?;
let mut handle = dbg!(cmd).spawn().with_kind(crate::ErrorKind::Docker)?;
if let (Some(input), Some(stdin)) = (&input_buf, &mut handle.stdin) {
use tokio::io::AsyncWriteExt;
stdin
@@ -111,7 +114,10 @@ impl DockerAction {
) -> Result<Result<O, (i32, String)>, Error> {
let mut cmd = tokio::process::Command::new("docker");
cmd.arg("run").arg("--rm").arg("--network=none");
cmd.args(self.docker_args(pkg_id, pkg_version, &Volumes::default(), false));
cmd.args(
self.docker_args(pkg_id, pkg_version, &Volumes::default(), false)
.await,
);
let input_buf = if let (Some(input), Some(format)) = (&input, &self.io_format) {
cmd.stdin(std::process::Stdio::piped());
Some(format.to_vec(input)?)
@@ -178,7 +184,7 @@ impl DockerAction {
}
}
fn docker_args<'a>(
async fn docker_args<'a>(
&'a self,
pkg_id: &PackageId,
pkg_version: &Version,
@@ -197,9 +203,8 @@ impl DockerAction {
} else {
continue;
};
let src = volume.path_for(pkg_id, pkg_version, volume_id);
if !src.exists() {
// TODO: this is a blocking call, make this async?
let src = dbg!(volume.path_for(pkg_id, pkg_version, volume_id));
if tokio::fs::metadata(&src).await.is_err() {
continue;
}
res.push(OsStr::new("--mount").into());

78
appmgr/src/control.rs Normal file
View File

@@ -0,0 +1,78 @@
use anyhow::anyhow;
use chrono::{DateTime, Utc};
use indexmap::IndexMap;
use rpc_toolkit::command;
use crate::context::EitherContext;
use crate::s9pk::manifest::PackageId;
use crate::status::MainStatus;
use crate::util::display_none;
use crate::{Error, ResultExt};
#[command(display(display_none))]
pub async fn start(#[context] ctx: EitherContext, #[arg] id: PackageId) -> Result<(), Error> {
let rpc_ctx = ctx.as_rpc().unwrap();
let mut db = rpc_ctx.db.handle();
let installed = crate::db::DatabaseModel::new()
.package_data()
.idx_model(&id)
.and_then(|pkg| pkg.installed())
.expect(&mut db)
.await
.with_ctx(|_| {
(
crate::ErrorKind::NotFound,
format!("{} is not installed", id),
)
})?;
let version = installed
.clone()
.manifest()
.version()
.get(&mut db, true)
.await?
.to_owned();
let mut status = installed.status().main().get_mut(&mut db).await?;
*status = MainStatus::Running {
started: Utc::now(),
health: IndexMap::new(),
};
status
.synchronize(
&*rpc_ctx.managers.get(&(id, version)).await.ok_or_else(|| {
Error::new(anyhow!("Manager not found"), crate::ErrorKind::Docker)
})?,
)
.await?;
status.save(&mut db).await?;
Ok(())
}
#[command(display(display_none))]
pub async fn stop(#[context] ctx: EitherContext, #[arg] id: PackageId) -> Result<(), Error> {
let rpc_ctx = ctx.as_rpc().unwrap();
let mut db = rpc_ctx.db.handle();
let mut status = crate::db::DatabaseModel::new()
.package_data()
.idx_model(&id)
.and_then(|pkg| pkg.installed())
.expect(&mut db)
.await
.with_ctx(|_| {
(
crate::ErrorKind::NotFound,
format!("{} is not installed", id),
)
})?
.status()
.main()
.get_mut(&mut db)
.await?;
*status = MainStatus::Stopping;
status.save(&mut db).await?;
Ok(())
}

View File

@@ -43,7 +43,7 @@ pub const PKG_PUBLIC_DIR: &'static str = "/mnt/embassy-os/public/package-data";
#[command(display(display_none))]
pub async fn install(#[context] ctx: EitherContext, #[arg] id: String) -> Result<(), Error> {
let rpc_ctx = ctx.as_rpc().unwrap();
let rpc_ctx = ctx.to_rpc().unwrap();
let (pkg_id, version_str) = if let Some(split) = id.split_once("@") {
split
} else {
@@ -63,7 +63,13 @@ pub async fn install(#[context] ctx: EitherContext, #[arg] id: String) -> Result
)
.with_kind(crate::ErrorKind::Registry)?;
let man = man_res.json().await.with_kind(crate::ErrorKind::Registry)?;
download_install_s9pk(rpc_ctx, &man, s9pk).await
tokio::spawn(async move {
if let Err(e) = download_install_s9pk(&rpc_ctx, &man, s9pk).await {
log::error!("Install of {}@{} Failed: {}", man.id, man.version, e);
}
});
Ok(())
}
pub async fn download_install_s9pk(
@@ -126,7 +132,6 @@ pub async fn download_install_s9pk(
progress: &Arc<InstallProgress>,
model: OptionModel<InstallProgress>,
ctx: &RpcContext,
db: &mut PatchDbHandle,
) -> Option<S9pkReader<InstallProgressTracker<File>>> {
fn warn_ok<T, E: Display>(
pkg_id: &PackageId,
@@ -170,7 +175,6 @@ pub async fn download_install_s9pk(
&progress,
progress_model.clone(),
&ctx,
&mut ctx.db.handle(),
)
.await;
@@ -363,14 +367,25 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin>(
let mut tx = handle.begin().await?;
let mut sql_tx = ctx.secret_store.begin().await?;
log::info!("Install {}@{}: Creating manager", pkg_id, version);
todo!("create manager");
log::info!("Install {}@{}: Created manager", pkg_id, version);
log::info!("Install {}@{}: Creating volumes", pkg_id, version);
manifest.volumes.install(pkg_id, version).await?;
log::info!("Install {}@{}: Created volumes", pkg_id, version);
log::info!("Install {}@{}: Installing interfaces", pkg_id, version);
let interface_addresses = manifest.interfaces.install(&mut sql_tx, pkg_id).await?;
log::info!("Install {}@{}: Installed interfaces", pkg_id, version);
log::info!("Install {}@{}: Creating manager", pkg_id, version);
ctx.managers
.add(
ctx.docker.clone(),
ctx.net_controller.clone(),
manifest.clone(),
manifest.interfaces.tor_keys(&mut sql_tx, pkg_id).await?,
)
.await?;
log::info!("Install {}@{}: Created manager", pkg_id, version);
let static_files = StaticFiles::local(pkg_id, version, manifest.assets.icon_type());
let current_dependencies = manifest
.dependencies
@@ -472,6 +487,7 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin>(
}
}
sql_tx.commit().await?;
tx.commit(None).await?;
log::info!("Install {}@{}: Complete", pkg_id, version);

View File

@@ -20,6 +20,7 @@ pub mod action;
pub mod backup;
pub mod config;
pub mod context;
pub mod control;
pub mod db;
pub mod dependencies;
pub mod developer;
@@ -50,19 +51,23 @@ pub fn echo(#[context] _ctx: EitherContext, #[arg] message: String) -> Result<St
}
#[command(subcommands(
config::config,
version::git_info,
echo,
s9pk::pack,
s9pk::verify,
developer::init,
install::install,
inspect::inspect,
package,
))]
pub fn main_api(#[context] ctx: EitherContext) -> Result<EitherContext, RpcError> {
Ok(ctx)
}
#[command(subcommands(install::install, config::config, control::start, control::stop))]
pub fn package(#[context] ctx: EitherContext) -> Result<EitherContext, RpcError> {
Ok(ctx)
}
#[command(subcommands(
version::git_info,
s9pk::pack,

View File

@@ -1,6 +1,5 @@
use std::collections::HashMap;
use std::future::Future;
use std::net::Ipv4Addr;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::task::Poll;
@@ -8,7 +7,7 @@ use std::task::Poll;
use anyhow::anyhow;
use bollard::container::StopContainerOptions;
use bollard::Docker;
use patch_db::{DbHandle, PatchDbHandle};
use patch_db::DbHandle;
use sqlx::{Executor, Sqlite};
use tokio::sync::watch::error::RecvError;
use tokio::sync::watch::{channel, Receiver, Sender};
@@ -34,16 +33,30 @@ impl ManagerMap {
where
for<'a> &'a mut Ex: Executor<'a, Database = Sqlite>,
{
// let mut res = ManagerMap(RwLock::new(HashMap::new()));
// for package in crate::db::DatabaseModel::new()
// .package_data()
// .keys(db, true)
// .await?
// {
// let man = crate::db::DatabaseModel::new().package_data().idx_model(&package).
// res.add(docker.clone(), net_ctl.clone(), manifest, tor_keys)
// }
todo!()
let mut res = HashMap::new();
for package in crate::db::DatabaseModel::new()
.package_data()
.keys(db, true)
.await?
{
let man = if let Some(installed) = crate::db::DatabaseModel::new()
.package_data()
.idx_model(&package)
.and_then(|pkg| pkg.installed())
.check(db)
.await?
{
installed.manifest().get(db, true).await?.to_owned()
} else {
continue;
};
let tor_keys = man.interfaces.tor_keys(secrets, &package).await?;
res.insert(
(package, man.version.clone()),
Arc::new(Manager::create(docker.clone(), net_ctl.clone(), man, tor_keys).await?),
);
}
Ok(ManagerMap(RwLock::new(res)))
}
pub async fn add(
@@ -124,7 +137,7 @@ async fn run_main(state: &Arc<ManagerSharedState>) -> Result<Result<(), (i32, St
)
.await
});
let mut ip;
let ip;
loop {
match state
.docker
@@ -132,14 +145,18 @@ async fn run_main(state: &Arc<ManagerSharedState>) -> Result<Result<(), (i32, St
.await
{
Ok(res) => {
ip = res
if let Some(ip_addr) = res
.network_settings
.and_then(|ns| ns.networks)
.and_then(|mut n| n.remove("start9"))
.and_then(|es| es.ip_address)
.filter(|ip| !ip.is_empty())
.map(|ip| ip.parse())
.transpose()?;
break;
.transpose()?
{
ip = ip_addr;
break;
}
}
Err(bollard::errors::Error::DockerResponseNotFoundError { .. }) => (),
Err(e) => Err(e)?,
@@ -158,12 +175,6 @@ async fn run_main(state: &Arc<ManagerSharedState>) -> Result<Result<(), (i32, St
_ => (),
}
}
let ip = ip.ok_or_else(|| {
Error::new(
anyhow!("inspect did not return ip"),
crate::ErrorKind::Docker,
)
})?;
state
.net_ctl
@@ -283,7 +294,7 @@ impl Manager {
todo!("application crashed")
}
Err(e) => {
todo!("failed to start application")
todo!("failed to start application: {}", e)
}
}
}

View File

@@ -1,6 +1,10 @@
use std::collections::HashMap;
use std::path::Path;
use anyhow::anyhow;
use futures::TryStreamExt;
use indexmap::IndexMap;
use itertools::Either;
use serde::{Deserialize, Deserializer, Serialize};
use sqlx::{Executor, Sqlite};
use torut::onion::TorSecretKeyV3;
@@ -53,10 +57,47 @@ impl Interfaces {
}
Ok(interface_addresses)
}
pub async fn tor_keys<Ex>(
&self,
secrets: &mut Ex,
package_id: &PackageId,
) -> Result<HashMap<InterfaceId, TorSecretKeyV3>, Error>
where
for<'a> &'a mut Ex: Executor<'a, Database = Sqlite>,
{
Ok(sqlx::query!(
"SELECT interface, key FROM tor WHERE package = ?",
**package_id
)
.fetch_many(secrets)
.map_err(Error::from)
.try_filter_map(|qr| async move {
Ok(if let Either::Right(r) = qr {
let mut buf = [0; 64];
buf.clone_from_slice(r.key.get(0..64).ok_or_else(|| {
Error::new(
anyhow!("Invalid Tor Key Length"),
crate::ErrorKind::Database,
)
})?);
Some((InterfaceId::from(Id::try_from(r.interface)?), buf.into()))
} else {
None
})
})
.try_collect()
.await?)
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize)]
pub struct InterfaceId<S: AsRef<str> = String>(Id<S>);
impl<S: AsRef<str>> From<Id<S>> for InterfaceId<S> {
fn from(id: Id<S>) -> Self {
Self(id)
}
}
impl<S: AsRef<str>> std::fmt::Display for InterfaceId<S> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", &self.0)

View File

@@ -25,6 +25,11 @@ pub const SYSTEM_PACKAGE_ID: PackageId<&'static str> = PackageId(SYSTEM_ID);
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct PackageId<S: AsRef<str> = String>(Id<S>);
impl<'a> PackageId<&'a str> {
pub fn owned(&self) -> PackageId {
PackageId(self.0.owned())
}
}
impl FromStr for PackageId {
type Err = InvalidId;
fn from_str(s: &str) -> Result<Self, Self::Err> {

View File

@@ -29,58 +29,17 @@ pub mod health_check;
// Assume docker for now
pub async fn synchronize_all(ctx: &RpcContext) -> Result<(), Error> {
let mut db = ctx.db.handle();
let mut pkg_ids = crate::db::DatabaseModel::new()
.package_data()
.keys(&mut db, true)
.keys(&mut ctx.db.handle(), true)
.await?;
let mut container_names = Vec::with_capacity(pkg_ids.len());
for id in pkg_ids.clone().into_iter() {
if let Some(version) = &*crate::db::DatabaseModel::new()
.package_data()
.idx_model(&id)
.expect(&mut db)
.await?
.installed()
.map(|i| i.manifest().version())
.get(&mut db, true)
.await?
{
container_names.push(DockerAction::container_name(id.as_ref(), None));
} else {
pkg_ids.remove(&id);
}
}
let mut filters = HashMap::new();
filters.insert("name".to_owned(), container_names);
let info = ctx
.docker
.list_containers(Some(ListContainersOptions {
all: true,
size: false,
limit: None,
filters,
}))
.await?;
for summary in info {
let id = if let Some(id) = summary.names.iter().flatten().find_map(|s| {
// DockerAction::uncontainer_name(s.as_str()).and_then(|(id, _)| pkg_ids.take(&id))
todo!()
}) {
id
} else {
continue;
};
async fn status<Db: DbHandle>(
docker: &Docker,
id: &PackageId,
db: &mut Db,
summary: &ContainerSummaryInner,
) -> Result<(), Error> {
for id in pkg_ids {
async fn status(ctx: &RpcContext, id: PackageId) -> Result<(), Error> {
let mut db = ctx.db.handle();
let pkg_data = crate::db::DatabaseModel::new()
.package_data()
.idx_model(id)
.check(db)
.idx_model(&id)
.check(&mut db)
.await?
.ok_or_else(|| {
Error::new(
@@ -88,31 +47,40 @@ pub async fn synchronize_all(ctx: &RpcContext) -> Result<(), Error> {
crate::ErrorKind::Database,
)
})?;
let (mut status, manifest) =
if let Some(installed) = pkg_data.installed().check(db).await? {
let (mut status, manager) =
if let Some(installed) = pkg_data.installed().check(&mut db).await? {
(
installed.clone().status().get_mut(db).await?,
installed.manifest().get(db, true).await?,
installed.clone().status().get_mut(&mut db).await?,
ctx.managers
.get(&(
id,
installed
.manifest()
.version()
.get(&mut db, true)
.await?
.to_owned(),
))
.await
.ok_or_else(|| {
Error::new(anyhow!("No Manager"), crate::ErrorKind::Docker)
})?,
)
} else {
return Ok(());
};
let res = status.main.synchronize(todo!()).await?;
let res = status.main.synchronize(&manager).await?;
status.save(db).await?;
status.save(&mut db).await?;
Ok(res)
}
if let Err(e) = status(&ctx.docker, &id, &mut db, &summary).await {
if let Err(e) = status(ctx, id.clone()).await {
log::error!("Error syncronizing status of {}: {}", id, e);
}
}
for id in pkg_ids {
log::warn!("No container for {}", id);
}
Ok(())
}

View File

@@ -10,6 +10,7 @@ use crate::id::{Id, IdUnchecked};
use crate::net::interface::InterfaceId;
use crate::s9pk::manifest::PackageId;
use crate::util::Version;
use crate::Error;
pub mod disk;
@@ -69,6 +70,12 @@ where
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct Volumes(IndexMap<VolumeId, Volume>);
impl Volumes {
pub async fn install(&self, pkg_id: &PackageId, version: &Version) -> Result<(), Error> {
for (volume_id, volume) in &self.0 {
volume.install(pkg_id, version, volume_id).await?; // TODO: concurrent?
}
Ok(())
}
pub fn get_path_for(
&self,
pkg_id: &PackageId,
@@ -139,6 +146,20 @@ pub enum Volume {
Backup { readonly: bool },
}
impl Volume {
pub async fn install(
&self,
pkg_id: &PackageId,
version: &Version,
volume_id: &VolumeId,
) -> Result<(), Error> {
match self {
Volume::Data { .. } => {
tokio::fs::create_dir_all(self.path_for(pkg_id, version, volume_id)).await?;
}
_ => (),
}
Ok(())
}
pub fn path_for(&self, pkg_id: &PackageId, version: &Version, volume_id: &VolumeId) -> PathBuf {
match self {
Volume::Data { .. } => Path::new(PKG_VOLUME_DIR)