From e578062082bb610fd044df1abeea41904fd0147f Mon Sep 17 00:00:00 2001 From: Keagan McClelland Date: Wed, 1 Dec 2021 17:50:44 -0700 Subject: [PATCH] adds flag to manager shared state describing whether health checks should be committed, sets that flag on stop and signal --- appmgr/src/config/mod.rs | 33 +++++++++------------- appmgr/src/error.rs | 2 ++ appmgr/src/manager/health.rs | 7 ++++- appmgr/src/manager/mod.rs | 54 ++++++++++++++++++++++++++++++++++-- appmgr/src/status/mod.rs | 10 ++++++- 5 files changed, 81 insertions(+), 25 deletions(-) diff --git a/appmgr/src/config/mod.rs b/appmgr/src/config/mod.rs index be33d4aa9..a8e7f0d39 100644 --- a/appmgr/src/config/mod.rs +++ b/appmgr/src/config/mod.rs @@ -505,26 +505,19 @@ pub fn configure<'a, Db: DbHandle>( } if let Some(signal) = signal { - ctx.docker - .kill_container( - &DockerAction::container_name(id, None), - Some(KillContainerOptions { - signal: signal.to_string(), - }), - ) - .await - // ignore container is not running https://docs.docker.com/engine/api/v1.41/#operation/ContainerKill - .or_else(|e| { - if matches!( - e, - bollard::errors::Error::DockerResponseConflictError { .. } - | bollard::errors::Error::DockerResponseNotFoundError { .. } - ) { - Ok(()) - } else { - Err(e) - } - })?; + match ctx.managers.get(&(id.clone(), version.clone())).await { + None => { + // in theory this should never happen, which indicates this function should be moved behind the + // Manager interface + return Err(Error::new( + eyre!("Manager Not Found for package being configured"), + crate::ErrorKind::Incoherent, + )); + } + Some(m) => { + m.signal(&signal).await?; + } + } } Ok(()) diff --git a/appmgr/src/error.rs b/appmgr/src/error.rs index f2f270907..6f88d06b2 100644 --- a/appmgr/src/error.rs +++ b/appmgr/src/error.rs @@ -60,6 +60,7 @@ pub enum ErrorKind { ParseDbField = 52, Duplicate = 53, MultipleErrors = 54, + Incoherent = 55, } impl ErrorKind { pub fn as_str(&self) -> &'static str { @@ -119,6 +120,7 @@ impl ErrorKind { ParseDbField => "Database Field Parse Error", Duplicate => "Duplication Error", MultipleErrors => "Multiple Errors", + Incoherent => "Incoherent", } } } diff --git a/appmgr/src/manager/health.rs b/appmgr/src/manager/health.rs index 2bb373293..4041fcb2e 100644 --- a/appmgr/src/manager/health.rs +++ b/appmgr/src/manager/health.rs @@ -1,4 +1,5 @@ use std::collections::BTreeMap; +use std::sync::atomic::AtomicBool; use patch_db::DbHandle; use tracing::instrument; @@ -15,6 +16,7 @@ pub async fn check( ctx: &RpcContext, db: &mut Db, id: &PackageId, + should_commit: &AtomicBool, ) -> Result<(), Error> { let mut tx = db.begin().await?; @@ -41,7 +43,10 @@ pub async fn check( .get_mut(&mut checkpoint) .await?; - status.main.check(&ctx, &mut checkpoint, &*manifest).await?; + status + .main + .check(&ctx, &mut checkpoint, &*manifest, should_commit) + .await?; let failed = match &status.main { MainStatus::Running { health, .. } => health.clone(), diff --git a/appmgr/src/manager/mod.rs b/appmgr/src/manager/mod.rs index a778a6ce1..3731de2d5 100644 --- a/appmgr/src/manager/mod.rs +++ b/appmgr/src/manager/mod.rs @@ -1,13 +1,14 @@ use std::collections::BTreeMap; use std::convert::TryInto; use std::future::Future; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; use std::task::Poll; use std::time::Duration; -use bollard::container::StopContainerOptions; +use bollard::container::{KillContainerOptions, StopContainerOptions}; use color_eyre::eyre::eyre; +use nix::sys::signal::Signal; use num_enum::TryFromPrimitive; use patch_db::DbHandle; use sqlx::{Executor, Sqlite}; @@ -158,6 +159,7 @@ pub struct ManagerSharedState { tor_keys: BTreeMap, synchronized: Notify, synchronize_now: Notify, + commit_health_check_results: AtomicBool, } #[derive(Clone, Copy)] @@ -255,11 +257,22 @@ async fn run_main( .collect::, Error>>()?, ) .await?; + + state + .commit_health_check_results + .store(true, Ordering::SeqCst); let health = async { loop { tokio::time::sleep(Duration::from_secs(1)).await; let mut db = state.ctx.db.handle(); - if let Err(e) = health::check(&state.ctx, &mut db, &state.manifest.id).await { + if let Err(e) = health::check( + &state.ctx, + &mut db, + &state.manifest.id, + &state.commit_health_check_results, + ) + .await + { tracing::error!( "Failed to run health check for {}: {}", &state.manifest.id, @@ -310,6 +323,7 @@ impl Manager { tor_keys, synchronized: Notify::new(), synchronize_now: Notify::new(), + commit_health_check_results: AtomicBool::new(true), }); let thread_shared = shared.clone(); let thread = tokio::spawn(async move { @@ -324,6 +338,37 @@ impl Manager { }) } + pub async fn signal(&self, signal: &Signal) -> Result<(), Error> { + // stop health checks from committing their results + self.shared + .commit_health_check_results + .store(false, Ordering::SeqCst); + + // send signal to container + self.shared + .ctx + .docker + .kill_container( + &self.shared.container_name, + Some(KillContainerOptions { + signal: signal.to_string(), + }), + ) + .await + .or_else(|e| { + if matches!( + e, + bollard::errors::Error::DockerResponseConflictError { .. } + | bollard::errors::Error::DockerResponseNotFoundError { .. } + ) { + Ok(()) + } else { + Err(e) + } + })?; + Ok(()) + } + #[instrument(skip(self))] async fn exit(&self) -> Result<(), Error> { let _ = self.shared.on_stop.send(OnStop::Exit); @@ -436,6 +481,9 @@ async fn manager_thread_loop(mut recv: Receiver, thread_shared: &Arc Result<(), Error> { + shared + .commit_health_check_results + .store(false, Ordering::SeqCst); shared.on_stop.send(OnStop::Sleep).map_err(|_| { Error::new( eyre!("Manager has already been shutdown"), diff --git a/appmgr/src/status/mod.rs b/appmgr/src/status/mod.rs index b5c0cb75f..2d1e0b7e5 100644 --- a/appmgr/src/status/mod.rs +++ b/appmgr/src/status/mod.rs @@ -1,4 +1,5 @@ use std::collections::BTreeMap; +use std::sync::atomic::{AtomicBool, Ordering}; use chrono::{DateTime, Utc}; use patch_db::{DbHandle, HasModel}; @@ -46,10 +47,11 @@ impl MainStatus { ctx: &RpcContext, db: &mut Db, manifest: &Manifest, + should_commit: &AtomicBool, ) -> Result<(), Error> { match self { MainStatus::Running { started, health } => { - *health = manifest + let health_result = manifest .health_checks .check_all( ctx, @@ -59,6 +61,12 @@ impl MainStatus { &manifest.volumes, ) .await?; + if !should_commit.load(Ordering::SeqCst) { + return Ok(()); + } else { + // only commit health check results if we are supposed to + *health = health_result; + } let mut should_stop = false; for (check, res) in health { match &res {