From 13f08242ddaea7b6f7ff9a2e37f019a3b495e043 Mon Sep 17 00:00:00 2001 From: Aiden McClelland Date: Thu, 9 Dec 2021 12:03:52 -0700 Subject: [PATCH] reduce health check locking --- appmgr/src/dependencies.rs | 34 ++----------- appmgr/src/manager/health.rs | 80 +++++++++++++++++++++---------- appmgr/src/status/health_check.rs | 14 ------ appmgr/src/status/mod.rs | 76 ++++------------------------- 4 files changed, 67 insertions(+), 137 deletions(-) diff --git a/appmgr/src/dependencies.rs b/appmgr/src/dependencies.rs index 9596e0ff8..585766b51 100644 --- a/appmgr/src/dependencies.rs +++ b/appmgr/src/dependencies.rs @@ -395,7 +395,6 @@ pub struct DepInfo { pub version: VersionRange, pub requirement: DependencyRequirement, pub description: Option, - pub critical: bool, #[serde(default)] #[model] pub config: Option, @@ -792,37 +791,10 @@ pub fn break_transitive<'a, Db: DbHandle>( error: error.clone(), }, ); - if status.main.running() { - let transitive_error = if model - .clone() - .manifest() - .dependencies() - .idx_model(dependency) - .get(&mut tx, true) - .await? - .into_owned() - .ok_or_else(|| { - Error::new( - eyre!("{} not in listed dependencies", dependency), - crate::ErrorKind::Database, - ) - })? - .critical - { - status.main.stop(); - DependencyError::NotRunning - } else { - DependencyError::Transitive - }; - status.save(&mut tx).await?; + status.save(&mut tx).await?; - tx.save().await?; - break_all_dependents_transitive(db, id, transitive_error, breakages).await?; - } else { - status.save(&mut tx).await?; - - tx.save().await?; - } + tx.save().await?; + break_all_dependents_transitive(db, id, DependencyError::Transitive, breakages).await?; } else { status.save(&mut tx).await?; diff --git a/appmgr/src/manager/health.rs b/appmgr/src/manager/health.rs index 4041fcb2e..965897e05 100644 --- a/appmgr/src/manager/health.rs +++ b/appmgr/src/manager/health.rs @@ -1,5 +1,5 @@ use std::collections::BTreeMap; -use std::sync::atomic::AtomicBool; +use std::sync::atomic::{AtomicBool, Ordering}; use patch_db::DbHandle; use tracing::instrument; @@ -20,51 +20,83 @@ pub async fn check( ) -> Result<(), Error> { let mut tx = db.begin().await?; + let mut checkpoint = tx.begin().await?; + let installed_model = crate::db::DatabaseModel::new() .package_data() .idx_model(id) - .expect(&mut tx) + .expect(&mut checkpoint) .await? .installed() - .expect(&mut tx) + .expect(&mut checkpoint) .await?; - let mut checkpoint = tx.begin().await?; - let manifest = installed_model .clone() .manifest() .get(&mut checkpoint, true) - .await?; + .await? + .into_owned(); - let mut status = installed_model + let started = installed_model .clone() .status() - .get_mut(&mut checkpoint) - .await?; - - status - .main - .check(&ctx, &mut checkpoint, &*manifest, should_commit) - .await?; - - let failed = match &status.main { - MainStatus::Running { health, .. } => health.clone(), - MainStatus::BackingUp { health, .. } => health.clone(), - _ => BTreeMap::new(), - }; - - status.save(&mut checkpoint).await?; + .main() + .started() + .get(&mut checkpoint, true) + .await? + .into_owned(); checkpoint.save().await?; + let health_results = if let Some(started) = started { + manifest + .health_checks + .check_all(ctx, started, id, &manifest.version, &manifest.volumes) + .await? + } else { + return Ok(()); + }; + + if !should_commit.load(Ordering::SeqCst) { + return Ok(()); + } + + let mut checkpoint = tx.begin().await?; + + let mut status = crate::db::DatabaseModel::new() + .package_data() + .idx_model(id) + .expect(&mut checkpoint) + .await? + .installed() + .expect(&mut checkpoint) + .await? + .status() + .main() + .get_mut(&mut checkpoint) + .await?; + + match &mut *status { + MainStatus::Running { health, .. } => { + *health = health_results.clone(); + } + _ => (), + } + + status.save(&mut checkpoint).await?; + let current_dependents = installed_model .current_dependents() - .get(&mut tx, true) + .get(&mut checkpoint, true) .await?; + + checkpoint.save().await?; + for (dependent, info) in &*current_dependents { - let failures: BTreeMap = failed + let failures: BTreeMap = health_results .iter() + .filter(|(_, hc_res)| !matches!(hc_res, HealthCheckResult::Success { .. })) .filter(|(hc_id, _)| info.health_checks.contains(hc_id)) .map(|(k, v)| (k.clone(), v.clone())) .collect(); diff --git a/appmgr/src/status/health_check.rs b/appmgr/src/status/health_check.rs index 12b06aa0a..628f09800 100644 --- a/appmgr/src/status/health_check.rs +++ b/appmgr/src/status/health_check.rs @@ -68,26 +68,12 @@ impl HealthChecks { } } -#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] -#[serde(rename_all = "kebab-case")] -pub enum HealthCheckSeverity { - Warning, - Critical, -} - -impl Default for HealthCheckSeverity { - fn default() -> Self { - HealthCheckSeverity::Warning - } -} #[derive(Clone, Debug, Deserialize, Serialize)] pub struct HealthCheck { pub name: String, pub description: String, #[serde(flatten)] implementation: ActionImplementation, - #[serde(default)] - pub severity: HealthCheckSeverity, pub timeout: Option, } impl HealthCheck { diff --git a/appmgr/src/status/mod.rs b/appmgr/src/status/mod.rs index 2d1e0b7e5..4167a0020 100644 --- a/appmgr/src/status/mod.rs +++ b/appmgr/src/status/mod.rs @@ -1,24 +1,19 @@ use std::collections::BTreeMap; -use std::sync::atomic::{AtomicBool, Ordering}; use chrono::{DateTime, Utc}; -use patch_db::{DbHandle, HasModel}; +use patch_db::{HasModel, Model}; use serde::{Deserialize, Serialize}; -use tracing::instrument; -use self::health_check::{HealthCheckId, HealthCheckSeverity}; -use crate::context::RpcContext; +use self::health_check::HealthCheckId; use crate::dependencies::DependencyErrors; -use crate::notifications::NotificationLevel; -use crate::s9pk::manifest::Manifest; use crate::status::health_check::HealthCheckResult; -use crate::Error; pub mod health_check; #[derive(Clone, Debug, Deserialize, Serialize, HasModel)] #[serde(rename_all = "kebab-case")] pub struct Status { pub configured: bool, + #[model] pub main: MainStatus, #[model] pub dependency_errors: DependencyErrors, @@ -41,66 +36,6 @@ pub enum MainStatus { }, } impl MainStatus { - #[instrument(skip(ctx, db, manifest))] - pub async fn check( - &mut self, - ctx: &RpcContext, - db: &mut Db, - manifest: &Manifest, - should_commit: &AtomicBool, - ) -> Result<(), Error> { - match self { - MainStatus::Running { started, health } => { - let health_result = manifest - .health_checks - .check_all( - ctx, - *started, - &manifest.id, - &manifest.version, - &manifest.volumes, - ) - .await?; - if !should_commit.load(Ordering::SeqCst) { - return Ok(()); - } else { - // only commit health check results if we are supposed to - *health = health_result; - } - let mut should_stop = false; - for (check, res) in health { - match &res { - health_check::HealthCheckResult::Failure { error } - if manifest - .health_checks - .0 - .get(check) - .map(|hc| hc.severity == HealthCheckSeverity::Critical) - .unwrap_or_default() => - { - ctx.notification_manager.notify( - db, - Some(manifest.id.clone()), - NotificationLevel::Error, - String::from("Critical Health Check Failed"), - format!("{} was shut down because a health check required for its operation failed\n{}", manifest.title, error), - (), - None, - ) - .await?; - should_stop = true; - } - _ => (), - } - } - if should_stop { - *self = MainStatus::Stopping; - } - } - _ => (), - } - Ok(()) - } pub fn running(&self) -> bool { match self { MainStatus::Starting @@ -125,3 +60,8 @@ impl MainStatus { } } } +impl MainStatusModel { + pub fn started(self) -> Model>> { + self.0.child("started") + } +}