refactor dependency errors (#574)

* refactor dependency errors

* fix hostname after restart

* fix errors from testing

* fix updating of current_dependents

* fixes
This commit is contained in:
Aiden McClelland
2021-10-04 21:44:34 -06:00
parent b7e6729272
commit 4e9849cd38
16 changed files with 628 additions and 330 deletions

4
appmgr/Cargo.lock generated
View File

@@ -829,9 +829,9 @@ dependencies = [
[[package]] [[package]]
name = "emver" name = "emver"
version = "0.1.2" version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79b2c25504998d1069f80a8082e3c558acd7c103a2f2399306b12ed06fec5db8" checksum = "ed260c4d7efaec031b9c4f6c4d3cf136e3df2bbfe50925800236f5e847f28704"
dependencies = [ dependencies = [
"either", "either",
"fp-core", "fp-core",

View File

@@ -57,7 +57,7 @@ cookie_store = "0.15.0"
digest = "0.9.0" digest = "0.9.0"
divrem = "1.0.0" divrem = "1.0.0"
ed25519-dalek = { version = "1.0.1", features = ["serde"] } ed25519-dalek = { version = "1.0.1", features = ["serde"] }
emver = { version = "0.1.2", features = ["serde"] } emver = { version = "0.1.6", features = ["serde"] }
fd-lock-rs = "0.1.3" fd-lock-rs = "0.1.3"
futures = "0.3.17" futures = "0.3.17"
git-version = "0.3.5" git-version = "0.3.5"

View File

@@ -117,7 +117,7 @@ pub async fn login(
res.headers.insert( res.headers.insert(
"set-cookie", "set-cookie",
HeaderValue::from_str(&format!( HeaderValue::from_str(&format!(
"session={}; SameSite=Lax; Expires=Fri, 31 Dec 9999 23:59:59 GMT;", "session={}; Path=/; SameSite=Lax; Expires=Fri, 31 Dec 9999 23:59:59 GMT;",
token token
)) ))
.with_kind(crate::ErrorKind::Unknown)?, // Should be impossible, but don't want to panic .with_kind(crate::ErrorKind::Unknown)?, // Should be impossible, but don't want to panic

View File

@@ -63,6 +63,9 @@ async fn init(cfg_path: Option<&str>) -> Result<(), Error> {
}) })
.await .await
.with_kind(embassy::ErrorKind::Network)?; .with_kind(embassy::ErrorKind::Network)?;
let pool_name = ctx.zfs_pool_name.clone();
drop(ctx);
embassy::disk::main::export(&*pool_name).await?;
} }
embassy::disk::main::load( embassy::disk::main::load(

View File

@@ -19,11 +19,11 @@ use crate::db::model::{
}; };
use crate::db::util::WithRevision; use crate::db::util::WithRevision;
use crate::dependencies::{ use crate::dependencies::{
update_current_dependents, BreakageRes, DependencyError, TaggedDependencyError, break_transitive, update_current_dependents, BreakageRes, DependencyError, DependencyErrors,
TaggedDependencyError,
}; };
use crate::install::cleanup::update_dependents; use crate::install::cleanup::{remove_current_dependents, update_dependents};
use crate::s9pk::manifest::{Manifest, ManifestModel, PackageId}; use crate::s9pk::manifest::{Manifest, ManifestModel, PackageId};
use crate::status::{handle_broken_dependents, DependencyErrors};
use crate::util::{ use crate::util::{
display_none, display_serializable, parse_duration, parse_stdin_deserializable, IoFormat, display_none, display_serializable, parse_duration, parse_stdin_deserializable, IoFormat,
}; };
@@ -414,15 +414,16 @@ pub fn configure<'a, Db: DbHandle>(
} }
}) })
.collect(); .collect();
let mut deps = pkg_model.clone().current_dependencies().get_mut(db).await?;
*deps = current_dependencies.clone();
deps.save(db).await?;
res.signal res.signal
} else { } else {
None None
}; };
// update dependencies // update dependencies
let mut deps = pkg_model.clone().current_dependencies().get_mut(db).await?;
remove_current_dependents(db, id, deps.keys()).await?;
*deps = current_dependencies.clone();
deps.save(db).await?;
update_current_dependents(db, id, &current_dependencies).await?; update_current_dependents(db, id, &current_dependencies).await?;
let mut errs = pkg_model let mut errs = pkg_model
.clone() .clone()
@@ -431,7 +432,7 @@ pub fn configure<'a, Db: DbHandle>(
.get_mut(db) .get_mut(db)
.await?; .await?;
*errs = DependencyErrors::init(ctx, db, &*manifest, &current_dependencies).await?; *errs = DependencyErrors::init(ctx, db, &*manifest, &current_dependencies).await?;
errs.save(db).await; errs.save(db).await?;
// cache current config for dependents // cache current config for dependents
overrides.insert(id.clone(), config.clone()); overrides.insert(id.clone(), config.clone());
@@ -471,15 +472,7 @@ pub fn configure<'a, Db: DbHandle>(
.await? .await?
{ {
let dep_err = DependencyError::ConfigUnsatisfied { error }; let dep_err = DependencyError::ConfigUnsatisfied { error };
handle_broken_dependents( break_transitive(db, dependent, id, dep_err, breakages).await?;
db,
dependent,
id,
dependent_model,
dep_err,
breakages,
)
.await?;
} }
// handle backreferences // handle backreferences
@@ -492,17 +485,10 @@ pub fn configure<'a, Db: DbHandle>(
.await .await
{ {
if e.kind == crate::ErrorKind::ConfigRulesViolation { if e.kind == crate::ErrorKind::ConfigRulesViolation {
let dependent_model = crate::db::DatabaseModel::new() break_transitive(
.package_data()
.idx_model(dependent)
.and_then(|pkg| pkg.installed())
.expect(db)
.await?;
handle_broken_dependents(
db, db,
dependent, dependent,
id, id,
dependent_model,
DependencyError::ConfigUnsatisfied { DependencyError::ConfigUnsatisfied {
error: format!("{}", e), error: format!("{}", e),
}, },

View File

@@ -7,9 +7,13 @@ use rpc_toolkit::command;
use crate::context::RpcContext; use crate::context::RpcContext;
use crate::db::util::WithRevision; use crate::db::util::WithRevision;
use crate::dependencies::{
break_all_dependents_transitive, heal_all_dependents_transitive, BreakageRes, DependencyError,
TaggedDependencyError,
};
use crate::s9pk::manifest::PackageId; use crate::s9pk::manifest::PackageId;
use crate::status::MainStatus; use crate::status::MainStatus;
use crate::util::display_none; use crate::util::{display_none, display_serializable};
use crate::{Error, ResultExt}; use crate::{Error, ResultExt};
#[command(display(display_none))] #[command(display(display_none))]
@@ -46,12 +50,17 @@ pub async fn start(
}; };
status status
.synchronize( .synchronize(
&*ctx.managers.get(&(id, version)).await.ok_or_else(|| { &*ctx
Error::new(anyhow!("Manager not found"), crate::ErrorKind::Docker) .managers
})?, .get(&(id.clone(), version))
.await
.ok_or_else(|| {
Error::new(anyhow!("Manager not found"), crate::ErrorKind::Docker)
})?,
) )
.await?; .await?;
status.save(&mut tx).await?; status.save(&mut tx).await?;
heal_all_dependents_transitive(&ctx, &mut tx, &id).await?;
Ok(WithRevision { Ok(WithRevision {
revision: tx.commit(None).await?, revision: tx.commit(None).await?,
@@ -59,19 +68,16 @@ pub async fn start(
}) })
} }
#[command(display(display_none))] async fn stop_common<Db: DbHandle>(
pub async fn stop( db: &mut Db,
#[context] ctx: RpcContext, id: &PackageId,
#[arg] id: PackageId, breakages: &mut BTreeMap<PackageId, TaggedDependencyError>,
) -> Result<WithRevision<()>, Error> { ) -> Result<(), Error> {
let mut db = ctx.db.handle();
let mut tx = db.begin().await?;
let mut status = crate::db::DatabaseModel::new() let mut status = crate::db::DatabaseModel::new()
.package_data() .package_data()
.idx_model(&id) .idx_model(&id)
.and_then(|pkg| pkg.installed()) .and_then(|pkg| pkg.installed())
.expect(&mut tx) .expect(db)
.await .await
.with_ctx(|_| { .with_ctx(|_| {
( (
@@ -81,11 +87,43 @@ pub async fn stop(
})? })?
.status() .status()
.main() .main()
.get_mut(&mut tx) .get_mut(db)
.await?; .await?;
*status = MainStatus::Stopping; *status = MainStatus::Stopping;
status.save(&mut tx).await?; status.save(db).await?;
break_all_dependents_transitive(db, &id, DependencyError::NotRunning, breakages).await?;
Ok(())
}
#[command(subcommands(self(stop_impl(async)), stop_dry), display(display_none))]
pub fn stop(#[arg] id: PackageId) -> Result<PackageId, Error> {
Ok(id)
}
#[command(rename = "dry", display(display_serializable))]
pub async fn stop_dry(
#[context] ctx: RpcContext,
#[parent_data] id: PackageId,
) -> Result<BreakageRes, Error> {
let mut db = ctx.db.handle();
let mut tx = db.begin().await?;
let mut breakages = BTreeMap::new();
stop_common(&mut tx, &id, &mut breakages).await?;
Ok(BreakageRes {
breakages,
patch: tx.abort().await?,
})
}
pub async fn stop_impl(ctx: RpcContext, id: PackageId) -> Result<WithRevision<()>, Error> {
let mut db = ctx.db.handle();
let mut tx = db.begin().await?;
stop_common(&mut tx, &id, &mut BTreeMap::new()).await?;
Ok(WithRevision { Ok(WithRevision {
revision: tx.commit(None).await?, revision: tx.commit(None).await?,

View File

@@ -1,10 +1,12 @@
pub mod model; pub mod model;
pub mod util; pub mod util;
use std::borrow::Cow;
use std::future::Future; use std::future::Future;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use anyhow::anyhow;
use futures::{FutureExt, SinkExt, StreamExt}; use futures::{FutureExt, SinkExt, StreamExt};
use patch_db::json_ptr::JsonPointer; use patch_db::json_ptr::JsonPointer;
use patch_db::{Dump, Revision}; use patch_db::{Dump, Revision};
@@ -21,6 +23,7 @@ use tokio_tungstenite::WebSocketStream;
pub use self::model::DatabaseModel; pub use self::model::DatabaseModel;
use self::util::WithRevision; use self::util::WithRevision;
use crate::context::RpcContext; use crate::context::RpcContext;
use crate::middleware::auth::hash_token;
use crate::util::{display_serializable, GeneralGuard, IoFormat}; use crate::util::{display_serializable, GeneralGuard, IoFormat};
use crate::{Error, ResultExt}; use crate::{Error, ResultExt};
@@ -39,7 +42,7 @@ async fn ws_handler<
// add 1 to the session counter and issue an RAII guard to subtract 1 on drop // add 1 to the session counter and issue an RAII guard to subtract 1 on drop
ctx.websocket_count ctx.websocket_count
.fetch_add(1, std::sync::atomic::Ordering::SeqCst); .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let decrementer = GeneralGuard::new(|| { let _decrementer = GeneralGuard::new(|| {
let new_count = ctx let new_count = ctx
.websocket_count .websocket_count
.fetch_sub(1, std::sync::atomic::Ordering::SeqCst); .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
@@ -51,13 +54,22 @@ async fn ws_handler<
}); });
loop { loop {
if let Some(Message::Text(_)) = stream if let Some(Message::Text(cookie)) = stream
.next() .next()
.await .await
.transpose() .transpose()
.with_kind(crate::ErrorKind::Network)? .with_kind(crate::ErrorKind::Network)?
{ {
// TODO: check auth let cookie_str = serde_json::from_str::<Cow<str>>(&cookie)
.with_kind(crate::ErrorKind::Deserialization)?;
let id = basic_cookies::Cookie::parse(&cookie_str)
.with_kind(crate::ErrorKind::Authorization)?
.into_iter()
.find(|c| c.get_name() == "session")
.ok_or_else(|| {
Error::new(anyhow!("UNAUTHORIZED"), crate::ErrorKind::Authorization)
})?;
crate::middleware::auth::is_authed(&ctx, &hash_token(id.get_value())).await?;
break; break;
} }
} }

View File

@@ -2,6 +2,8 @@ use std::collections::BTreeMap;
use anyhow::anyhow; use anyhow::anyhow;
use emver::VersionRange; use emver::VersionRange;
use futures::future::BoxFuture;
use futures::FutureExt;
use patch_db::{DbHandle, DiffPatch, HasModel, Map, MapModel}; use patch_db::{DbHandle, DiffPatch, HasModel, Map, MapModel};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@@ -9,9 +11,9 @@ use crate::action::{ActionImplementation, NoOutput};
use crate::config::Config; use crate::config::Config;
use crate::context::RpcContext; use crate::context::RpcContext;
use crate::db::model::CurrentDependencyInfo; use crate::db::model::CurrentDependencyInfo;
use crate::s9pk::manifest::PackageId; use crate::s9pk::manifest::{Manifest, PackageId};
use crate::status::health_check::{HealthCheckId, HealthCheckResult, HealthCheckResultVariant}; use crate::status::health_check::{HealthCheckId, HealthCheckResult, HealthCheckResultVariant};
use crate::status::MainStatus; use crate::status::{MainStatus, Status};
use crate::util::Version; use crate::util::Version;
use crate::volume::Volumes; use crate::volume::Volumes;
use crate::Error; use crate::Error;
@@ -21,59 +23,278 @@ use crate::Error;
#[serde(tag = "type")] #[serde(tag = "type")]
pub enum DependencyError { pub enum DependencyError {
NotInstalled, // { "type": "not-installed" } NotInstalled, // { "type": "not-installed" }
#[serde(rename_all = "kebab-case")]
IncorrectVersion { IncorrectVersion {
expected: VersionRange, expected: VersionRange,
received: Version, received: Version,
}, // { "type": "incorrect-version", "expected": "0.1.0", "received": "^0.2.0" } }, // { "type": "incorrect-version", "expected": "0.1.0", "received": "^0.2.0" }
#[serde(rename_all = "kebab-case")]
ConfigUnsatisfied { ConfigUnsatisfied {
error: String, error: String,
}, // { "type": "config-unsatisfied", "error": "Bitcoin Core must have pruning set to manual." } }, // { "type": "config-unsatisfied", "error": "Bitcoin Core must have pruning set to manual." }
NotRunning, // { "type": "not-running" } NotRunning, // { "type": "not-running" }
#[serde(rename_all = "kebab-case")]
HealthChecksFailed { HealthChecksFailed {
failures: BTreeMap<HealthCheckId, HealthCheckResult>, failures: BTreeMap<HealthCheckId, HealthCheckResult>,
}, // { "type": "health-checks-failed", "checks": { "rpc": { "time": "2021-05-11T18:21:29Z", "result": "warming-up" } } } }, // { "type": "health-checks-failed", "checks": { "rpc": { "time": "2021-05-11T18:21:29Z", "result": "starting" } } }
#[serde(rename_all = "kebab-case")]
Transitive, // { "type": "transitive" }
} }
impl DependencyError { impl DependencyError {
pub fn merge_with(self, other: DependencyError) -> DependencyError { pub fn merge_with(self, other: DependencyError) -> DependencyError {
use DependencyError::*;
match (self, other) { match (self, other) {
(NotInstalled, _) => NotInstalled, (DependencyError::NotInstalled, _) | (_, DependencyError::NotInstalled) => {
(_, NotInstalled) => NotInstalled, DependencyError::NotInstalled
(IncorrectVersion { expected, received }, _) => IncorrectVersion { expected, received },
(_, IncorrectVersion { expected, received }) => IncorrectVersion { expected, received },
(ConfigUnsatisfied { error: e0 }, ConfigUnsatisfied { error: e1 }) => {
ConfigUnsatisfied {
error: e0 + "\n" + &e1,
}
} }
(ConfigUnsatisfied { error }, _) => ConfigUnsatisfied { error }, (DependencyError::IncorrectVersion { expected, received }, _)
(_, ConfigUnsatisfied { error }) => ConfigUnsatisfied { error }, | (_, DependencyError::IncorrectVersion { expected, received }) => {
(NotRunning, _) => NotRunning, DependencyError::IncorrectVersion { expected, received }
(_, NotRunning) => NotRunning, }
(HealthChecksFailed { failures: f0 }, HealthChecksFailed { failures: f1 }) => { (
HealthChecksFailed { DependencyError::ConfigUnsatisfied { error: e0 },
failures: f0.into_iter().chain(f1.into_iter()).collect(), DependencyError::ConfigUnsatisfied { error: e1 },
} ) => DependencyError::ConfigUnsatisfied {
error: e0 + "\n" + &e1,
},
(DependencyError::ConfigUnsatisfied { error }, _)
| (_, DependencyError::ConfigUnsatisfied { error }) => {
DependencyError::ConfigUnsatisfied { error }
}
(DependencyError::NotRunning, _) | (_, DependencyError::NotRunning) => {
DependencyError::NotRunning
}
(
DependencyError::HealthChecksFailed { failures: f0 },
DependencyError::HealthChecksFailed { failures: f1 },
) => DependencyError::HealthChecksFailed {
failures: f0.into_iter().chain(f1.into_iter()).collect(),
},
(DependencyError::HealthChecksFailed { failures }, _)
| (_, DependencyError::HealthChecksFailed { failures }) => {
DependencyError::HealthChecksFailed { failures }
}
(DependencyError::Transitive, _) | (_, DependencyError::Transitive) => {
DependencyError::Transitive
} }
} }
} }
pub fn try_heal<'a, Db: DbHandle>(
self,
ctx: &'a RpcContext,
db: &'a mut Db,
id: &'a PackageId,
dependency: &'a PackageId,
mut dependency_config: Option<Config>,
info: &'a DepInfo,
) -> BoxFuture<'a, Result<Option<Self>, Error>> {
async move {
Ok(match self {
DependencyError::NotInstalled => {
if crate::db::DatabaseModel::new()
.package_data()
.idx_model(dependency)
.and_then(|m| m.installed())
.exists(db, true)
.await?
{
DependencyError::IncorrectVersion {
expected: info.version.clone(),
received: Default::default(),
}
.try_heal(ctx, db, id, dependency, dependency_config, info)
.await?
} else {
Some(DependencyError::NotInstalled)
}
}
DependencyError::IncorrectVersion { expected, .. } => {
let version: Version = crate::db::DatabaseModel::new()
.package_data()
.idx_model(dependency)
.and_then(|m| m.installed())
.map(|m| m.manifest().version())
.get(db, true)
.await?
.into_owned()
.unwrap_or_default();
if version.satisfies(&expected) {
DependencyError::ConfigUnsatisfied {
error: String::new(),
}
.try_heal(ctx, db, id, dependency, dependency_config, info)
.await?
} else {
Some(DependencyError::IncorrectVersion {
expected,
received: version,
})
}
}
DependencyError::ConfigUnsatisfied { .. } => {
let dependent_manifest = crate::db::DatabaseModel::new()
.package_data()
.idx_model(id)
.and_then(|m| m.installed())
.map::<_, Manifest>(|m| m.manifest())
.expect(db)
.await?
.get(db, true)
.await?;
let dependency_manifest = crate::db::DatabaseModel::new()
.package_data()
.idx_model(dependency)
.and_then(|m| m.installed())
.map::<_, Manifest>(|m| m.manifest())
.expect(db)
.await?
.get(db, true)
.await?;
let dependency_config = if let Some(cfg) = dependency_config.take() {
cfg
} else if let Some(cfg_info) = &dependency_manifest.config {
cfg_info
.get(
ctx,
dependency,
&dependency_manifest.version,
&dependency_manifest.volumes,
)
.await?
.config
.unwrap_or_default()
} else {
Config::default()
};
if let Some(cfg_req) = &info.config {
if let Err(e) = cfg_req
.check(
ctx,
id,
&dependent_manifest.version,
&dependent_manifest.volumes,
&dependency_config,
)
.await
{
if e.kind == crate::ErrorKind::ConfigRulesViolation {
return Ok(Some(DependencyError::ConfigUnsatisfied {
error: format!("{}", e),
}));
} else {
return Err(e);
}
}
}
DependencyError::NotRunning
.try_heal(ctx, db, id, dependency, Some(dependency_config), info)
.await?
}
DependencyError::NotRunning => {
let status = crate::db::DatabaseModel::new()
.package_data()
.idx_model(dependency)
.and_then(|m| m.installed())
.map::<_, Status>(|m| m.status())
.expect(db)
.await?
.get(db, true)
.await?;
if status.main.running() {
DependencyError::HealthChecksFailed {
failures: BTreeMap::new(),
}
.try_heal(ctx, db, id, dependency, dependency_config, info)
.await?
} else {
Some(DependencyError::NotRunning)
}
}
DependencyError::HealthChecksFailed { .. } => {
let status = crate::db::DatabaseModel::new()
.package_data()
.idx_model(dependency)
.and_then(|m| m.installed())
.map::<_, Status>(|m| m.status())
.expect(db)
.await?
.get(db, true)
.await?
.into_owned();
match status.main {
MainStatus::BackingUp {
started: Some(_),
health,
}
| MainStatus::Running { health, .. } => {
let mut failures = BTreeMap::new();
for (check, res) in health {
if !matches!(res.result, HealthCheckResultVariant::Success)
&& crate::db::DatabaseModel::new()
.package_data()
.idx_model(id)
.and_then(|m| m.installed())
.and_then::<_, CurrentDependencyInfo>(|m| {
m.current_dependencies().idx_model(dependency)
})
.get(db, true)
.await?
.into_owned()
.map(|i| i.health_checks)
.unwrap_or_default()
.contains(&check)
{
failures.insert(check.clone(), res.clone());
}
}
if !failures.is_empty() {
Some(DependencyError::HealthChecksFailed { failures })
} else {
DependencyError::Transitive
.try_heal(ctx, db, id, dependency, dependency_config, info)
.await?
}
}
_ => return Ok(Some(DependencyError::NotRunning)),
}
}
DependencyError::Transitive => {
if crate::db::DatabaseModel::new()
.package_data()
.idx_model(dependency)
.and_then(|m| m.installed())
.map::<_, DependencyErrors>(|m| m.status().dependency_errors())
.get(db, true)
.await?
.into_owned()
.unwrap_or_default()
.0
.is_empty()
{
None
} else {
Some(DependencyError::Transitive)
}
}
})
}
.boxed()
}
} }
impl std::fmt::Display for DependencyError { impl std::fmt::Display for DependencyError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
use DependencyError::*;
match self { match self {
NotInstalled => write!(f, "Not Installed"), DependencyError::NotInstalled => write!(f, "Not Installed"),
IncorrectVersion { expected, received } => write!( DependencyError::IncorrectVersion { expected, received } => write!(
f, f,
"Incorrect Version: Expected {}, Received {}", "Incorrect Version: Expected {}, Received {}",
expected, expected,
received.as_str() received.as_str()
), ),
ConfigUnsatisfied { error } => { DependencyError::ConfigUnsatisfied { error } => {
write!(f, "Configuration Requirements Not Satisfied: {}", error) write!(f, "Configuration Requirements Not Satisfied: {}", error)
} }
NotRunning => write!(f, "Not Running"), DependencyError::NotRunning => write!(f, "Not Running"),
HealthChecksFailed { failures } => { DependencyError::HealthChecksFailed { failures } => {
write!(f, "Failed Health Check(s): ")?; write!(f, "Failed Health Check(s): ")?;
let mut comma = false; let mut comma = false;
for (check, res) in failures { for (check, res) in failures {
@@ -86,6 +307,9 @@ impl std::fmt::Display for DependencyError {
} }
Ok(()) Ok(())
} }
DependencyError::Transitive => {
write!(f, "Dependency Error(s)")
}
} }
} }
} }
@@ -150,79 +374,24 @@ impl DepInfo {
dependency_id: &PackageId, dependency_id: &PackageId,
dependency_config: Option<Config>, // fetch if none dependency_config: Option<Config>, // fetch if none
dependent_id: &PackageId, dependent_id: &PackageId,
dependent_version: &Version,
dependent_volumes: &Volumes,
) -> Result<Result<(), DependencyError>, Error> { ) -> Result<Result<(), DependencyError>, Error> {
let (manifest, info) = if let Some(dep_model) = crate::db::DatabaseModel::new() Ok(
.package_data() if let Some(err) = DependencyError::NotInstalled
.idx_model(dependency_id) .try_heal(
.and_then(|pde| pde.installed())
.check(db)
.await?
{
(
dep_model.clone().manifest().get(db, true).await?,
dep_model.get(db, true).await?,
)
} else {
return Ok(Err(DependencyError::NotInstalled));
};
if !&manifest.version.satisfies(&self.version) {
return Ok(Err(DependencyError::IncorrectVersion {
expected: self.version.clone(),
received: manifest.version.clone(),
}));
}
let dependency_config = if let Some(cfg) = dependency_config {
cfg
} else if let Some(cfg_info) = &manifest.config {
cfg_info
.get(ctx, dependency_id, &manifest.version, &manifest.volumes)
.await?
.config
.unwrap_or_default()
} else {
Config::default()
};
if let Some(cfg_req) = &self.config {
if let Err(e) = cfg_req
.check(
ctx, ctx,
db,
dependent_id, dependent_id,
dependent_version, dependency_id,
dependent_volumes, dependency_config,
&dependency_config, self,
) )
.await .await?
{ {
if e.kind == crate::ErrorKind::ConfigRulesViolation { Err(err)
return Ok(Err(DependencyError::ConfigUnsatisfied { } else {
error: format!("{}", e), Ok(())
})); },
} else { )
return Err(e);
}
}
}
match &info.status.main {
MainStatus::BackingUp {
started: Some(_),
health,
}
| MainStatus::Running { health, .. } => {
let mut failures = BTreeMap::new();
for (check, res) in health {
if !matches!(res.result, HealthCheckResultVariant::Success) {
failures.insert(check.clone(), res.clone());
}
}
if !failures.is_empty() {
return Ok(Err(DependencyError::HealthChecksFailed { failures }));
}
}
_ => return Ok(Err(DependencyError::NotRunning)),
}
Ok(Ok(()))
} }
} }
@@ -303,3 +472,202 @@ pub async fn update_current_dependents<
} }
Ok(()) Ok(())
} }
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
pub struct DependencyErrors(pub BTreeMap<PackageId, DependencyError>);
impl Map for DependencyErrors {
type Key = PackageId;
type Value = DependencyError;
fn get(&self, key: &Self::Key) -> Option<&Self::Value> {
self.0.get(key)
}
}
impl HasModel for DependencyErrors {
type Model = MapModel<Self>;
}
impl DependencyErrors {
pub async fn init<Db: DbHandle>(
ctx: &RpcContext,
db: &mut Db,
manifest: &Manifest,
current_dependencies: &BTreeMap<PackageId, CurrentDependencyInfo>,
) -> Result<DependencyErrors, Error> {
let mut res = BTreeMap::new();
for (dependency_id, info) in current_dependencies.keys().filter_map(|dependency_id| {
manifest
.dependencies
.0
.get(dependency_id)
.map(|info| (dependency_id, info))
}) {
if let Err(e) = info
.satisfied(ctx, db, dependency_id, None, &manifest.id)
.await?
{
res.insert(dependency_id.clone(), e);
}
}
Ok(DependencyErrors(res))
}
}
impl std::fmt::Display for DependencyErrors {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{{ ")?;
for (idx, (id, err)) in self.0.iter().enumerate() {
write!(f, "{}: {}", id, err)?;
if idx < self.0.len() - 1 {
// not last
write!(f, ", ")?;
}
}
write!(f, " }}")
}
}
pub async fn break_all_dependents_transitive<'a, Db: DbHandle>(
db: &'a mut Db,
id: &'a PackageId,
error: DependencyError,
breakages: &'a mut BTreeMap<PackageId, TaggedDependencyError>,
) -> Result<(), Error> {
for dependent in crate::db::DatabaseModel::new()
.package_data()
.idx_model(id)
.and_then(|m| m.installed())
.expect(db)
.await?
.current_dependents()
.keys(db, true)
.await?
{
break_transitive(db, &dependent, id, error.clone(), breakages).await?;
}
Ok(())
}
pub fn break_transitive<'a, Db: DbHandle>(
db: &'a mut Db,
id: &'a PackageId,
dependency: &'a PackageId,
error: DependencyError,
breakages: &'a mut BTreeMap<PackageId, TaggedDependencyError>,
) -> BoxFuture<'a, Result<(), Error>> {
async move {
let model = crate::db::DatabaseModel::new()
.package_data()
.idx_model(id)
.and_then(|m| m.installed())
.expect(db)
.await?;
let mut status = model.clone().status().get_mut(db).await?;
let old = status.dependency_errors.0.remove(dependency);
let newly_broken = old.is_none();
status.dependency_errors.0.insert(
dependency.clone(),
if let Some(old) = old {
old.merge_with(error.clone())
} else {
error.clone()
},
);
if newly_broken {
breakages.insert(
id.clone(),
TaggedDependencyError {
dependency: dependency.clone(),
error: error.clone(),
},
);
if status.main.running() {
let transitive_error = if model
.clone()
.manifest()
.dependencies()
.idx_model(dependency)
.get(db, true)
.await?
.into_owned()
.ok_or_else(|| {
Error::new(
anyhow!("{} not in listed dependencies", dependency),
crate::ErrorKind::Database,
)
})?
.critical
{
status.main.stop();
DependencyError::NotRunning
} else {
DependencyError::Transitive
};
break_all_dependents_transitive(db, id, transitive_error, breakages).await?;
}
}
status.save(db).await?;
Ok(())
}
.boxed()
}
pub async fn heal_all_dependents_transitive<'a, Db: DbHandle>(
ctx: &'a RpcContext,
db: &'a mut Db,
id: &'a PackageId,
) -> Result<(), Error> {
for dependent in crate::db::DatabaseModel::new()
.package_data()
.idx_model(id)
.and_then(|m| m.installed())
.expect(db)
.await?
.current_dependents()
.keys(db, true)
.await?
{
heal_transitive(ctx, db, &dependent, id).await?;
}
Ok(())
}
pub fn heal_transitive<'a, Db: DbHandle>(
ctx: &'a RpcContext,
db: &'a mut Db,
id: &'a PackageId,
dependency: &'a PackageId,
) -> BoxFuture<'a, Result<(), Error>> {
async move {
let model = crate::db::DatabaseModel::new()
.package_data()
.idx_model(id)
.and_then(|m| m.installed())
.expect(db)
.await?;
let mut status = model.clone().status().get_mut(db).await?;
let old = status.dependency_errors.0.remove(dependency);
if let Some(old) = old {
let info = model
.manifest()
.dependencies()
.idx_model(dependency)
.expect(db)
.await?
.get(db, true)
.await?;
if let Some(new) = old.try_heal(ctx, db, id, dependency, None, &*info).await? {
status.dependency_errors.0.insert(dependency.clone(), new);
} else {
heal_all_dependents_transitive(ctx, db, id).await?;
}
}
status.save(db).await?;
Ok(())
}
.boxed()
}

