configure datadir on context

This commit is contained in:
Aiden McClelland
2021-09-03 14:03:08 -06:00
committed by Aiden McClelland
parent ee381ebce7
commit 3877e43b84
33 changed files with 664 additions and 276 deletions

View File

@@ -11,5 +11,5 @@ fi
alias 'rust-musl-builder'='docker run --rm -it -v "$HOME"/.cargo/registry:/root/.cargo/registry -v "$(pwd)":/home/rust/src start9/rust-musl-cross:x86_64-musl'
cd ../..
rust-musl-builder sh -c "(cd embassy-os/appmgr && cargo +beta build --release --target=x86_64-unknown-linux-musl --no-default-features) --bin=embassy-sdk"
rust-musl-builder sh -c "(cd embassy-os/appmgr && cargo +beta build --release --target=x86_64-unknown-linux-musl --no-default-features)"
cd embassy-os/appmgr

View File

@@ -7,6 +7,7 @@ use indexmap::IndexMap;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use crate::context::RpcContext;
use crate::id::{Id, ImageId};
use crate::s9pk::manifest::{PackageId, SYSTEM_PACKAGE_ID};
use crate::util::{IoFormat, Version};
@@ -36,6 +37,7 @@ pub struct DockerAction {
impl DockerAction {
pub async fn execute<I: Serialize, O: for<'de> Deserialize<'de>>(
&self,
ctx: &RpcContext,
pkg_id: &PackageId,
pkg_version: &Version,
name: Option<&str>,
@@ -55,7 +57,7 @@ impl DockerAction {
.arg(Self::container_name(pkg_id, name));
}
cmd.args(
self.docker_args(pkg_id, pkg_version, volumes, allow_inject)
self.docker_args(ctx, pkg_id, pkg_version, volumes, allow_inject)
.await,
);
let input_buf = if let (Some(input), Some(format)) = (&input, &self.io_format) {
@@ -108,6 +110,7 @@ impl DockerAction {
pub async fn sandboxed<I: Serialize, O: for<'de> Deserialize<'de>>(
&self,
ctx: &RpcContext,
pkg_id: &PackageId,
pkg_version: &Version,
volumes: &Volumes,
@@ -116,7 +119,7 @@ impl DockerAction {
let mut cmd = tokio::process::Command::new("docker");
cmd.arg("run").arg("--rm").arg("--network=none");
cmd.args(
self.docker_args(pkg_id, pkg_version, &volumes.to_readonly(), false)
self.docker_args(ctx, pkg_id, pkg_version, &volumes.to_readonly(), false)
.await,
);
let input_buf = if let (Some(input), Some(format)) = (&input, &self.io_format) {
@@ -187,6 +190,7 @@ impl DockerAction {
async fn docker_args<'a>(
&'a self,
ctx: &RpcContext,
pkg_id: &PackageId,
pkg_version: &Version,
volumes: &Volumes,
@@ -204,7 +208,7 @@ impl DockerAction {
} else {
continue;
};
let src = dbg!(volume.path_for(pkg_id, pkg_version, volume_id));
let src = dbg!(volume.path_for(ctx, pkg_id, pkg_version, volume_id));
if tokio::fs::metadata(&src).await.is_err() {
continue;
}

View File

@@ -7,6 +7,7 @@ use serde::{Deserialize, Serialize};
use self::docker::DockerAction;
use crate::config::{Config, ConfigSpec};
use crate::context::RpcContext;
use crate::id::Id;
use crate::s9pk::manifest::PackageId;
use crate::util::{ValuePrimative, Version};
@@ -90,6 +91,7 @@ pub struct Action {
impl Action {
pub async fn execute(
&self,
ctx: &RpcContext,
pkg_id: &PackageId,
pkg_version: &Version,
volumes: &Volumes,
@@ -100,6 +102,7 @@ impl Action {
.with_kind(crate::ErrorKind::ConfigSpecViolation)?;
self.implementation
.execute(
ctx,
pkg_id,
pkg_version,
Some(&format!("{}Action", self.name)),
@@ -121,6 +124,7 @@ pub enum ActionImplementation {
impl ActionImplementation {
pub async fn execute<I: Serialize, O: for<'de> Deserialize<'de>>(
&self,
ctx: &RpcContext,
pkg_id: &PackageId,
pkg_version: &Version,
name: Option<&str>,
@@ -131,13 +135,14 @@ impl ActionImplementation {
match self {
ActionImplementation::Docker(action) => {
action
.execute(pkg_id, pkg_version, name, volumes, input, allow_inject)
.execute(ctx, pkg_id, pkg_version, name, volumes, input, allow_inject)
.await
}
}
}
pub async fn sandboxed<I: Serialize, O: for<'de> Deserialize<'de>>(
&self,
ctx: &RpcContext,
pkg_id: &PackageId,
pkg_version: &Version,
volumes: &Volumes,
@@ -145,7 +150,9 @@ impl ActionImplementation {
) -> Result<Result<O, (i32, String)>, Error> {
match self {
ActionImplementation::Docker(action) => {
action.sandboxed(pkg_id, pkg_version, volumes, input).await
action
.sandboxed(ctx, pkg_id, pkg_version, volumes, input)
.await
}
}
}

View File

@@ -3,6 +3,7 @@ use patch_db::HasModel;
use serde::{Deserialize, Serialize};
use crate::action::ActionImplementation;
use crate::context::RpcContext;
use crate::s9pk::manifest::PackageId;
use crate::util::Version;
use crate::volume::{Volume, VolumeId, Volumes};
@@ -16,6 +17,7 @@ pub struct BackupActions {
impl BackupActions {
pub async fn create(
&self,
ctx: &RpcContext,
pkg_id: &PackageId,
pkg_version: &Version,
volumes: &Volumes,
@@ -24,6 +26,7 @@ impl BackupActions {
volumes.insert(VolumeId::Backup, Volume::Backup { readonly: false });
self.create
.execute(
ctx,
pkg_id,
pkg_version,
Some("CreateBackup"),
@@ -39,6 +42,7 @@ impl BackupActions {
pub async fn restore(
&self,
ctx: &RpcContext,
pkg_id: &PackageId,
pkg_version: &Version,
volumes: &Volumes,
@@ -47,6 +51,7 @@ impl BackupActions {
volumes.insert(VolumeId::Backup, Volume::Backup { readonly: true });
self.restore
.execute(
ctx,
pkg_id,
pkg_version,
Some("RestoreBackup"),

View File

@@ -1,28 +1,60 @@
use embassy::context::rpc::RpcContextConfig;
use embassy::Error;
async fn inner_main() -> Result<(), Error> {
// host setup flow if needed
async fn init(cfg: &RpcContextConfig) -> Result<(), Error> {
// mount disk
embassy::volume::disk::mount("/dev/sda", "/mnt/embassy-os-crypt").await?; // TODO: by uuid
// unlock disk
// mount /var/log/journal
// sync ssh
// sync wifi
// hostname-set
if embassy::disk::main::importable().await? {
embassy::disk::main::load("password").await?;
} else {
// embassy::setup::host_setup().await?;
}
embassy::disk::util::bind("/embassy-data/main/logs", "/var/log/journal", false).await?;
embassy::ssh::sync_keys_from_db(todo!(), "/root/.ssh/authorized_keys").await?;
todo!("sync wifi");
embassy::hostname::sync_hostname().await?;
Ok(())
}
async fn inner_main(cfg_path: Option<&str>) -> Result<(), Error> {
if let Err(e) = init(&RpcContextConfig::load(cfg_path).await?).await {
embassy::sound::BEETHOVEN.play().await?;
log::error!("{}", e.source);
log::debug!("{}", e.source)
} else {
embassy::sound::MARIO_COIN.play().await?
}
Ok(())
}
fn main() {
let matches = clap::App::new("embassyd")
.arg(
clap::Arg::with_name("config")
.short("c")
.long("config")
.takes_value(true),
)
.arg(
clap::Arg::with_name("verbosity")
.short("v")
.multiple(true)
.takes_value(false),
)
.get_matches();
simple_logging::log_to_stderr(match matches.occurrences_of("verbosity") {
0 => log::LevelFilter::Off,
1 => log::LevelFilter::Error,
2 => log::LevelFilter::Warn,
3 => log::LevelFilter::Info,
4 => log::LevelFilter::Debug,
_ => log::LevelFilter::Trace,
});
let cfg_path = matches.value_of("config");
let rt = tokio::runtime::Runtime::new().expect("failed to initialize runtime");
match rt.block_on(inner_main()) {
match rt.block_on(inner_main(cfg_path)) {
Ok(_) => (),
Err(e) => {
drop(rt);

View File

@@ -1,4 +1,3 @@
use std::path::Path;
use std::time::Duration;
use anyhow::anyhow;
@@ -8,14 +7,16 @@ use embassy::db::subscribe;
use embassy::hostname::{get_hostname, get_id};
use embassy::middleware::auth::auth;
use embassy::middleware::cors::cors;
use embassy::net::tor::os_key;
use embassy::net::tor::{os_key, tor_health_check};
use embassy::status::{check_all, synchronize_all};
use embassy::util::daemon;
use embassy::{Error, ErrorKind, ResultExt};
use futures::TryFutureExt;
use futures::{FutureExt, TryFutureExt};
use patch_db::json_ptr::JsonPointer;
use reqwest::{Client, Proxy};
use rpc_toolkit::hyper::{Body, Response, Server, StatusCode};
use rpc_toolkit::rpc_server;
use rpc_toolkit::{rpc_server, Context};
use tokio::signal::unix::signal;
fn status_fn(_: i32) -> StatusCode {
StatusCode::OK
@@ -30,7 +31,37 @@ fn err_to_500(e: Error) -> Response<Body> {
}
async fn inner_main(cfg_path: Option<&str>) -> Result<(), Error> {
let rpc_ctx = RpcContext::init(cfg_path).await?;
let (shutdown, _) = tokio::sync::broadcast::channel(1);
let rpc_ctx = RpcContext::init(cfg_path, shutdown).await?;
let sig_handler_ctx = rpc_ctx.clone();
let sig_handler = tokio::spawn(async move {
use tokio::signal::unix::SignalKind;
futures::future::select_all(
[
SignalKind::interrupt(),
SignalKind::quit(),
SignalKind::terminate(),
]
.iter()
.map(|s| {
async move {
signal(*s)
.expect(&format!("register {:?} handler", s))
.recv()
.await
}
.boxed()
}),
)
.await;
sig_handler_ctx
.shutdown
.send(None)
.expect("send shutdown signal");
});
if !rpc_ctx.db.exists(&<JsonPointer>::default()).await? {
rpc_ctx
.db
@@ -55,12 +86,22 @@ async fn inner_main(cfg_path: Option<&str>) -> Result<(), Error> {
cors,
auth,
]
})
.with_graceful_shutdown({
let mut shutdown = rpc_ctx.shutdown.subscribe();
async move {
shutdown.recv().await.expect("context dropped");
}
});
let rev_cache_ctx = rpc_ctx.clone();
let revision_cache_task = tokio::spawn(async move {
let mut sub = rev_cache_ctx.db.subscribe();
loop {
let mut shutdown = rev_cache_ctx.shutdown.subscribe();
while matches!(
shutdown.try_recv(),
Err(tokio::sync::broadcast::error::TryRecvError::Empty)
) {
let rev = match sub.recv().await {
Ok(a) => a,
Err(_) => {
@@ -76,8 +117,8 @@ async fn inner_main(cfg_path: Option<&str>) -> Result<(), Error> {
}
});
let tor_health_check_task =
embassy::daemon::tor_health_check::tor_health_check_daemon(&rpc_ctx.net_controller.tor);
// let tor_health_check_task =
// embassy::daemon::tor_health_check::tor_health_check_daemon(&rpc_ctx.net_controller.tor);
let ws_ctx = rpc_ctx.clone();
let ws_server = {
let builder = Server::bind(&ws_ctx.bind_ws);
@@ -101,7 +142,13 @@ async fn inner_main(cfg_path: Option<&str>) -> Result<(), Error> {
}
});
builder.serve(make_svc)
};
}
.with_graceful_shutdown({
let mut shutdown = rpc_ctx.shutdown.subscribe();
async move {
shutdown.recv().await.expect("context dropped");
}
});
let status_ctx = rpc_ctx.clone();
let status_daemon = daemon(
@@ -117,6 +164,7 @@ async fn inner_main(cfg_path: Option<&str>) -> Result<(), Error> {
}
},
Duration::from_millis(500),
rpc_ctx.shutdown.subscribe(),
);
let health_ctx = rpc_ctx.clone();
let health_daemon = daemon(
@@ -132,7 +180,26 @@ async fn inner_main(cfg_path: Option<&str>) -> Result<(), Error> {
}
},
Duration::from_millis(500),
rpc_ctx.shutdown.subscribe(),
);
let tor_health_ctx = rpc_ctx.clone();
let tor_client = Client::builder()
.proxy(
Proxy::all(format!("socks5h://{}:{}", rpc_ctx.host(), rpc_ctx.port()))
.with_kind(crate::ErrorKind::Network)?,
)
.build()
.with_kind(crate::ErrorKind::Network)?;
let tor_health_daemon = daemon(
move || {
let ctx = tor_health_ctx.clone();
let client = tor_client.clone();
async move { tor_health_check(&client, &ctx.net_controller.tor).await }
},
Duration::from_secs(300),
rpc_ctx.shutdown.subscribe(),
);
futures::try_join!(
server.map_err(|e| Error::new(e, ErrorKind::Network)),
revision_cache_task.map_err(|e| Error::new(
@@ -148,8 +215,14 @@ async fn inner_main(cfg_path: Option<&str>) -> Result<(), Error> {
e.context("Health Check daemon panicked!"),
ErrorKind::Unknown
)),
futures::FutureExt::map(tor_health_check_task, Ok)
tor_health_daemon
.map_err(|e| Error::new(e.context("Tor Health daemon panicked!"), ErrorKind::Unknown)),
)?;
rpc_ctx.managers.empty().await?;
sig_handler.abort();
Ok(())
}

View File

@@ -6,6 +6,7 @@ use serde::{Deserialize, Serialize};
use super::{Config, ConfigSpec};
use crate::action::ActionImplementation;
use crate::context::RpcContext;
use crate::dependencies::Dependencies;
use crate::s9pk::manifest::PackageId;
use crate::status::health_check::HealthCheckId;
@@ -28,12 +29,14 @@ pub struct ConfigActions {
impl ConfigActions {
pub async fn get(
&self,
ctx: &RpcContext,
pkg_id: &PackageId,
pkg_version: &Version,
volumes: &Volumes,
) -> Result<ConfigRes, Error> {
self.get
.execute(
ctx,
pkg_id,
pkg_version,
Some("GetConfig"),
@@ -49,6 +52,7 @@ impl ConfigActions {
pub async fn set(
&self,
ctx: &RpcContext,
pkg_id: &PackageId,
pkg_version: &Version,
dependencies: &Dependencies,
@@ -58,6 +62,7 @@ impl ConfigActions {
let res: SetResult = self
.set
.execute(
ctx,
pkg_id,
pkg_version,
Some("SetConfig"),

View File

@@ -2,7 +2,6 @@ use std::time::Duration;
use anyhow::anyhow;
use bollard::container::KillContainerOptions;
use bollard::Docker;
use futures::future::{BoxFuture, FutureExt};
use indexmap::{IndexMap, IndexSet};
use itertools::Itertools;
@@ -172,7 +171,7 @@ pub async fn get(
.get(&mut db, true)
.await?;
let volumes = pkg_model.manifest().volumes().get(&mut db, true).await?;
action.get(&id, &*version, &*volumes).await
action.get(&ctx, &id, &*version, &*volumes).await
}
#[command(
@@ -205,8 +204,8 @@ pub async fn set_dry(
let mut tx = db.begin().await?;
let mut breakages = IndexMap::new();
configure(
&ctx,
&mut tx,
&ctx.docker,
&id,
config,
&timeout,
@@ -241,8 +240,8 @@ pub async fn set_impl(
let mut tx = db.begin().await?;
let mut breakages = IndexMap::new();
configure(
&ctx,
&mut tx,
&ctx.docker,
&id,
config,
&timeout,
@@ -270,8 +269,8 @@ pub async fn set_impl(
}
pub fn configure<'a, Db: DbHandle>(
ctx: &'a RpcContext,
db: &'a mut Db,
docker: &'a Docker,
id: &'a PackageId,
config: Option<Config>,
timeout: &'a Option<Duration>,
@@ -311,7 +310,7 @@ pub fn configure<'a, Db: DbHandle>(
let ConfigRes {
config: old_config,
spec,
} = action.get(id, &*version, &*volumes).await?;
} = action.get(ctx, id, &*version, &*volumes).await?;
// determine new config to use
let mut config = if let Some(config) = config.or_else(|| old_config.clone()) {
@@ -321,7 +320,7 @@ pub fn configure<'a, Db: DbHandle>(
};
spec.matches(&config)?; // check that new config matches spec
spec.update(db, &*overrides, &mut config).await?; // dereference pointers in the new config
spec.update(ctx, db, &*overrides, &mut config).await?; // dereference pointers in the new config
// create backreferences to pointers
let mut sys = pkg_model.clone().system_pointers().get_mut(db).await?;
@@ -360,7 +359,7 @@ pub fn configure<'a, Db: DbHandle>(
let signal = if !dry_run {
// run config action
let res = action
.set(id, &*version, &*dependencies, &*volumes, &config)
.set(ctx, id, &*version, &*dependencies, &*volumes, &config)
.await?;
// track dependencies with no pointers
@@ -489,7 +488,13 @@ pub fn configure<'a, Db: DbHandle>(
{
let manifest = dependent_model.clone().manifest().get(db, true).await?;
if let Err(error) = cfg
.check(dependent, &manifest.version, &manifest.volumes, &config)
.check(
ctx,
dependent,
&manifest.version,
&manifest.volumes,
&config,
)
.await?
{
let dep_err = DependencyError::ConfigUnsatisfied { error };
@@ -509,7 +514,7 @@ pub fn configure<'a, Db: DbHandle>(
if let PackagePointerSpecVariant::Config { selector, multi } = ptr {
if selector.select(*multi, &next) != selector.select(*multi, &prev) {
if let Err(e) = configure(
db, docker, dependent, None, timeout, dry_run, overrides, breakages,
ctx, db, dependent, None, timeout, dry_run, overrides, breakages,
)
.await
{
@@ -542,7 +547,7 @@ pub fn configure<'a, Db: DbHandle>(
}
if let Some(signal) = signal {
docker
ctx.docker
.kill_container(
&DockerAction::container_name(id, None),
Some(KillContainerOptions {

View File

@@ -18,6 +18,7 @@ use serde_json::{Number, Value};
use super::util::{self, CharSet, NumRange, UniqueBy, STATIC_NULL};
use super::{Config, MatchError, NoMatchWithPath, TimeoutError, TypeOf};
use crate::config::ConfigurationError;
use crate::context::RpcContext;
use crate::net::interface::InterfaceId;
use crate::s9pk::manifest::{Manifest, PackageId};
use crate::Error;
@@ -34,6 +35,7 @@ pub trait ValueSpec {
// update is to fill in values for environment pointers recursively
async fn update<Db: DbHandle>(
&self,
ctx: &RpcContext,
db: &mut Db,
config_overrides: &IndexMap<PackageId, Config>,
value: &mut Value,
@@ -148,11 +150,12 @@ where
}
async fn update<Db: DbHandle>(
&self,
ctx: &RpcContext,
db: &mut Db,
config_overrides: &IndexMap<PackageId, Config>,
value: &mut Value,
) -> Result<(), ConfigurationError> {
self.inner.update(db, config_overrides, value).await
self.inner.update(ctx, db, config_overrides, value).await
}
fn pointers(&self, value: &Value) -> Result<Vec<ValueSpecPointer>, NoMatchWithPath> {
self.inner.pointers(value)
@@ -188,11 +191,12 @@ where
}
async fn update<Db: DbHandle>(
&self,
ctx: &RpcContext,
db: &mut Db,
config_overrides: &IndexMap<PackageId, Config>,
value: &mut Value,
) -> Result<(), ConfigurationError> {
self.inner.update(db, config_overrides, value).await
self.inner.update(ctx, db, config_overrides, value).await
}
fn pointers(&self, value: &Value) -> Result<Vec<ValueSpecPointer>, NoMatchWithPath> {
self.inner.pointers(value)
@@ -261,11 +265,12 @@ where
}
async fn update<Db: DbHandle>(
&self,
ctx: &RpcContext,
db: &mut Db,
config_overrides: &IndexMap<PackageId, Config>,
value: &mut Value,
) -> Result<(), ConfigurationError> {
self.inner.update(db, config_overrides, value).await
self.inner.update(ctx, db, config_overrides, value).await
}
fn pointers(&self, value: &Value) -> Result<Vec<ValueSpecPointer>, NoMatchWithPath> {
self.inner.pointers(value)
@@ -371,19 +376,20 @@ impl ValueSpec for ValueSpecAny {
}
async fn update<Db: DbHandle>(
&self,
ctx: &RpcContext,
db: &mut Db,
config_overrides: &IndexMap<PackageId, Config>,
value: &mut Value,
) -> Result<(), ConfigurationError> {
match self {
ValueSpecAny::Boolean(a) => a.update(db, config_overrides, value).await,
ValueSpecAny::Enum(a) => a.update(db, config_overrides, value).await,
ValueSpecAny::List(a) => a.update(db, config_overrides, value).await,
ValueSpecAny::Number(a) => a.update(db, config_overrides, value).await,
ValueSpecAny::Object(a) => a.update(db, config_overrides, value).await,
ValueSpecAny::String(a) => a.update(db, config_overrides, value).await,
ValueSpecAny::Union(a) => a.update(db, config_overrides, value).await,
ValueSpecAny::Pointer(a) => a.update(db, config_overrides, value).await,
ValueSpecAny::Boolean(a) => a.update(ctx, db, config_overrides, value).await,
ValueSpecAny::Enum(a) => a.update(ctx, db, config_overrides, value).await,
ValueSpecAny::List(a) => a.update(ctx, db, config_overrides, value).await,
ValueSpecAny::Number(a) => a.update(ctx, db, config_overrides, value).await,
ValueSpecAny::Object(a) => a.update(ctx, db, config_overrides, value).await,
ValueSpecAny::String(a) => a.update(ctx, db, config_overrides, value).await,
ValueSpecAny::Union(a) => a.update(ctx, db, config_overrides, value).await,
ValueSpecAny::Pointer(a) => a.update(ctx, db, config_overrides, value).await,
}
}
fn pointers(&self, value: &Value) -> Result<Vec<ValueSpecPointer>, NoMatchWithPath> {
@@ -463,6 +469,7 @@ impl ValueSpec for ValueSpecBoolean {
}
async fn update<Db: DbHandle>(
&self,
ctx: &RpcContext,
_db: &mut Db,
_config_overrides: &IndexMap<PackageId, Config>,
_value: &mut Value,
@@ -550,6 +557,7 @@ impl ValueSpec for ValueSpecEnum {
}
async fn update<Db: DbHandle>(
&self,
ctx: &RpcContext,
_db: &mut Db,
_config_overrides: &IndexMap<PackageId, Config>,
_value: &mut Value,
@@ -634,13 +642,14 @@ where
}
async fn update<Db: DbHandle>(
&self,
ctx: &RpcContext,
db: &mut Db,
config_overrides: &IndexMap<PackageId, Config>,
value: &mut Value,
) -> Result<(), ConfigurationError> {
if let Value::Array(ref mut ls) = value {
for (i, val) in ls.into_iter().enumerate() {
match self.spec.update(db, config_overrides, val).await {
match self.spec.update(ctx, db, config_overrides, val).await {
Err(ConfigurationError::NoMatch(e)) => {
Err(ConfigurationError::NoMatch(e.prepend(format!("{}", i))))
}
@@ -735,16 +744,17 @@ impl ValueSpec for ValueSpecList {
}
async fn update<Db: DbHandle>(
&self,
ctx: &RpcContext,
db: &mut Db,
config_overrides: &IndexMap<PackageId, Config>,
value: &mut Value,
) -> Result<(), ConfigurationError> {
match self {
ValueSpecList::Enum(a) => a.update(db, config_overrides, value).await,
ValueSpecList::Number(a) => a.update(db, config_overrides, value).await,
ValueSpecList::Object(a) => a.update(db, config_overrides, value).await,
ValueSpecList::String(a) => a.update(db, config_overrides, value).await,
ValueSpecList::Union(a) => a.update(db, config_overrides, value).await,
ValueSpecList::Enum(a) => a.update(ctx, db, config_overrides, value).await,
ValueSpecList::Number(a) => a.update(ctx, db, config_overrides, value).await,
ValueSpecList::Object(a) => a.update(ctx, db, config_overrides, value).await,
ValueSpecList::String(a) => a.update(ctx, db, config_overrides, value).await,
ValueSpecList::Union(a) => a.update(ctx, db, config_overrides, value).await,
}
}
fn pointers(&self, value: &Value) -> Result<Vec<ValueSpecPointer>, NoMatchWithPath> {
@@ -857,6 +867,7 @@ impl ValueSpec for ValueSpecNumber {
}
async fn update<Db: DbHandle>(
&self,
ctx: &RpcContext,
_db: &mut Db,
_config_overrides: &IndexMap<PackageId, Config>,
_value: &mut Value,
@@ -968,12 +979,13 @@ impl ValueSpec for ValueSpecObject {
}
async fn update<Db: DbHandle>(
&self,
ctx: &RpcContext,
db: &mut Db,
config_overrides: &IndexMap<PackageId, Config>,
value: &mut Value,
) -> Result<(), ConfigurationError> {
if let Value::Object(o) = value {
self.spec.update(db, config_overrides, o).await
self.spec.update(ctx, db, config_overrides, o).await
} else {
Err(ConfigurationError::NoMatch(NoMatchWithPath::new(
MatchError::InvalidType("object", value.type_of()),
@@ -1066,6 +1078,7 @@ impl ConfigSpec {
pub async fn update<Db: DbHandle>(
&self,
ctx: &RpcContext,
db: &mut Db,
config_overrides: &IndexMap<PackageId, Config>,
cfg: &mut Config,
@@ -1074,10 +1087,10 @@ impl ConfigSpec {
match cfg.get_mut(k) {
None => {
let mut v = Value::Null;
vs.update(db, config_overrides, &mut v).await?;
vs.update(ctx, db, config_overrides, &mut v).await?;
cfg.insert(k.clone(), v);
}
Some(v) => match vs.update(db, config_overrides, v).await {
Some(v) => match vs.update(ctx, db, config_overrides, v).await {
Err(ConfigurationError::NoMatch(e)) => {
Err(ConfigurationError::NoMatch(e.prepend(k.clone())))
}
@@ -1156,6 +1169,7 @@ impl ValueSpec for ValueSpecString {
}
async fn update<Db: DbHandle>(
&self,
ctx: &RpcContext,
_db: &mut Db,
_config_overrides: &IndexMap<PackageId, Config>,
_value: &mut Value,
@@ -1365,6 +1379,7 @@ impl ValueSpec for ValueSpecUnion {
}
async fn update<Db: DbHandle>(
&self,
ctx: &RpcContext,
db: &mut Db,
config_overrides: &IndexMap<PackageId, Config>,
value: &mut Value,
@@ -1378,7 +1393,7 @@ impl ValueSpec for ValueSpecUnion {
None => Err(ConfigurationError::NoMatch(NoMatchWithPath::new(
MatchError::Union(tag.clone(), self.variants.keys().cloned().collect()),
))),
Some(spec) => spec.update(db, config_overrides, o).await,
Some(spec) => spec.update(ctx, db, config_overrides, o).await,
},
Some(other) => Err(ConfigurationError::NoMatch(
NoMatchWithPath::new(MatchError::InvalidType("string", other.type_of()))
@@ -1505,13 +1520,14 @@ impl ValueSpec for ValueSpecPointer {
}
async fn update<Db: DbHandle>(
&self,
ctx: &RpcContext,
db: &mut Db,
config_overrides: &IndexMap<PackageId, Config>,
value: &mut Value,
) -> Result<(), ConfigurationError> {
match self {
ValueSpecPointer::Package(a) => a.update(db, config_overrides, value).await,
ValueSpecPointer::System(a) => a.update(db, config_overrides, value).await,
ValueSpecPointer::Package(a) => a.update(ctx, db, config_overrides, value).await,
ValueSpecPointer::System(a) => a.update(ctx, db, config_overrides, value).await,
}
}
fn pointers(&self, _value: &Value) -> Result<Vec<ValueSpecPointer>, NoMatchWithPath> {
@@ -1543,6 +1559,7 @@ impl fmt::Display for PackagePointerSpec {
impl PackagePointerSpec {
async fn deref<Db: DbHandle>(
&self,
ctx: &RpcContext,
db: &mut Db,
config_overrides: &IndexMap<PackageId, Config>,
) -> Result<Value, ConfigurationError> {
@@ -1602,7 +1619,7 @@ impl PackagePointerSpec {
(&*version, &*cfg_actions, &*volumes)
{
let cfg_res = cfg_actions
.get(&self.package_id, version, volumes)
.get(&ctx, &self.package_id, version, volumes)
.await
.map_err(|e| ConfigurationError::SystemError(Error::from(e)))?;
if let Some(cfg) = cfg_res.config {
@@ -1646,11 +1663,12 @@ impl ValueSpec for PackagePointerSpec {
}
async fn update<Db: DbHandle>(
&self,
ctx: &RpcContext,
db: &mut Db,
config_overrides: &IndexMap<PackageId, Config>,
value: &mut Value,
) -> Result<(), ConfigurationError> {
*value = self.deref(db, config_overrides).await?;
*value = self.deref(ctx, db, config_overrides).await?;
Ok(())
}
fn pointers(&self, _value: &Value) -> Result<Vec<ValueSpecPointer>, NoMatchWithPath> {
@@ -1762,6 +1780,7 @@ impl ValueSpec for SystemPointerSpec {
}
async fn update<Db: DbHandle>(
&self,
ctx: &RpcContext,
db: &mut Db,
_config_overrides: &IndexMap<PackageId, Config>,
value: &mut Value,

View File

@@ -1,5 +1,5 @@
mod cli;
mod rpc;
pub mod cli;
pub mod rpc;
pub use cli::CliContext;
pub use rpc::RpcContext;

View File

@@ -13,10 +13,12 @@ use serde::Deserialize;
use sqlx::migrate::MigrateDatabase;
use sqlx::{Sqlite, SqlitePool};
use tokio::fs::File;
use tokio::sync::broadcast::Sender;
use tokio::sync::RwLock;
use crate::manager::ManagerMap;
use crate::net::NetController;
use crate::shutdown::Shutdown;
use crate::util::{from_toml_async_reader, AsyncFileExt};
use crate::{Error, ResultExt};
@@ -26,50 +28,39 @@ pub struct RpcContextConfig {
pub bind_rpc: Option<SocketAddr>,
pub bind_ws: Option<SocketAddr>,
pub tor_control: Option<SocketAddr>,
pub db: Option<PathBuf>,
pub secret_store: Option<PathBuf>,
pub revision_cache_size: Option<usize>,
pub datadir: Option<PathBuf>,
}
pub struct RpcContextSeed {
pub bind_rpc: SocketAddr,
pub bind_ws: SocketAddr,
pub db: PatchDb,
pub secret_store: SqlitePool,
pub docker: Docker,
pub net_controller: Arc<NetController>,
pub managers: ManagerMap,
pub revision_cache_size: usize,
pub revision_cache: RwLock<VecDeque<Arc<Revision>>>,
pub metrics_cache: RwLock<Option<crate::system::Metrics>>,
}
#[derive(Clone)]
pub struct RpcContext(Arc<RpcContextSeed>);
impl RpcContext {
pub async fn init<P: AsRef<Path>>(cfg_path: Option<P>) -> Result<Self, Error> {
let cfg_path = cfg_path
impl RpcContextConfig {
pub async fn load<P: AsRef<Path>>(path: Option<P>) -> Result<Self, Error> {
let cfg_path = path
.as_ref()
.map(|p| p.as_ref())
.unwrap_or(Path::new(crate::CONFIG_PATH));
let base = if let Some(f) = File::maybe_open(cfg_path)
if let Some(f) = File::maybe_open(cfg_path)
.await
.with_ctx(|_| (crate::ErrorKind::Filesystem, cfg_path.display().to_string()))?
{
from_toml_async_reader(f).await?
from_toml_async_reader(f).await
} else {
RpcContextConfig::default()
};
let db = PatchDb::open(
base.db
.unwrap_or_else(|| Path::new("/mnt/embassy-os/embassy.db").to_owned()),
)
.await?;
Ok(RpcContextConfig::default())
}
}
pub fn datadir(&self) -> &Path {
self.datadir
.as_ref()
.map(|a| a.as_path())
.unwrap_or_else(|| Path::new("/embassy-data"))
}
pub async fn db(&self) -> Result<PatchDb, Error> {
PatchDb::open(self.datadir().join("main").join("embassy.db"))
.await
.map_err(Error::from)
}
pub async fn secret_store(&self) -> Result<SqlitePool, Error> {
let secret_store_url = format!(
"sqlite://{}",
base.secret_store
.unwrap_or_else(|| Path::new("/mnt/embassy-os/secrets.db").to_owned())
.display()
self.datadir().join("main").join("secrets.db").display()
);
if !Sqlite::database_exists(&secret_store_url).await? {
Sqlite::create_database(&secret_store_url).await?;
@@ -79,26 +70,48 @@ impl RpcContext {
.run(&secret_store)
.await
.with_kind(crate::ErrorKind::Database)?;
Ok(secret_store)
}
}
pub struct RpcContextSeed {
pub bind_rpc: SocketAddr,
pub bind_ws: SocketAddr,
pub datadir: PathBuf,
pub db: PatchDb,
pub secret_store: SqlitePool,
pub docker: Docker,
pub net_controller: NetController,
pub managers: ManagerMap,
pub revision_cache_size: usize,
pub revision_cache: RwLock<VecDeque<Arc<Revision>>>,
pub metrics_cache: RwLock<Option<crate::system::Metrics>>,
pub shutdown: Sender<Option<Shutdown>>,
}
#[derive(Clone)]
pub struct RpcContext(Arc<RpcContextSeed>);
impl RpcContext {
pub async fn init<P: AsRef<Path>>(
cfg_path: Option<P>,
shutdown: Sender<Option<Shutdown>>,
) -> Result<Self, Error> {
let base = RpcContextConfig::load(cfg_path).await?;
let db = base.db().await?;
let secret_store = base.secret_store().await?;
let docker = Docker::connect_with_unix_defaults()?;
let net_controller = Arc::new(
NetController::init(
([127, 0, 0, 1], 80).into(),
crate::net::tor::os_key(&mut secret_store.acquire().await?).await?,
base.tor_control
.unwrap_or(SocketAddr::from(([127, 0, 0, 1], 9051))),
)
.await?,
);
let managers = ManagerMap::init(
&mut db.handle(),
&mut secret_store.acquire().await?,
docker.clone(),
net_controller.clone(),
let net_controller = NetController::init(
([127, 0, 0, 1], 80).into(),
crate::net::tor::os_key(&mut secret_store.acquire().await?).await?,
base.tor_control
.unwrap_or(SocketAddr::from(([127, 0, 0, 1], 9051))),
)
.await?;
let managers = ManagerMap::default();
let seed = Arc::new(RpcContextSeed {
bind_rpc: base.bind_rpc.unwrap_or(([127, 0, 0, 1], 5959).into()),
bind_ws: base.bind_ws.unwrap_or(([127, 0, 0, 1], 5960).into()),
datadir: base.datadir().to_path_buf(),
db,
secret_store,
docker,
@@ -107,9 +120,18 @@ impl RpcContext {
revision_cache_size: base.revision_cache_size.unwrap_or(512),
revision_cache: RwLock::new(VecDeque::new()),
metrics_cache: RwLock::new(None),
shutdown,
});
let res = Self(seed);
res.managers
.init(
&res,
&mut res.db.handle(),
&mut res.secret_store.acquire().await?,
)
.await?;
// TODO: handle apps in bad / transient state
Ok(Self(seed))
Ok(res)
}
pub async fn package_registry_url(&self) -> Result<Url, Error> {
Ok(

View File

@@ -1 +0,0 @@
pub mod tor_health_check;

View File

@@ -1,52 +0,0 @@
use std::time::Duration;
use serde_json::json;
use crate::net::tor::TorController;
lazy_static::lazy_static! {
static ref PROXY: reqwest::Proxy = reqwest::Proxy::http("socks5h://localhost:9050").expect("PROXY");
static ref CLIENT: reqwest::Client = reqwest::Client::builder().proxy(PROXY.clone()).build().expect("CLIENT");
}
/// Background daemon that verifies this embassy is reachable over its own
/// Tor hidden service, and attempts to restart Tor when the self-check fails.
///
/// Every 300 seconds it POSTs a JSON-RPC `echo` request to the embassy's
/// onion address through the socks5 proxy (see the `CLIENT`/`PROXY` statics).
/// Never returns; intended to run for the lifetime of the process.
pub async fn tor_health_check_daemon(tor_controller: &TorController) {
    loop {
        // call out to tor address
        let onion = tor_controller.embassyd_onion().await;
        let result = CLIENT
            .post(format!("http://{}/rpc/v1", onion))
            .body(
                json!({
                    "jsonrpc": "2.0",
                    "method": "echo",
                    "params": { "message": "Follow the orange rabbit" },
                })
                .to_string()
                .into_bytes(),
            )
            .send()
            .await;
        match result {
            // if success, do nothing
            Ok(_) => {}
            // if failure, disconnect tor control port, and restart tor controller
            Err(e) => {
                log::error!("Unable to reach self over tor: {}", e);
                // keep retrying the restart until the controller accepts it
                loop {
                    match tor_controller.replace().await {
                        Ok(restarted) => {
                            if restarted {
                                // NOTE(review): `replace()` returning `true` appears to mean a
                                // restart happened recently so another is refused — confirm
                                // against TorController::replace
                                log::error!("Tor has been recently restarted, refusing to restart");
                            }
                            break;
                        }
                        Err(e) => {
                            log::error!("Unable to restart tor: {}", e);
                        }
                    }
                }
            }
        }
        // pause between health checks
        tokio::time::sleep(Duration::from_secs(300)).await;
    }
}

View File

@@ -3,6 +3,7 @@ pub mod util;
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use futures::{FutureExt, SinkExt, StreamExt};
use patch_db::json_ptr::JsonPointer;
@@ -85,6 +86,12 @@ async fn ws_handler<
_ => (),
}
}
_ = tokio::time::sleep(Duration::from_secs(10)).fuse() => {
stream
.send(Message::Ping(Vec::new()))
.await
.with_kind(crate::ErrorKind::Network)?;
}
}
}
}

View File

@@ -6,6 +6,7 @@ use serde::{Deserialize, Serialize};
use crate::action::ActionImplementation;
use crate::config::Config;
use crate::context::RpcContext;
use crate::db::model::CurrentDependencyInfo;
use crate::s9pk::manifest::PackageId;
use crate::status::health_check::{HealthCheckId, HealthCheckResult, HealthCheckResultVariant};
@@ -130,6 +131,7 @@ pub struct DepInfo {
impl DepInfo {
pub async fn satisfied<Db: DbHandle>(
&self,
ctx: &RpcContext,
db: &mut Db,
dependency_id: &PackageId,
dependency_config: Option<Config>, // fetch if none
@@ -161,7 +163,7 @@ impl DepInfo {
cfg
} else if let Some(cfg_info) = &manifest.config {
cfg_info
.get(dependency_id, &manifest.version, &manifest.volumes)
.get(ctx, dependency_id, &manifest.version, &manifest.volumes)
.await?
.config
.unwrap_or_default()
@@ -171,6 +173,7 @@ impl DepInfo {
if let Some(cfg_req) = &self.config {
if let Err(e) = cfg_req
.check(
ctx,
dependent_id,
dependent_version,
dependent_volumes,
@@ -218,6 +221,7 @@ pub struct DependencyConfig {
impl DependencyConfig {
pub async fn check(
&self,
ctx: &RpcContext,
dependent_id: &PackageId,
dependent_version: &Version,
dependent_volumes: &Volumes,
@@ -226,6 +230,7 @@ impl DependencyConfig {
Ok(self
.check
.sandboxed(
ctx,
dependent_id,
dependent_version,
dependent_volumes,
@@ -236,6 +241,7 @@ impl DependencyConfig {
}
pub async fn auto_configure(
&self,
ctx: &RpcContext,
dependent_id: &PackageId,
dependent_version: &Version,
dependent_volumes: &Volumes,
@@ -243,6 +249,7 @@ impl DependencyConfig {
) -> Result<Config, Error> {
self.auto_configure
.sandboxed(
ctx,
dependent_id,
dependent_version,
dependent_volumes,

45
appmgr/src/disk/main.rs Normal file
View File

@@ -0,0 +1,45 @@
use tokio::process::Command;
use crate::util::Invoke;
use crate::Error;
/// Reports whether an existing embassy data pool is present and can be imported.
/// Implementation pending — detection mechanism not yet decided (TODO).
pub async fn importable() -> Result<bool, Error> {
    todo!()
}
/// Creates the embassy data storage on the given disks.
/// Implementation pending — presumably will call `create_pool` then `create_fs`; TODO confirm.
pub async fn create(disks: &[&str]) -> Result<(), Error> {
    todo!()
}
/// Loads (unlocks and brings online) the embassy data storage using `password`.
/// Implementation pending (TODO).
pub async fn load(password: &str) -> Result<(), Error> {
    todo!()
}
/// Creates the `embassy-data` ZFS pool across the given disks by shelling out
/// to `zpool create`. Failures surface as `ErrorKind::Zfs`.
pub async fn create_pool(disks: &[&str]) -> Result<(), Error> {
    let mut zpool = Command::new("zpool");
    zpool.arg("create").arg("embassy-data");
    for disk in disks {
        zpool.arg(disk);
    }
    zpool.invoke(crate::ErrorKind::Zfs).await?;
    Ok(())
}
/// Creates the filesystem layout on the embassy data pool.
/// Implementation pending (TODO).
pub async fn create_fs() -> Result<(), Error> {
    todo!()
}
/// Force-imports the `embassy-data` ZFS pool (`zpool import -f embassy-data`).
/// Failures surface as `ErrorKind::Zfs`.
pub async fn import() -> Result<(), Error> {
    let mut zpool = Command::new("zpool");
    zpool.args(&["import", "-f", "embassy-data"]);
    zpool.invoke(crate::ErrorKind::Zfs).await?;
    Ok(())
}
/// Mounts the embassy data filesystem, unlocking it with `password`.
/// Implementation pending (TODO); the mountpoint can be discovered with:
// zfs get -H -ovalue mountpoint embassy-data
pub async fn mount(password: &str) -> Result<(), Error> {
    todo!()
}

2
appmgr/src/disk/mod.rs Normal file
View File

@@ -0,0 +1,2 @@
pub mod main;
pub mod util;

View File

@@ -9,9 +9,6 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
use crate::util::Invoke;
use crate::{Error, ResultExt as _};
pub const ROOT_DISK: &'static str = "/dev/mmcblk0";
pub const MAIN_DISK: &'static str = "/dev/sda";
pub struct Disks(IndexMap<String, DiskInfo>);
#[derive(Clone, Debug, Deserialize, Serialize)]
@@ -190,6 +187,49 @@ pub async fn mount_encfs<P0: AsRef<Path>, P1: AsRef<Path>>(
}
}
pub async fn bind<P0: AsRef<Path>, P1: AsRef<Path>>(
src: P0,
dst: P1,
read_only: bool,
) -> Result<(), Error> {
log::info!(
"Binding {} to {}",
src.as_ref().display(),
dst.as_ref().display()
);
let is_mountpoint = tokio::process::Command::new("mountpoint")
.arg(dst.as_ref())
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.status()
.await?;
if is_mountpoint.success() {
unmount(dst.as_ref()).await?;
}
tokio::fs::create_dir_all(&dst).await?;
let mut mount_cmd = tokio::process::Command::new("mount");
mount_cmd.arg("--bind");
if read_only {
mount_cmd.arg("-o").arg("ro");
}
mount_cmd
.arg(src.as_ref())
.arg(dst.as_ref())
.invoke(crate::ErrorKind::Filesystem)
.await
.map_err(|e| {
Error::new(
e.source.context(format!(
"Binding {} to {}",
src.as_ref().display(),
dst.as_ref().display(),
)),
e.kind,
)
})?;
Ok(())
}
pub async fn unmount<P: AsRef<Path>>(mount_point: P) -> Result<(), Error> {
log::info!("Unmounting {}.", mount_point.as_ref().display());
let umount_output = tokio::process::Command::new("umount")

View File

@@ -51,8 +51,9 @@ pub enum ErrorKind {
SoundError = 43,
ParseTimestamp = 44,
ParseSysInfo = 45,
WifiError = 46,
Wifi = 46,
Journald = 47,
Zfs = 48,
}
impl ErrorKind {
pub fn as_str(&self) -> &'static str {
@@ -103,8 +104,9 @@ impl ErrorKind {
SoundError => "Sound Interface Error",
ParseTimestamp => "Timestamp Parsing Error",
ParseSysInfo => "System Info Parsing Error",
WifiError => "Wifi Internal Error",
Wifi => "WiFi Internal Error",
Journald => "Journald Error",
Zfs => "ZFS Error",
}
}
}

View File

@@ -22,7 +22,7 @@ pub async fn set_hostname(hostname: &str) -> Result<(), Error> {
}
pub async fn get_product_key() -> Result<String, Error> {
let out = tokio::fs::read_to_string("/boot/product_key.txt").await?;
let out = tokio::fs::read_to_string("/boot/embassy-os/product_key.txt").await?;
Ok(out.trim().to_owned())
}
@@ -34,7 +34,7 @@ pub async fn get_id() -> Result<String, Error> {
Ok(hex::encode(&res[0..4]))
}
// cat /boot/product_key.txt | shasum -a 256 | head -c 8 | awk '{print "start9-"$1}' | xargs hostnamectl set-hostname
// cat /boot/embassy-os/product_key.txt | shasum -a 256 | head -c 8 | awk '{print "start9-"$1}' | xargs hostnamectl set-hostname
pub async fn sync_hostname() -> Result<(), Error> {
set_hostname(&format!("start9-{}", get_id().await?)).await?;
Ok(())

View File

@@ -11,6 +11,7 @@ use crate::util::Version;
use crate::Error;
pub async fn update_dependents<'a, Db: DbHandle, I: IntoIterator<Item = &'a PackageId>>(
ctx: &RpcContext,
db: &mut Db,
id: &PackageId,
deps: I,
@@ -37,7 +38,7 @@ pub async fn update_dependents<'a, Db: DbHandle, I: IntoIterator<Item = &'a Pack
crate::ErrorKind::Database,
)
})?
.satisfied(db, id, None, dep, &man.version, &man.volumes)
.satisfied(ctx, db, id, None, dep, &man.version, &man.volumes)
.await?
{
let mut errs = crate::db::DatabaseModel::new()
@@ -161,7 +162,13 @@ pub async fn uninstall(
.package_data()
.remove(&mut tx, &entry.manifest.id)
.await?;
update_dependents(&mut tx, &entry.manifest.id, entry.current_dependents.keys()).await?;
update_dependents(
ctx,
&mut tx,
&entry.manifest.id,
entry.current_dependents.keys(),
)
.await?;
tx.commit(None).await?;
Ok(())
}

View File

@@ -36,8 +36,8 @@ use crate::{Error, ResultExt};
pub mod cleanup;
pub mod progress;
pub const PKG_CACHE: &'static str = "/mnt/embassy-os/cache/packages";
pub const PKG_PUBLIC_DIR: &'static str = "/mnt/embassy-os/public/package-data";
pub const PKG_CACHE: &'static str = "main/cache/packages";
pub const PKG_PUBLIC_DIR: &'static str = "main/public/package-data";
#[command(display(display_none))]
pub async fn install(
@@ -170,7 +170,11 @@ pub async fn download_install_s9pk(
let pkg_id = &temp_manifest.id;
let version = &temp_manifest.version;
let pkg_cache_dir = Path::new(PKG_CACHE).join(pkg_id).join(version.as_str());
let pkg_cache_dir = ctx
.datadir
.join(PKG_CACHE)
.join(pkg_id)
.join(version.as_str());
tokio::fs::create_dir_all(&pkg_cache_dir).await?;
let pkg_cache = AsRef::<Path>::as_ref(pkg_id).with_extension("s9pk");
@@ -368,7 +372,9 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin>(
}
.with_kind(crate::ErrorKind::Registry)?;
if let Some(manifest) = manifest {
let dir = Path::new(PKG_PUBLIC_DIR)
let dir = ctx
.datadir
.join(PKG_PUBLIC_DIR)
.join(&manifest.id)
.join(manifest.version.as_str());
let icon_path = dir.join(format!("icon.{}", manifest.assets.icon_type()));
@@ -416,7 +422,9 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin>(
}
log::info!("Install {}@{}: Fetched Dependency Info", pkg_id, version);
let public_dir_path = Path::new(PKG_PUBLIC_DIR)
let public_dir_path = ctx
.datadir
.join(PKG_PUBLIC_DIR)
.join(pkg_id)
.join(version.as_str());
tokio::fs::create_dir_all(&public_dir_path).await?;
@@ -512,7 +520,7 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin>(
let mut sql_tx = ctx.secret_store.begin().await?;
log::info!("Install {}@{}: Creating volumes", pkg_id, version);
manifest.volumes.install(pkg_id, version).await?;
manifest.volumes.install(ctx, pkg_id, version).await?;
log::info!("Install {}@{}: Created volumes", pkg_id, version);
log::info!("Install {}@{}: Installing interfaces", pkg_id, version);
@@ -522,8 +530,7 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin>(
log::info!("Install {}@{}: Creating manager", pkg_id, version);
ctx.managers
.add(
ctx.docker.clone(),
ctx.net_controller.clone(),
ctx.clone(),
manifest.clone(),
manifest.interfaces.tor_keys(&mut sql_tx, pkg_id).await?,
)
@@ -572,8 +579,13 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin>(
status: Status {
configured: manifest.config.is_none(),
main: MainStatus::Stopped,
dependency_errors: DependencyErrors::init(&mut tx, &manifest, &current_dependencies)
.await?,
dependency_errors: DependencyErrors::init(
ctx,
&mut tx,
&manifest,
&current_dependencies,
)
.await?,
},
manifest: manifest.clone(),
system_pointers: Vec::new(),
@@ -600,6 +612,7 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin>(
} = prev
{
update_dependents(
ctx,
&mut tx,
pkg_id,
current_dependents
@@ -612,6 +625,7 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin>(
if let Some(res) = prev_manifest
.migrations
.to(
ctx,
version,
pkg_id,
&prev_manifest.version,
@@ -626,15 +640,21 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin>(
}
if let Some(res) = manifest
.migrations
.from(&prev_manifest.version, pkg_id, version, &manifest.volumes)
.from(
ctx,
&prev_manifest.version,
pkg_id,
version,
&manifest.volumes,
)
.await?
{
configured &= res.configured;
}
if configured {
crate::config::configure(
ctx,
&mut tx,
&ctx.docker,
pkg_id,
None,
&None,
@@ -646,7 +666,7 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin>(
todo!("set as running if viable");
}
} else {
update_dependents(&mut tx, pkg_id, current_dependents.keys()).await?;
update_dependents(ctx, &mut tx, pkg_id, current_dependents.keys()).await?;
}
sql_tx.commit().await?;

View File

@@ -22,10 +22,10 @@ pub mod backup;
pub mod config;
pub mod context;
pub mod control;
pub mod daemon;
pub mod db;
pub mod dependencies;
pub mod developer;
pub mod disk;
pub mod error;
pub mod hostname;
pub mod id;
@@ -38,6 +38,7 @@ pub mod migration;
pub mod net;
pub mod registry;
pub mod s9pk;
pub mod shutdown;
pub mod sound;
pub mod ssh;
pub mod status;

View File

@@ -6,30 +6,29 @@ use std::task::Poll;
use anyhow::anyhow;
use bollard::container::StopContainerOptions;
use bollard::Docker;
use patch_db::DbHandle;
use sqlx::{Executor, Sqlite};
use tokio::sync::watch::error::RecvError;
use tokio::sync::watch::{channel, Receiver, Sender};
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use torut::onion::TorSecretKeyV3;
use crate::action::docker::DockerAction;
use crate::context::RpcContext;
use crate::net::interface::InterfaceId;
use crate::net::NetController;
use crate::s9pk::manifest::{Manifest, PackageId};
use crate::util::{Container, Version};
use crate::util::{Container, NonDetachingJoinHandle, Version};
use crate::Error;
#[derive(Default)]
pub struct ManagerMap(RwLock<HashMap<(PackageId, Version), Arc<Manager>>>);
impl ManagerMap {
pub async fn init<Db: DbHandle, Ex>(
&self,
ctx: &RpcContext,
db: &mut Db,
secrets: &mut Ex,
docker: Docker,
net_ctl: Arc<NetController>,
) -> Result<Self, Error>
) -> Result<(), Error>
where
for<'a> &'a mut Ex: Executor<'a, Database = Sqlite>,
{
@@ -53,16 +52,16 @@ impl ManagerMap {
let tor_keys = man.interfaces.tor_keys(secrets, &package).await?;
res.insert(
(package, man.version.clone()),
Arc::new(Manager::create(docker.clone(), net_ctl.clone(), man, tor_keys).await?),
Arc::new(Manager::create(ctx.clone(), man, tor_keys).await?),
);
}
Ok(ManagerMap(RwLock::new(res)))
*self.0.write().await = res;
Ok(())
}
pub async fn add(
&self,
docker: Docker,
net_ctl: Arc<NetController>,
ctx: RpcContext,
manifest: Manifest,
tor_keys: HashMap<InterfaceId, TorSecretKeyV3>,
) -> Result<(), Error> {
@@ -75,7 +74,7 @@ impl ManagerMap {
}
lock.insert(
id,
Arc::new(Manager::create(docker, net_ctl, manifest, tor_keys).await?),
Arc::new(Manager::create(ctx, manifest, tor_keys).await?),
);
Ok(())
}
@@ -88,6 +87,20 @@ impl ManagerMap {
}
}
/// Drains the manager map and asks every manager to exit, concurrently.
///
/// Errors are not short-circuited: every manager's `exit` runs to completion,
/// and all failures are folded into one `Error` whose message concatenates the
/// individual messages (the first error's kind wins — NOTE(review): confirm
/// that losing subsequent kinds is acceptable).
pub async fn empty(&self) -> Result<(), Error> {
    // Take ownership of all entries so the map is left empty even if exits fail.
    let res = futures::future::join_all(
        std::mem::take(&mut *self.0.write().await)
            .into_iter()
            .map(|(_, man)| async move { man.exit().await }),
    )
    .await;
    // Fold all results into one, accumulating error messages.
    res.into_iter().fold(Ok(()), |res, x| match (res, x) {
        (Ok(()), x) => x,
        (Err(e), Ok(())) => Err(e),
        (Err(e1), Err(e2)) => Err(Error::new(anyhow!("{}, {}", e1.source, e2.source), e1.kind)),
    })
}
pub async fn get(&self, id: &(PackageId, Version)) -> Option<Arc<Manager>> {
self.0.read().await.get(id).cloned()
}
@@ -95,7 +108,7 @@ impl ManagerMap {
pub struct Manager {
shared: Arc<ManagerSharedState>,
thread: Container<JoinHandle<()>>,
thread: Container<NonDetachingJoinHandle<()>>,
}
pub enum Status {
@@ -105,10 +118,9 @@ pub enum Status {
}
struct ManagerSharedState {
ctx: RpcContext,
status: AtomicUsize,
on_stop: Sender<OnStop>,
docker: Docker,
net_ctl: Arc<NetController>,
manifest: Manifest,
container_name: String,
tor_keys: HashMap<InterfaceId, TorSecretKeyV3>,
@@ -128,6 +140,7 @@ async fn run_main(state: &Arc<ManagerSharedState>) -> Result<Result<(), (i32, St
.manifest
.main
.execute::<(), ()>(
&rt_state.ctx,
&rt_state.manifest.id,
&rt_state.manifest.version,
None,
@@ -140,6 +153,7 @@ async fn run_main(state: &Arc<ManagerSharedState>) -> Result<Result<(), (i32, St
let ip;
loop {
match state
.ctx
.docker
.inspect_container(&state.container_name, None)
.await
@@ -177,7 +191,8 @@ async fn run_main(state: &Arc<ManagerSharedState>) -> Result<Result<(), (i32, St
}
state
.net_ctl
.ctx
.net_controller
.add(
&state.manifest.id,
ip,
@@ -215,7 +230,8 @@ async fn run_main(state: &Arc<ManagerSharedState>) -> Result<Result<(), (i32, St
})
.and_then(|a| a);
state
.net_ctl
.ctx
.net_controller
.remove(
&state.manifest.id,
state.manifest.interfaces.0.keys().cloned(),
@@ -226,17 +242,15 @@ async fn run_main(state: &Arc<ManagerSharedState>) -> Result<Result<(), (i32, St
impl Manager {
async fn create(
docker: Docker,
net_ctl: Arc<NetController>,
ctx: RpcContext,
manifest: Manifest,
tor_keys: HashMap<InterfaceId, TorSecretKeyV3>,
) -> Result<Self, Error> {
let (on_stop, mut recv) = channel(OnStop::Sleep);
let shared = Arc::new(ManagerSharedState {
ctx,
status: AtomicUsize::new(Status::Stopped as usize),
on_stop,
docker,
net_ctl,
container_name: DockerAction::container_name(&manifest.id, None),
manifest,
tor_keys,
@@ -301,7 +315,7 @@ impl Manager {
});
Ok(Manager {
shared,
thread: Container::new(Some(thread)),
thread: Container::new(Some(thread.into())),
})
}
@@ -326,6 +340,7 @@ impl Manager {
}
match self
.shared
.ctx
.docker
.stop_container(
&self.shared.container_name,
@@ -360,6 +375,7 @@ impl Manager {
pub async fn pause(&self) -> Result<(), Error> {
self.shared
.ctx
.docker
.pause_container(&self.shared.container_name)
.await?;
@@ -371,6 +387,7 @@ impl Manager {
pub async fn resume(&self) -> Result<(), Error> {
self.shared
.ctx
.docker
.unpause_container(&self.shared.container_name)
.await?;
@@ -385,6 +402,7 @@ impl Manager {
let _ = self.shared.on_stop.send(OnStop::Exit);
match self
.shared
.ctx
.docker
.stop_container(
&self.shared.container_name,

View File

@@ -5,6 +5,7 @@ use patch_db::HasModel;
use serde::{Deserialize, Serialize};
use crate::action::ActionImplementation;
use crate::context::RpcContext;
use crate::s9pk::manifest::PackageId;
use crate::util::Version;
use crate::volume::Volumes;
@@ -19,6 +20,7 @@ pub struct Migrations {
impl Migrations {
pub async fn from(
&self,
ctx: &RpcContext,
version: &Version,
pkg_id: &PackageId,
pkg_version: &Version,
@@ -33,6 +35,7 @@ impl Migrations {
Some(
migration
.execute(
ctx,
pkg_id,
pkg_version,
Some("Migration"), // Migrations cannot be executed concurrently
@@ -52,6 +55,7 @@ impl Migrations {
}
pub async fn to(
&self,
ctx: &RpcContext,
version: &Version,
pkg_id: &PackageId,
pkg_version: &Version,
@@ -64,6 +68,7 @@ impl Migrations {
Some(
migration
.execute(
ctx,
pkg_id,
pkg_version,
Some("Migration"),

View File

@@ -6,7 +6,9 @@ use anyhow::anyhow;
use clap::ArgMatches;
use futures::future::BoxFuture;
use futures::FutureExt;
use reqwest::Client;
use rpc_toolkit::command;
use serde_json::json;
use sqlx::{Executor, Sqlite};
use tokio::net::TcpStream;
use tokio::sync::Mutex;
@@ -346,6 +348,44 @@ impl TorControllerInner {
}
}
/// Health-checks our own reachability over Tor by POSTing an `echo` JSON-RPC
/// call to the embassyd onion address.
///
/// On success nothing happens. On failure the tor controller is replaced,
/// retrying in a loop until `replace` succeeds; if tor was restarted too
/// recently, `replace` reports that and no further restart is attempted.
pub async fn tor_health_check(client: &Client, tor_controller: &TorController) {
    let onion = tor_controller.embassyd_onion().await;
    let payload = json!({
        "jsonrpc": "2.0",
        "method": "echo",
        "params": { "message": "Follow the orange rabbit" },
    })
    .to_string()
    .into_bytes();
    let result = client
        .post(format!("http://{}/rpc/v1", onion))
        .body(payload)
        .send()
        .await;
    // Success means we are reachable; only failures require action.
    if let Err(e) = result {
        log::error!("Unable to reach self over tor: {}", e);
        loop {
            match tor_controller.replace().await {
                Ok(restarted) => {
                    if restarted {
                        log::error!("Tor has been recently restarted, refusing to restart");
                    }
                    break;
                }
                Err(e) => {
                    log::error!("Unable to restart tor: {}", e);
                }
            }
        }
    }
}
#[tokio::test]
async fn test() {
let mut conn = torut::control::UnauthenticatedConn::new(

View File

@@ -25,13 +25,13 @@ pub async fn add(
if !ssid.is_ascii() {
return Err(Error::new(
anyhow::anyhow!("SSID may not have special characters"),
ErrorKind::WifiError,
ErrorKind::Wifi,
));
}
if !password.is_ascii() {
return Err(Error::new(
anyhow::anyhow!("WiFi Password may not have special characters"),
ErrorKind::WifiError,
ErrorKind::Wifi,
));
}
async fn add_procedure<'a>(
@@ -75,7 +75,7 @@ pub async fn connect(#[arg] ssid: String) -> Result<(), Error> {
if !ssid.is_ascii() {
return Err(Error::new(
anyhow::anyhow!("SSID may not have special characters"),
ErrorKind::WifiError,
ErrorKind::Wifi,
));
}
async fn connect_procedure<'a>(wpa_supplicant: WpaCli<'a>, ssid: &String) -> Result<(), Error> {
@@ -113,7 +113,7 @@ pub async fn delete(#[arg] ssid: String) -> Result<(), Error> {
if !ssid.is_ascii() {
return Err(Error::new(
anyhow::anyhow!("SSID may not have special characters"),
ErrorKind::WifiError,
ErrorKind::Wifi,
));
}
let wpa_supplicant = WpaCli { interface: "wlan0" };
@@ -127,7 +127,7 @@ pub async fn delete(#[arg] ssid: String) -> Result<(), Error> {
if interface_connected("eth0").await? {
wpa_supplicant.remove_network(&ssid).await?;
} else {
return Err(Error::new(anyhow::anyhow!("Forbidden: Deleting this Network would make your Embassy Unreachable. Either connect to ethernet or connect to a different WiFi network to remedy this."), ErrorKind::WifiError));
return Err(Error::new(anyhow::anyhow!("Forbidden: Deleting this Network would make your Embassy Unreachable. Either connect to ethernet or connect to a different WiFi network to remedy this."), ErrorKind::Wifi));
}
}
}
@@ -277,7 +277,7 @@ impl<'a> WpaCli<'a> {
.arg("-i")
.arg(self.interface)
.arg("add_network")
.invoke(ErrorKind::WifiError)
.invoke(ErrorKind::Wifi)
.await?;
let s = std::str::from_utf8(&r)?;
Ok(NetworkId(s.trim().to_owned()))
@@ -289,7 +289,7 @@ impl<'a> WpaCli<'a> {
.arg("set_network")
.arg(&id.0)
.arg(format!("{}", attr))
.invoke(ErrorKind::WifiError)
.invoke(ErrorKind::Wifi)
.await?;
Ok(())
}
@@ -300,7 +300,7 @@ impl<'a> WpaCli<'a> {
.arg("set")
.arg("country")
.arg(country_code)
.invoke(ErrorKind::WifiError)
.invoke(ErrorKind::Wifi)
.await?;
Ok(())
}
@@ -310,7 +310,7 @@ impl<'a> WpaCli<'a> {
.arg(self.interface)
.arg("get")
.arg("country")
.invoke(ErrorKind::WifiError)
.invoke(ErrorKind::Wifi)
.await?;
Ok(CountryCode::for_alpha2(&String::from_utf8(r)?).unwrap())
}
@@ -320,7 +320,7 @@ impl<'a> WpaCli<'a> {
.arg(self.interface)
.arg("enable_network")
.arg(&id.0)
.invoke(ErrorKind::WifiError)
.invoke(ErrorKind::Wifi)
.await?;
Ok(())
}
@@ -329,7 +329,7 @@ impl<'a> WpaCli<'a> {
.arg("-i")
.arg(self.interface)
.arg("save_config")
.invoke(ErrorKind::WifiError)
.invoke(ErrorKind::Wifi)
.await?;
Ok(())
}
@@ -339,7 +339,7 @@ impl<'a> WpaCli<'a> {
.arg(self.interface)
.arg("remove_network")
.arg(&id.0)
.invoke(ErrorKind::WifiError)
.invoke(ErrorKind::Wifi)
.await?;
Ok(())
}
@@ -348,7 +348,7 @@ impl<'a> WpaCli<'a> {
.arg("-i")
.arg(self.interface)
.arg("reconfigure")
.invoke(ErrorKind::WifiError)
.invoke(ErrorKind::Wifi)
.await?;
Ok(())
}
@@ -357,7 +357,7 @@ impl<'a> WpaCli<'a> {
.arg("-i")
.arg(self.interface)
.arg("list_networks")
.invoke(ErrorKind::WifiError)
.invoke(ErrorKind::Wifi)
.await?;
Ok(String::from_utf8(r)?
.lines()
@@ -376,7 +376,7 @@ impl<'a> WpaCli<'a> {
.arg(self.interface)
.arg("select_network")
.arg(&id.0)
.invoke(ErrorKind::WifiError)
.invoke(ErrorKind::Wifi)
.await?;
Ok(())
}
@@ -387,7 +387,7 @@ impl<'a> WpaCli<'a> {
.arg("new_password")
.arg(&id.0)
.arg(pass)
.invoke(ErrorKind::WifiError)
.invoke(ErrorKind::Wifi)
.await?;
Ok(())
}
@@ -396,12 +396,12 @@ impl<'a> WpaCli<'a> {
.arg("-i")
.arg(self.interface)
.arg("signal_poll")
.invoke(ErrorKind::WifiError)
.invoke(ErrorKind::Wifi)
.await?;
let e = || {
Error::new(
anyhow::anyhow!("Invalid output from wpa_cli signal_poll"),
ErrorKind::WifiError,
ErrorKind::Wifi,
)
};
let output = String::from_utf8(r)?;
@@ -423,7 +423,7 @@ impl<'a> WpaCli<'a> {
match m_id {
None => Err(Error::new(
anyhow::anyhow!("SSID Not Found"),
ErrorKind::WifiError,
ErrorKind::Wifi,
)),
Some(x) => {
self.select_network_low(&x).await?;
@@ -456,7 +456,7 @@ impl<'a> WpaCli<'a> {
let r = Command::new("iwgetid")
.arg(self.interface)
.arg("--raw")
.invoke(ErrorKind::WifiError)
.invoke(ErrorKind::Wifi)
.await?;
let output = String::from_utf8(r)?;
if output.trim().is_empty() {
@@ -501,7 +501,7 @@ impl<'a> WpaCli<'a> {
pub async fn interface_connected(interface: &str) -> Result<bool, Error> {
let out = Command::new("ifconfig")
.arg(interface)
.invoke(ErrorKind::WifiError)
.invoke(ErrorKind::Wifi)
.await?;
let v = std::str::from_utf8(&out)?
.lines()
@@ -513,7 +513,7 @@ pub async fn interface_connected(interface: &str) -> Result<bool, Error> {
pub fn country_code_parse(code: &str, _matches: &ArgMatches<'_>) -> Result<CountryCode, Error> {
CountryCode::for_alpha2(code).or(Err(Error::new(
anyhow::anyhow!("Invalid Country Code: {}", code),
ErrorKind::WifiError,
ErrorKind::Wifi,
)))
}

4
appmgr/src/shutdown.rs Normal file
View File

@@ -0,0 +1,4 @@
/// A requested shutdown of the daemon, carrying whether the process should
/// restart after shutting down.
///
/// The struct is a single `bool`, so it is trivially copyable; derive `Copy`
/// alongside `Clone`, plus `PartialEq`/`Eq` so shutdown requests can be
/// compared in tests and channel plumbing. All additions are backward
/// compatible with the original `Debug + Clone` derives.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Shutdown {
    // true => restart after shutting down; false => halt.
    // NOTE(review): semantics inferred from the field name — confirm against consumers.
    restart: bool,
}

View File

@@ -153,7 +153,8 @@ pub async fn list(
.collect())
}
pub async fn sync_keys_from_db(pool: &Pool<Sqlite>, dest: &Path) -> Result<(), Error> {
pub async fn sync_keys_from_db<P: AsRef<Path>>(pool: &Pool<Sqlite>, dest: P) -> Result<(), Error> {
let dest = dest.as_ref();
let keys = sqlx::query!("SELECT openssh_pubkey FROM ssh_keys")
.fetch_all(pool)
.await?;

View File

@@ -5,6 +5,7 @@ use indexmap::IndexMap;
use serde::{Deserialize, Deserializer, Serialize};
use crate::action::ActionImplementation;
use crate::context::RpcContext;
use crate::id::Id;
use crate::s9pk::manifest::PackageId;
use crate::util::Version;
@@ -46,6 +47,7 @@ pub struct HealthChecks(pub IndexMap<HealthCheckId, HealthCheck>);
impl HealthChecks {
pub async fn check_all(
&self,
ctx: &RpcContext,
started: &DateTime<Utc>,
pkg_id: &PackageId,
pkg_version: &Version,
@@ -55,7 +57,7 @@ impl HealthChecks {
Ok::<_, Error>((
id.clone(),
check
.check(id, started, pkg_id, pkg_version, volumes)
.check(ctx, id, started, pkg_id, pkg_version, volumes)
.await?,
))
}))
@@ -73,6 +75,7 @@ pub struct HealthCheck {
impl HealthCheck {
pub async fn check(
&self,
ctx: &RpcContext,
id: &HealthCheckId,
started: &DateTime<Utc>,
pkg_id: &PackageId,
@@ -82,6 +85,7 @@ impl HealthCheck {
let res = self
.implementation
.execute(
ctx,
pkg_id,
pkg_version,
Some(&format!("{}Health", id)),

View File

@@ -109,13 +109,14 @@ pub async fn check_all(ctx: &RpcContext) -> Result<(), Error> {
}
drop(db);
async fn main_status<Db: DbHandle>(
ctx: RpcContext,
status_model: StatusModel,
manifest: Arc<ModelData<Manifest>>,
mut db: Db,
) -> Result<MainStatus, Error> {
let mut status = status_model.get_mut(&mut db).await?;
status.main.check(&*manifest).await?;
status.main.check(&ctx, &*manifest).await?;
let res = status.main.clone();
@@ -125,25 +126,30 @@ pub async fn check_all(ctx: &RpcContext) -> Result<(), Error> {
}
let (status_sender, mut statuses_recv) = tokio::sync::mpsc::channel(status_manifest.len() + 1);
let mut statuses = HashMap::with_capacity(status_manifest.len());
futures::stream::iter(status_manifest.into_iter().zip(pkg_ids.clone()))
.for_each_concurrent(None, move |((status, manifest), id)| {
let status_sender = status_sender.clone();
async move {
match tokio::spawn(main_status(status, manifest, ctx.db.handle()))
.await
.unwrap()
{
Err(e) => {
log::error!("Error running main health check for {}: {}", id, e);
log::debug!("{:?}", e);
}
Ok(status) => {
status_sender.send((id, status)).await.expect("unreachable");
}
futures::stream::iter(
status_manifest
.into_iter()
.zip(pkg_ids.clone())
.zip(std::iter::repeat(ctx)),
)
.for_each_concurrent(None, move |(((status, manifest), id), ctx)| {
let status_sender = status_sender.clone();
async move {
match tokio::spawn(main_status(ctx.clone(), status, manifest, ctx.db.handle()))
.await
.unwrap()
{
Err(e) => {
log::error!("Error running main health check for {}: {}", id, e);
log::debug!("{:?}", e);
}
Ok(status) => {
status_sender.send((id, status)).await.expect("unreachable");
}
}
})
.await;
}
})
.await;
while let Some((id, status)) = statuses_recv.recv().await {
statuses.insert(id, status);
}
@@ -246,12 +252,18 @@ impl MainStatus {
}
Ok(())
}
pub async fn check(&mut self, manifest: &Manifest) -> Result<(), Error> {
pub async fn check(&mut self, ctx: &RpcContext, manifest: &Manifest) -> Result<(), Error> {
match self {
MainStatus::Running { started, health } => {
*health = manifest
.health_checks
.check_all(started, &manifest.id, &manifest.version, &manifest.volumes)
.check_all(
ctx,
started,
&manifest.id,
&manifest.version,
&manifest.volumes,
)
.await?;
for (check, res) in health {
if matches!(
@@ -313,6 +325,7 @@ impl HasModel for DependencyErrors {
}
impl DependencyErrors {
pub async fn init<Db: DbHandle>(
ctx: &RpcContext,
db: &mut Db,
manifest: &Manifest,
current_dependencies: &IndexMap<PackageId, CurrentDependencyInfo>,
@@ -330,6 +343,7 @@ impl DependencyErrors {
)
})?
.satisfied(
ctx,
db,
dep_id,
None,

View File

@@ -16,7 +16,9 @@ use serde_json::Value;
use tokio::fs::File;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
use tokio::sync::RwLock;
use tokio::task::{JoinError, JoinHandle};
use crate::shutdown::Shutdown;
use crate::{Error, ResultExt as _};
#[derive(Clone, Copy, Debug)]
@@ -339,17 +341,22 @@ pub fn serialize_display_opt<T: std::fmt::Display, S: Serializer>(
Option::<String>::serialize(&t.as_ref().map(|t| t.to_string()), serializer)
}
pub async fn daemon<F: Fn() -> Fut, Fut: Future<Output = ()> + Send + 'static>(
f: F,
pub async fn daemon<F: FnMut() -> Fut, Fut: Future<Output = ()> + Send + 'static>(
mut f: F,
cooldown: std::time::Duration,
) -> Result<Never, anyhow::Error> {
loop {
mut shutdown: tokio::sync::broadcast::Receiver<Option<Shutdown>>,
) -> Result<(), anyhow::Error> {
while matches!(
shutdown.try_recv(),
Err(tokio::sync::broadcast::error::TryRecvError::Empty)
) {
match tokio::spawn(f()).await {
Err(e) if e.is_panic() => return Err(anyhow!("daemon panicked!")),
_ => (),
}
tokio::time::sleep(cooldown).await
}
Ok(())
}
pub trait SOption<T> {}
@@ -999,3 +1006,28 @@ where
Deserialize::deserialize_in_place(deserializer, &mut place.data)
}
}
/// A wrapper around [`tokio::task::JoinHandle`] that aborts the task when the
/// handle is dropped, so a spawned task cannot keep running detached after its
/// handle is discarded.
#[pin_project::pin_project(PinnedDrop)]
pub struct NonDetachingJoinHandle<T>(#[pin] JoinHandle<T>);
impl<T> From<JoinHandle<T>> for NonDetachingJoinHandle<T> {
    fn from(t: JoinHandle<T>) -> Self {
        NonDetachingJoinHandle(t)
    }
}
#[pin_project::pinned_drop]
impl<T> PinnedDrop for NonDetachingJoinHandle<T> {
    fn drop(self: std::pin::Pin<&mut Self>) {
        let this = self.project();
        // Cancel the underlying task on drop; abort() is a no-op if the task
        // has already completed.
        this.0.into_ref().get_ref().abort()
    }
}
impl<T> Future for NonDetachingJoinHandle<T> {
    // Same output as JoinHandle: the task's value, or a JoinError on panic/abort.
    type Output = Result<T, JoinError>;
    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        // Delegate polling to the pin-projected inner JoinHandle.
        let this = self.project();
        this.0.poll(cx)
    }
}

View File

@@ -6,15 +6,14 @@ use indexmap::IndexMap;
use patch_db::{HasModel, Map, MapModel};
use serde::{Deserialize, Deserializer, Serialize};
use crate::context::RpcContext;
use crate::id::{Id, IdUnchecked};
use crate::net::interface::InterfaceId;
use crate::s9pk::manifest::PackageId;
use crate::util::Version;
use crate::Error;
pub mod disk;
pub const PKG_VOLUME_DIR: &'static str = "/mnt/embassy-os/volumes/package-data";
pub const PKG_VOLUME_DIR: &'static str = "main/volumes/package-data";
pub const BACKUP_DIR: &'static str = "/mnt/embassy-os-backups/EmbassyBackups";
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
@@ -76,21 +75,27 @@ impl<S: AsRef<str>> Serialize for VolumeId<S> {
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct Volumes(IndexMap<VolumeId, Volume>);
impl Volumes {
pub async fn install(&self, pkg_id: &PackageId, version: &Version) -> Result<(), Error> {
pub async fn install(
&self,
ctx: &RpcContext,
pkg_id: &PackageId,
version: &Version,
) -> Result<(), Error> {
for (volume_id, volume) in &self.0 {
volume.install(pkg_id, version, volume_id).await?; // TODO: concurrent?
volume.install(ctx, pkg_id, version, volume_id).await?; // TODO: concurrent?
}
Ok(())
}
pub fn get_path_for(
&self,
ctx: &RpcContext,
pkg_id: &PackageId,
version: &Version,
volume_id: &VolumeId,
) -> Option<PathBuf> {
self.0
.get(volume_id)
.map(|volume| volume.path_for(pkg_id, version, volume_id))
.map(|volume| volume.path_for(ctx, pkg_id, version, volume_id))
}
pub fn to_readonly(&self) -> Self {
Volumes(
@@ -154,25 +159,36 @@ pub enum Volume {
impl Volume {
pub async fn install(
&self,
ctx: &RpcContext,
pkg_id: &PackageId,
version: &Version,
volume_id: &VolumeId,
) -> Result<(), Error> {
match self {
Volume::Data { .. } => {
tokio::fs::create_dir_all(self.path_for(pkg_id, version, volume_id)).await?;
tokio::fs::create_dir_all(self.path_for(ctx, pkg_id, version, volume_id)).await?;
}
_ => (),
}
Ok(())
}
pub fn path_for(&self, pkg_id: &PackageId, version: &Version, volume_id: &VolumeId) -> PathBuf {
pub fn path_for(
&self,
ctx: &RpcContext,
pkg_id: &PackageId,
version: &Version,
volume_id: &VolumeId,
) -> PathBuf {
match self {
Volume::Data { .. } => Path::new(PKG_VOLUME_DIR)
Volume::Data { .. } => ctx
.datadir
.join(PKG_VOLUME_DIR)
.join(pkg_id)
.join("volumes")
.join(volume_id),
Volume::Assets {} => Path::new(PKG_VOLUME_DIR)
Volume::Assets {} => ctx
.datadir
.join(PKG_VOLUME_DIR)
.join(pkg_id)
.join("assets")
.join(version.as_str())
@@ -182,7 +198,9 @@ impl Volume {
volume_id,
path,
..
} => dbg!(Path::new(PKG_VOLUME_DIR)
} => dbg!(ctx
.datadir
.join(PKG_VOLUME_DIR)
.join(package_id)
.join("volumes")
.join(volume_id)
@@ -191,7 +209,9 @@ impl Volume {
} else {
path.as_ref()
})),
Volume::Certificate { interface_id } => Path::new(PKG_VOLUME_DIR)
Volume::Certificate { interface_id } => ctx
.datadir
.join(PKG_VOLUME_DIR)
.join(pkg_id)
.join("certificates")
.join(interface_id),