use new locking api

This commit is contained in:
Aiden McClelland
2021-09-23 18:02:18 -06:00
committed by Aiden McClelland
parent 51417383d2
commit 660205c3db
13 changed files with 223 additions and 258 deletions

View File

@@ -3,15 +3,16 @@ use std::sync::Arc;
use anyhow::anyhow;
use chrono::{DateTime, Utc};
use futures::StreamExt;
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt};
use indexmap::IndexMap;
use patch_db::{DbHandle, HasModel, LockType, Map, MapModel, ModelData};
use patch_db::{DbHandle, HasModel, Map, MapModel, ModelData};
use serde::{Deserialize, Serialize};
use self::health_check::{HealthCheckId, HealthCheckResult};
use crate::context::RpcContext;
use crate::db::model::CurrentDependencyInfo;
use crate::dependencies::DependencyError;
use crate::db::model::{CurrentDependencyInfo, InstalledPackageDataEntryModel};
use crate::dependencies::{DependencyError, TaggedDependencyError};
use crate::manager::{Manager, Status as ManagerStatus};
use crate::notifications::{notify, NotificationLevel, NotificationSubtype};
use crate::s9pk::manifest::{Manifest, PackageId};
@@ -22,37 +23,31 @@ pub mod health_check;
// Assume docker for now
pub async fn synchronize_all(ctx: &RpcContext) -> Result<(), Error> {
let mut pkg_ids = crate::db::DatabaseModel::new()
let pkg_ids = crate::db::DatabaseModel::new()
.package_data()
.keys(&mut ctx.db.handle(), false)
.await?;
// TODO: parallelize this
for id in pkg_ids {
async fn status(ctx: &RpcContext, id: PackageId) -> Result<(), Error> {
let mut db = ctx.db.handle();
// TODO: DRAGONS!!
// this locks all of package data to solve a deadlock issue below. As of the writing of this comment, it
// hangs during the `check` operation on /package-data/<id>. There is another daemon loop somewhere that
// is likely iterating through packages in a different order.
crate::db::DatabaseModel::new()
.package_data()
.lock(&mut db, LockType::Write)
.await;
futures::stream::iter(pkg_ids)
.for_each_concurrent(None, |id| async move {
async fn status(ctx: &RpcContext, id: PackageId) -> Result<(), Error> {
let mut db = ctx.db.handle();
// TODO: DRAGONS!!
// this locks all of package data to solve a deadlock issue below. As of the writing of this comment, it
// hangs during the `check` operation on /package-data/<id>. There is another daemon loop somewhere that
// is likely iterating through packages in a different order.
// crate::db::DatabaseModel::new()
// .package_data()
// .lock(&mut db)
// .await;
// Without the above lock, the below check operation will deadlock
let model = crate::db::DatabaseModel::new()
.package_data()
.idx_model(&id)
.check(&mut db)
.await?
.ok_or_else(|| {
Error::new(
anyhow!("PackageDataEntry does not exist"),
crate::ErrorKind::Database,
)
})?;
let (mut status, manager) =
if let Some(installed) = model.installed().check(&mut db).await? {
// Without the above lock, the below check operation will deadlock
let (mut status, manager) = if let Some(installed) = crate::db::DatabaseModel::new()
.package_data()
.idx_model(&id)
.and_then(|m| m.installed())
.check(&mut db)
.await?
{
(
installed.clone().status().get_mut(&mut db).await?,
ctx.managers
@@ -74,42 +69,41 @@ pub async fn synchronize_all(ctx: &RpcContext) -> Result<(), Error> {
return Ok(());
};
let res = status.main.synchronize(&manager).await?;
let res = status.main.synchronize(&manager).await?;
status.save(&mut db).await?;
status.save(&mut db).await?;
Ok(res)
}
if let Err(e) = status(ctx, id.clone()).await {
log::error!("Error syncronizing status of {}: {}", id, e);
}
}
Ok(res)
}
if let Err(e) = status(ctx, id.clone()).await {
log::error!("Error syncronizing status of {}: {}", id, e);
}
})
.await;
Ok(())
}
pub async fn check_all(ctx: &RpcContext) -> Result<(), Error> {
let mut db = ctx.db.handle();
// TODO: DRAGONS!!
// this locks all of package data to solve a deadlock issue below. As of the writing of this comment, it
// hangs during the `check` operation on /package-data/<id>. There is another daemon loop somewhere that
// is likely iterating through packages in a different order.
let pkg_ids = crate::db::DatabaseModel::new()
.package_data()
.keys(&mut db, false)
.keys(&mut db, true)
.await?;
let mut status_manifest = Vec::with_capacity(pkg_ids.len());
let mut status_deps = Vec::with_capacity(pkg_ids.len());
let mut installed_deps = Vec::with_capacity(pkg_ids.len());
for id in &pkg_ids {
let model = crate::db::DatabaseModel::new()
if let Some(installed) = crate::db::DatabaseModel::new()
.package_data()
.idx_model(id)
.and_then(|m| m.installed())
.check(&mut db)
.await?
.ok_or_else(|| {
Error::new(
anyhow!("PackageDataEntry does not exist"),
crate::ErrorKind::Database,
)
})?;
model.lock(&mut db, LockType::Write).await;
if let Some(installed) = model.installed().check(&mut db).await? {
{
let listed_deps = installed
.clone()
.manifest()
@@ -125,8 +119,8 @@ pub async fn check_all(ctx: &RpcContext) -> Result<(), Error> {
installed.clone().status(),
Arc::new(installed.clone().manifest().get(&mut db, true).await?),
));
status_deps.push((
installed.clone().status(),
installed_deps.push((
installed.clone(),
Arc::new({
installed
.current_dependencies()
@@ -168,10 +162,7 @@ pub async fn check_all(ctx: &RpcContext) -> Result<(), Error> {
.for_each_concurrent(None, move |(((status, manifest), id), ctx)| {
let status_sender = status_sender.clone();
async move {
match tokio::spawn(main_status(ctx.clone(), status, manifest, ctx.db.handle()))
.await
.unwrap()
{
match main_status(ctx.clone(), status, manifest, ctx.db.handle()).await {
Err(e) => {
log::error!("Error running main health check for {}: {}", id, e);
log::debug!("{:?}", e);
@@ -188,30 +179,74 @@ pub async fn check_all(ctx: &RpcContext) -> Result<(), Error> {
}
let statuses = Arc::new(statuses);
async fn dependency_status<Db: DbHandle>(
id: &PackageId,
statuses: Arc<HashMap<PackageId, MainStatus>>,
status_model: StatusModel,
model: InstalledPackageDataEntryModel,
current_deps: Arc<IndexMap<PackageId, CurrentDependencyInfo>>,
mut db: Db,
) -> Result<(), Error> {
let mut status = status_model.get_mut(&mut db).await?;
status
.dependency_errors
.update_health_based(&current_deps, &*statuses)
.await?;
status.save(&mut db).await?;
for (dep_id, dep_info) in &*current_deps {
if let Some(err) = match statuses.get(dep_id) {
Some(MainStatus::Running { ref health, .. })
| Some(MainStatus::BackingUp {
started: Some(_),
ref health,
}) => {
let mut failures = IndexMap::new();
for check in &dep_info.health_checks {
let res = health
.get(check)
.cloned()
.unwrap_or_else(|| HealthCheckResult {
result: HealthCheckResultVariant::Disabled,
time: Utc::now(),
});
if !matches!(res.result, HealthCheckResultVariant::Success) {
failures.insert(check.clone(), res);
}
}
if !failures.is_empty() {
Some(DependencyError::HealthChecksFailed { failures })
} else {
None
}
}
_ => Some(DependencyError::NotRunning),
} {
handle_broken_dependents(
&mut db,
id,
dep_id,
model.clone(),
err,
&mut IndexMap::new(),
)
.await?;
} else {
let mut errs = model
.clone()
.status()
.dependency_errors()
.get_mut(&mut db)
.await?;
if matches!(
errs.get(dep_id),
Some(DependencyError::HealthChecksFailed { .. })
) {
errs.0.remove(dep_id);
errs.save(&mut db).await?;
}
}
}
Ok(())
}
futures::stream::iter(status_deps.into_iter().zip(pkg_ids.clone()))
.for_each_concurrent(None, |((status, deps), id)| {
futures::stream::iter(installed_deps.into_iter().zip(pkg_ids.clone()))
.for_each_concurrent(None, |((installed, deps), id)| {
let statuses = statuses.clone();
async move {
if let Err(e) =
tokio::spawn(dependency_status(statuses, status, deps, ctx.db.handle()))
.await
.unwrap()
dependency_status(&id, statuses, installed, deps, ctx.db.handle()).await
{
log::error!("Error running dependency health check for {}: {}", id, e);
log::debug!("{:?}", e);
@@ -231,7 +266,7 @@ pub struct Status {
pub dependency_errors: DependencyErrors,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
#[derive(Debug, Clone, Deserialize, Serialize, HasModel)]
#[serde(tag = "status")]
#[serde(rename_all = "kebab-case")]
pub enum MainStatus {
@@ -405,52 +440,86 @@ impl DependencyErrors {
}
Ok(DependencyErrors(res))
}
async fn update_health_based(
&mut self,
dependencies: &IndexMap<PackageId, CurrentDependencyInfo>,
statuses: &HashMap<PackageId, MainStatus>,
) -> Result<(), Error> {
for (dep_id, dep_info) in dependencies {
if matches!(
self.get(&dep_id),
Some(&DependencyError::NotRunning)
| Some(&DependencyError::HealthChecksFailed { .. })
| None
) {
match statuses.get(dep_id) {
Some(MainStatus::Running { ref health, .. })
| Some(MainStatus::BackingUp {
started: Some(_),
ref health,
}) => {
let mut failures = IndexMap::new();
for check in &dep_info.health_checks {
let res =
health
.get(check)
.cloned()
.unwrap_or_else(|| HealthCheckResult {
result: HealthCheckResultVariant::Disabled,
time: Utc::now(),
});
if !matches!(res.result, HealthCheckResultVariant::Success) {
failures.insert(check.clone(), res);
}
}
if !failures.is_empty() {
self.0.insert(
dep_id.clone(),
DependencyError::HealthChecksFailed { failures },
);
}
}
_ => {
self.0.insert(dep_id.clone(), DependencyError::NotRunning);
}
pub fn handle_broken_dependents<'a, Db: DbHandle>(
db: &'a mut Db,
id: &'a PackageId,
dependency: &'a PackageId,
model: InstalledPackageDataEntryModel,
error: DependencyError,
breakages: &'a mut IndexMap<PackageId, TaggedDependencyError>,
) -> BoxFuture<'a, Result<(), Error>> {
async move {
let mut status = model.clone().status().get_mut(db).await?;
let old = status.dependency_errors.0.remove(id);
let newly_broken = old.is_none();
status.dependency_errors.0.insert(
id.clone(),
if let Some(old) = old {
old.merge_with(error.clone())
} else {
error.clone()
},
);
if newly_broken {
breakages.insert(
id.clone(),
TaggedDependencyError {
dependency: dependency.clone(),
error: error.clone(),
},
);
if status.main.running() {
if model
.clone()
.manifest()
.dependencies()
.idx_model(dependency)
.get(db, true)
.await?
.into_owned()
.ok_or_else(|| {
Error::new(
anyhow!("{} not in listed dependencies", dependency),
crate::ErrorKind::Database,
)
})?
.critical
{
status.main.stop();
let dependents = model.current_dependents().get(db, true).await?;
for (dependent, _) in &*dependents {
let dependent_model = crate::db::DatabaseModel::new()
.package_data()
.idx_model(dependent)
.and_then(|pkg| pkg.installed())
.check(db)
.await?
.ok_or_else(|| {
Error::new(
anyhow!("{} is not installed", dependent),
crate::ErrorKind::NotFound,
)
})?;
handle_broken_dependents(
db,
dependent,
id,
dependent_model,
DependencyError::NotRunning,
breakages,
)
.await?;
}
}
}
}
status.save(db).await?;
Ok(())
}
.boxed()
}