Adds a flag to the manager's shared state indicating whether health-check results should be committed, and clears that flag on stop and on signal

This commit is contained in:
Keagan McClelland
2021-12-01 17:50:44 -07:00
committed by Aiden McClelland
parent d8fb32ea8b
commit e578062082
5 changed files with 81 additions and 25 deletions

View File

@@ -505,26 +505,19 @@ pub fn configure<'a, Db: DbHandle>(
}
if let Some(signal) = signal {
ctx.docker
.kill_container(
&DockerAction::container_name(id, None),
Some(KillContainerOptions {
signal: signal.to_string(),
}),
)
.await
// ignore container is not running https://docs.docker.com/engine/api/v1.41/#operation/ContainerKill
.or_else(|e| {
if matches!(
e,
bollard::errors::Error::DockerResponseConflictError { .. }
| bollard::errors::Error::DockerResponseNotFoundError { .. }
) {
Ok(())
} else {
Err(e)
}
})?;
match ctx.managers.get(&(id.clone(), version.clone())).await {
None => {
// in theory this should never happen, which indicates this function should be moved behind the
// Manager interface
return Err(Error::new(
eyre!("Manager Not Found for package being configured"),
crate::ErrorKind::Incoherent,
));
}
Some(m) => {
m.signal(&signal).await?;
}
}
}
Ok(())

View File

@@ -60,6 +60,7 @@ pub enum ErrorKind {
ParseDbField = 52,
Duplicate = 53,
MultipleErrors = 54,
Incoherent = 55,
}
impl ErrorKind {
pub fn as_str(&self) -> &'static str {
@@ -119,6 +120,7 @@ impl ErrorKind {
ParseDbField => "Database Field Parse Error",
Duplicate => "Duplication Error",
MultipleErrors => "Multiple Errors",
Incoherent => "Incoherent",
}
}
}

View File

@@ -1,4 +1,5 @@
use std::collections::BTreeMap;
use std::sync::atomic::AtomicBool;
use patch_db::DbHandle;
use tracing::instrument;
@@ -15,6 +16,7 @@ pub async fn check<Db: DbHandle>(
ctx: &RpcContext,
db: &mut Db,
id: &PackageId,
should_commit: &AtomicBool,
) -> Result<(), Error> {
let mut tx = db.begin().await?;
@@ -41,7 +43,10 @@ pub async fn check<Db: DbHandle>(
.get_mut(&mut checkpoint)
.await?;
status.main.check(&ctx, &mut checkpoint, &*manifest).await?;
status
.main
.check(&ctx, &mut checkpoint, &*manifest, should_commit)
.await?;
let failed = match &status.main {
MainStatus::Running { health, .. } => health.clone(),

View File

@@ -1,13 +1,14 @@
use std::collections::BTreeMap;
use std::convert::TryInto;
use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::Poll;
use std::time::Duration;
use bollard::container::StopContainerOptions;
use bollard::container::{KillContainerOptions, StopContainerOptions};
use color_eyre::eyre::eyre;
use nix::sys::signal::Signal;
use num_enum::TryFromPrimitive;
use patch_db::DbHandle;
use sqlx::{Executor, Sqlite};
@@ -158,6 +159,7 @@ pub struct ManagerSharedState {
tor_keys: BTreeMap<InterfaceId, TorSecretKeyV3>,
synchronized: Notify,
synchronize_now: Notify,
commit_health_check_results: AtomicBool,
}
#[derive(Clone, Copy)]
@@ -255,11 +257,22 @@ async fn run_main(
.collect::<Result<Vec<_>, Error>>()?,
)
.await?;
state
.commit_health_check_results
.store(true, Ordering::SeqCst);
let health = async {
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
let mut db = state.ctx.db.handle();
if let Err(e) = health::check(&state.ctx, &mut db, &state.manifest.id).await {
if let Err(e) = health::check(
&state.ctx,
&mut db,
&state.manifest.id,
&state.commit_health_check_results,
)
.await
{
tracing::error!(
"Failed to run health check for {}: {}",
&state.manifest.id,
@@ -310,6 +323,7 @@ impl Manager {
tor_keys,
synchronized: Notify::new(),
synchronize_now: Notify::new(),
commit_health_check_results: AtomicBool::new(true),
});
let thread_shared = shared.clone();
let thread = tokio::spawn(async move {
@@ -324,6 +338,37 @@ impl Manager {
})
}
/// Sends a POSIX signal to the package's running Docker container.
///
/// Before signalling, disables committing of health-check results so that a
/// transient failure caused by the signal is not recorded as a real status.
/// NOTE(review): nothing in this method re-enables
/// `commit_health_check_results`; presumably `run_main` stores `true` again
/// when the container (re)starts — confirm against the run loop.
///
/// # Errors
/// Returns any Docker error other than "container not running" /
/// "container not found", which are deliberately ignored.
pub async fn signal(&self, signal: &Signal) -> Result<(), Error> {
// stop health checks from committing their results
self.shared
.commit_health_check_results
.store(false, Ordering::SeqCst);
// send signal to container
self.shared
.ctx
.docker
.kill_container(
&self.shared.container_name,
Some(KillContainerOptions {
signal: signal.to_string(),
}),
)
.await
// Ignore errors meaning the container is not running (Conflict) or does
// not exist (NotFound) — there is nothing to signal in either case.
// See the ContainerKill operation in the Docker Engine API:
// https://docs.docker.com/engine/api/v1.41/#operation/ContainerKill
.or_else(|e| {
if matches!(
e,
bollard::errors::Error::DockerResponseConflictError { .. }
| bollard::errors::Error::DockerResponseNotFoundError { .. }
) {
Ok(())
} else {
Err(e)
}
})?;
Ok(())
}
#[instrument(skip(self))]
async fn exit(&self) -> Result<(), Error> {
let _ = self.shared.on_stop.send(OnStop::Exit);
@@ -436,6 +481,9 @@ async fn manager_thread_loop(mut recv: Receiver<OnStop>, thread_shared: &Arc<Man
#[instrument(skip(shared))]
async fn stop(shared: &ManagerSharedState) -> Result<(), Error> {
shared
.commit_health_check_results
.store(false, Ordering::SeqCst);
shared.on_stop.send(OnStop::Sleep).map_err(|_| {
Error::new(
eyre!("Manager has already been shutdown"),

View File

@@ -1,4 +1,5 @@
use std::collections::BTreeMap;
use std::sync::atomic::{AtomicBool, Ordering};
use chrono::{DateTime, Utc};
use patch_db::{DbHandle, HasModel};
@@ -46,10 +47,11 @@ impl MainStatus {
ctx: &RpcContext,
db: &mut Db,
manifest: &Manifest,
should_commit: &AtomicBool,
) -> Result<(), Error> {
match self {
MainStatus::Running { started, health } => {
*health = manifest
let health_result = manifest
.health_checks
.check_all(
ctx,
@@ -59,6 +61,12 @@ impl MainStatus {
&manifest.volumes,
)
.await?;
if !should_commit.load(Ordering::SeqCst) {
return Ok(());
} else {
// only commit health check results if we are supposed to
*health = health_result;
}
let mut should_stop = false;
for (check, res) in health {
match &res {