View File

@@ -38,8 +38,13 @@ pub async fn get_id() -> Result<String, Error> {
Ok(hex::encode(&res[0..4])) Ok(hex::encode(&res[0..4]))
} }
// cat /embassy-os/product_key.txt | shasum -a 256 | head -c 8 | awk '{print "start9-"$1}' | xargs hostnamectl set-hostname // cat /embassy-os/product_key.txt | shasum -a 256 | head -c 8 | awk '{print "start9-"$1}' | xargs hostnamectl set-hostname && systemctl restart avahi-daemon
pub async fn sync_hostname() -> Result<(), Error> { pub async fn sync_hostname() -> Result<(), Error> {
set_hostname(&format!("start9-{}", get_id().await?)).await?; set_hostname(&format!("start9-{}", get_id().await?)).await?;
Command::new("systemctl")
.arg("restart")
.arg("avahi-daemon")
.invoke(crate::ErrorKind::Network)
.await?;
Ok(()) Ok(())
} }

View File

@@ -1,11 +1,12 @@
use std::collections::HashMap; use std::collections::{BTreeMap, HashMap};
use bollard::image::ListImagesOptions; use bollard::image::ListImagesOptions;
use patch_db::{DbHandle, PatchDbHandle}; use patch_db::{DbHandle, PatchDbHandle};
use super::PKG_DOCKER_DIR; use super::PKG_DOCKER_DIR;
use crate::context::RpcContext; use crate::context::RpcContext;
use crate::db::model::{InstalledPackageDataEntry, PackageDataEntry}; use crate::db::model::{CurrentDependencyInfo, InstalledPackageDataEntry, PackageDataEntry};
use crate::dependencies::update_current_dependents;
use crate::s9pk::manifest::PackageId; use crate::s9pk::manifest::PackageId;
use crate::util::Version; use crate::util::Version;
use crate::Error; use crate::Error;
@@ -29,8 +30,7 @@ pub async fn update_dependents<'a, Db: DbHandle, I: IntoIterator<Item = &'a Pack
.get(db, true) .get(db, true)
.await?; .await?;
if let Err(e) = if let Some(info) = man.dependencies.0.get(id) { if let Err(e) = if let Some(info) = man.dependencies.0.get(id) {
info.satisfied(ctx, db, id, None, dep, &man.version, &man.volumes) info.satisfied(ctx, db, id, None, dep).await?
.await?
} else { } else {
Ok(()) Ok(())
} { } {
@@ -167,6 +167,26 @@ pub async fn cleanup_failed<Db: DbHandle>(
Ok(()) Ok(())
} }
pub async fn remove_current_dependents<'a, Db: DbHandle, I: IntoIterator<Item = &'a PackageId>>(
db: &mut Db,
id: &PackageId,
current_dependencies: I,
) -> Result<(), Error> {
for dep in current_dependencies {
if let Some(current_dependents) = crate::db::DatabaseModel::new()
.package_data()
.idx_model(dep)
.and_then(|m| m.installed())
.map::<_, BTreeMap<PackageId, CurrentDependencyInfo>>(|m| m.current_dependents())
.check(db)
.await?
{
current_dependents.remove(db, id).await?
}
}
Ok(())
}
pub async fn uninstall( pub async fn uninstall(
ctx: &RpcContext, ctx: &RpcContext,
db: &mut PatchDbHandle, db: &mut PatchDbHandle,
@@ -178,6 +198,12 @@ pub async fn uninstall(
.package_data() .package_data()
.remove(&mut tx, &entry.manifest.id) .remove(&mut tx, &entry.manifest.id)
.await?; .await?;
remove_current_dependents(
&mut tx,
&entry.manifest.id,
entry.current_dependencies.keys(),
)
.await?;
update_dependents( update_dependents(
ctx, ctx,
&mut tx, &mut tx,

View File

@@ -24,12 +24,15 @@ use crate::db::model::{
StaticFiles, StaticFiles,
}; };
use crate::db::util::WithRevision; use crate::db::util::WithRevision;
use crate::dependencies::{update_current_dependents, BreakageRes, DependencyError}; use crate::dependencies::{
break_all_dependents_transitive, update_current_dependents, BreakageRes, DependencyError,
DependencyErrors,
};
use crate::install::cleanup::{cleanup, update_dependents}; use crate::install::cleanup::{cleanup, update_dependents};
use crate::install::progress::{InstallProgress, InstallProgressTracker}; use crate::install::progress::{InstallProgress, InstallProgressTracker};
use crate::s9pk::manifest::{Manifest, PackageId}; use crate::s9pk::manifest::{Manifest, PackageId};
use crate::s9pk::reader::S9pkReader; use crate::s9pk::reader::S9pkReader;
use crate::status::{handle_broken_dependents, DependencyErrors, MainStatus, Status}; use crate::status::{MainStatus, Status};
use crate::util::io::copy_and_shutdown; use crate::util::io::copy_and_shutdown;
use crate::util::{display_none, display_serializable, AsyncFileExt, Version}; use crate::util::{display_none, display_serializable, AsyncFileExt, Version};
use crate::volume::asset_dir; use crate::volume::asset_dir;
@@ -134,38 +137,10 @@ pub async fn uninstall_dry(
#[parent_data] id: PackageId, #[parent_data] id: PackageId,
) -> Result<BreakageRes, Error> { ) -> Result<BreakageRes, Error> {
let mut db = ctx.db.handle(); let mut db = ctx.db.handle();
let deps = crate::db::DatabaseModel::new()
.package_data()
.idx_model(&id)
.expect(&mut db)
.await?
.installed()
.expect(&mut db)
.await?
.current_dependents()
.get(&mut db, true)
.await?;
let mut tx = db.begin().await?; let mut tx = db.begin().await?;
let mut breakages = BTreeMap::new(); let mut breakages = BTreeMap::new();
for dep_id in deps.keys() { break_all_dependents_transitive(&mut tx, &id, DependencyError::NotInstalled, &mut breakages)
let model = crate::db::DatabaseModel::new()
.package_data()
.idx_model(&dep_id)
.expect(&mut tx)
.await?
.installed()
.expect(&mut tx)
.await?;
handle_broken_dependents(
&mut tx,
dep_id,
&id,
model,
DependencyError::NotInstalled,
&mut breakages,
)
.await?; .await?;
}
Ok(BreakageRes { Ok(BreakageRes {
breakages, breakages,
@@ -575,7 +550,7 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin>(
log::info!("Install {}@{}: Created manager", pkg_id, version); log::info!("Install {}@{}: Created manager", pkg_id, version);
let static_files = StaticFiles::local(pkg_id, version, manifest.assets.icon_type()); let static_files = StaticFiles::local(pkg_id, version, manifest.assets.icon_type());
let current_dependencies = manifest let current_dependencies: BTreeMap<_, _> = manifest
.dependencies .dependencies
.0 .0
.iter() .iter()
@@ -616,22 +591,21 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin>(
status: Status { status: Status {
configured: manifest.config.is_none(), configured: manifest.config.is_none(),
main: MainStatus::Stopped, main: MainStatus::Stopped,
dependency_errors: DependencyErrors::init( dependency_errors: DependencyErrors::default(),
ctx,
&mut tx,
&manifest,
&current_dependencies,
)
.await?,
}, },
manifest: manifest.clone(), manifest: manifest.clone(),
system_pointers: Vec::new(), system_pointers: Vec::new(),
dependency_info, dependency_info,
current_dependents: current_dependents.clone(), current_dependents: current_dependents.clone(),
current_dependencies, current_dependencies: current_dependencies.clone(),
interface_addresses, interface_addresses,
}; };
let mut pde = model.expect(&mut tx).await?.get_mut(&mut tx).await?; let mut pde = model
.clone()
.expect(&mut tx)
.await?
.get_mut(&mut tx)
.await?;
let prev = std::mem::replace( let prev = std::mem::replace(
&mut *pde, &mut *pde,
PackageDataEntry::Installed { PackageDataEntry::Installed {
@@ -641,6 +615,18 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin>(
}, },
); );
pde.save(&mut tx).await?; pde.save(&mut tx).await?;
let mut dep_errs = model
.expect(&mut tx)
.await?
.installed()
.expect(&mut tx)
.await?
.status()
.dependency_errors()
.get_mut(&mut tx)
.await?;
*dep_errs = DependencyErrors::init(ctx, &mut tx, &manifest, &current_dependencies).await?;
dep_errs.save(&mut tx).await?;
if let PackageDataEntry::Updating { if let PackageDataEntry::Updating {
installed: prev, installed: prev,

View File

@@ -2,7 +2,7 @@ use anyhow::anyhow;
use basic_cookies::Cookie; use basic_cookies::Cookie;
use digest::Digest; use digest::Digest;
use futures::future::BoxFuture; use futures::future::BoxFuture;
use futures::FutureExt; use futures::{FutureExt, TryFutureExt};
use http::StatusCode; use http::StatusCode;
use rpc_toolkit::command_helpers::prelude::RequestParts; use rpc_toolkit::command_helpers::prelude::RequestParts;
use rpc_toolkit::hyper::header::COOKIE; use rpc_toolkit::hyper::header::COOKIE;
@@ -44,8 +44,7 @@ pub fn hash_token(token: &str) -> String {
.to_lowercase() .to_lowercase()
} }
pub async fn is_authed(ctx: &RpcContext, req: &RequestParts) -> Result<(), Error> { pub async fn is_authed(ctx: &RpcContext, id: &str) -> Result<(), Error> {
let id = get_id(req)?;
let session = sqlx::query!("UPDATE session SET last_active = CURRENT_TIMESTAMP WHERE id = ? AND logged_out IS NULL OR logged_out > CURRENT_TIMESTAMP", id) let session = sqlx::query!("UPDATE session SET last_active = CURRENT_TIMESTAMP WHERE id = ? AND logged_out IS NULL OR logged_out > CURRENT_TIMESTAMP", id)
.execute(&mut ctx.secret_store.acquire().await?) .execute(&mut ctx.secret_store.acquire().await?)
.await?; .await?;
@@ -73,7 +72,10 @@ pub fn auth<M: Metadata>(ctx: RpcContext) -> DynMiddleware<M> {
.get(rpc_req.method.as_str(), "authenticated") .get(rpc_req.method.as_str(), "authenticated")
.unwrap_or(true) .unwrap_or(true)
{ {
if let Err(e) = is_authed(&ctx, req).await { if let Err(e) = async { get_id(req) }
.and_then(|id| async move { is_authed(&ctx, &id).await })
.await
{
let (res_parts, _) = Response::new(()).into_parts(); let (res_parts, _) = Response::new(()).into_parts();
return Ok(Err(to_response( return Ok(Err(to_response(
&req.headers, &req.headers,

View File

@@ -103,7 +103,7 @@ pub async fn execute_inner(
let mut guid_file = File::create("/embassy-os/disk.guid").await?; let mut guid_file = File::create("/embassy-os/disk.guid").await?;
guid_file.write_all(guid.as_bytes()).await?; guid_file.write_all(guid.as_bytes()).await?;
guid_file.sync_all().await?; guid_file.sync_all().await?;
crate::disk::main::export(&ctx.zfs_pool_name).await?; sqlite_pool.close().await;
ctx.shutdown.send(()).expect("failed to shutdown"); ctx.shutdown.send(()).expect("failed to shutdown");

View File

@@ -11,7 +11,9 @@ use serde::{Deserialize, Serialize};
use self::health_check::{HealthCheckId, HealthCheckResult}; use self::health_check::{HealthCheckId, HealthCheckResult};
use crate::context::RpcContext; use crate::context::RpcContext;
use crate::db::model::{CurrentDependencyInfo, InstalledPackageDataEntryModel}; use crate::db::model::{CurrentDependencyInfo, InstalledPackageDataEntryModel};
use crate::dependencies::{DependencyError, TaggedDependencyError}; use crate::dependencies::{
break_transitive, DependencyError, DependencyErrors, TaggedDependencyError,
};
use crate::manager::{Manager, Status as ManagerStatus}; use crate::manager::{Manager, Status as ManagerStatus};
use crate::notifications::{notify, NotificationLevel, NotificationSubtype}; use crate::notifications::{notify, NotificationLevel, NotificationSubtype};
use crate::s9pk::manifest::{Manifest, PackageId}; use crate::s9pk::manifest::{Manifest, PackageId};
@@ -212,15 +214,7 @@ pub async fn check_all(ctx: &RpcContext) -> Result<(), Error> {
} }
_ => Some(DependencyError::NotRunning), _ => Some(DependencyError::NotRunning),
} { } {
handle_broken_dependents( break_transitive(&mut db, id, dep_id, err, &mut BTreeMap::new()).await?;
&mut db,
id,
dep_id,
model.clone(),
err,
&mut BTreeMap::new(),
)
.await?;
} else { } else {
let mut errs = model let mut errs = model
.clone() .clone()
@@ -262,6 +256,7 @@ pub async fn check_all(ctx: &RpcContext) -> Result<(), Error> {
pub struct Status { pub struct Status {
pub configured: bool, pub configured: bool,
pub main: MainStatus, pub main: MainStatus,
#[model]
pub dependency_errors: DependencyErrors, pub dependency_errors: DependencyErrors,
} }
@@ -391,131 +386,3 @@ impl MainStatus {
} }
} }
} }
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
pub struct DependencyErrors(pub BTreeMap<PackageId, DependencyError>);
impl Map for DependencyErrors {
type Key = PackageId;
type Value = DependencyError;
fn get(&self, key: &Self::Key) -> Option<&Self::Value> {
self.0.get(key)
}
}
impl HasModel for DependencyErrors {
type Model = MapModel<Self>;
}
impl DependencyErrors {
pub async fn init<Db: DbHandle>(
ctx: &RpcContext,
db: &mut Db,
manifest: &Manifest,
current_dependencies: &BTreeMap<PackageId, CurrentDependencyInfo>,
) -> Result<DependencyErrors, Error> {
let mut res = BTreeMap::new();
for (dep_id, info) in current_dependencies.keys().filter_map(|dep_id| {
manifest
.dependencies
.0
.get(dep_id)
.map(|info| (dep_id, info))
}) {
if let Err(e) = info
.satisfied(
ctx,
db,
dep_id,
None,
&manifest.id,
&manifest.version,
&manifest.volumes,
)
.await?
{
res.insert(dep_id.clone(), e);
}
}
Ok(DependencyErrors(res))
}
}
pub fn handle_broken_dependents<'a, Db: DbHandle>(
db: &'a mut Db,
id: &'a PackageId,
dependency: &'a PackageId,
model: InstalledPackageDataEntryModel,
error: DependencyError,
breakages: &'a mut BTreeMap<PackageId, TaggedDependencyError>,
) -> BoxFuture<'a, Result<(), Error>> {
async move {
let mut status = model.clone().status().get_mut(db).await?;
let old = status.dependency_errors.0.remove(id);
let newly_broken = old.is_none();
status.dependency_errors.0.insert(
id.clone(),
if let Some(old) = old {
old.merge_with(error.clone())
} else {
error.clone()
},
);
if newly_broken {
breakages.insert(
id.clone(),
TaggedDependencyError {
dependency: dependency.clone(),
error: error.clone(),
},
);
if status.main.running() {
if model
.clone()
.manifest()
.dependencies()
.idx_model(dependency)
.get(db, true)
.await?
.into_owned()
.ok_or_else(|| {
Error::new(
anyhow!("{} not in listed dependencies", dependency),
crate::ErrorKind::Database,
)
})?
.critical
{
status.main.stop();
let dependents = model.current_dependents().get(db, true).await?;
for (dependent, _) in &*dependents {
let dependent_model = crate::db::DatabaseModel::new()
.package_data()
.idx_model(dependent)
.and_then(|pkg| pkg.installed())
.check(db)
.await?
.ok_or_else(|| {
Error::new(
anyhow!("{} is not installed", dependent),
crate::ErrorKind::NotFound,
)
})?;
handle_broken_dependents(
db,
dependent,
id,
dependent_model,
DependencyError::NotRunning,
breakages,
)
.await?;
}
}
}
}
status.save(db).await?;
Ok(())
}
.boxed()
}

View File

@@ -365,6 +365,11 @@ impl From<Version> for emver::Version {
v.version v.version
} }
} }
impl Default for Version {
fn default() -> Self {
Self::from(emver::Version::default())
}
}
impl Deref for Version { impl Deref for Version {
type Target = emver::Version; type Target = emver::Version;
fn deref(&self) -> &Self::Target { fn deref(&self) -> &Self::Target {