fixes shutdown issues (#1106)

This commit is contained in:
Keagan McClelland
2022-01-20 12:33:44 -07:00
committed by Aiden McClelland
parent 75dbc0bd76
commit 5278a22aef
2 changed files with 32 additions and 18 deletions

View File

@@ -112,8 +112,9 @@ impl ManagerMap {
let res = let res =
futures::future::join_all(std::mem::take(&mut *self.0.write().await).into_iter().map( futures::future::join_all(std::mem::take(&mut *self.0.write().await).into_iter().map(
|((id, version), man)| async move { |((id, version), man)| async move {
tracing::debug!("Manager for {}@{} shutting down", id, version);
man.exit().await?; man.exit().await?;
tracing::debug!("Manager for {}@{} shutdown", id, version); tracing::debug!("Manager for {}@{} is shutdown", id, version);
if let Err(e) = Arc::try_unwrap(man) { if let Err(e) = Arc::try_unwrap(man) {
tracing::trace!( tracing::trace!(
"Manager for {}@{} still has {} other open references", "Manager for {}@{} still has {} other open references",
@@ -151,6 +152,7 @@ pub enum Status {
Running = 1, Running = 1,
Stopped = 2, Stopped = 2,
Paused = 3, Paused = 3,
Shutdown = 4,
} }
pub struct ManagerSharedState { pub struct ManagerSharedState {
@@ -165,7 +167,7 @@ pub struct ManagerSharedState {
commit_health_check_results: AtomicBool, commit_health_check_results: AtomicBool,
} }
#[derive(Clone, Copy)] #[derive(Debug, Clone, Copy)]
pub enum OnStop { pub enum OnStop {
Restart, Restart,
Sleep, Sleep,
@@ -336,6 +338,7 @@ impl Manager {
synchronize_now: Notify::new(), synchronize_now: Notify::new(),
commit_health_check_results: AtomicBool::new(true), commit_health_check_results: AtomicBool::new(true),
}); });
shared.synchronize_now.notify_one();
let thread_shared = shared.clone(); let thread_shared = shared.clone();
let thread = tokio::spawn(async move { let thread = tokio::spawn(async move {
tokio::select! { tokio::select! {
@@ -410,7 +413,7 @@ impl Manager {
a => a?, a => a?,
}; };
self.shared.status.store( self.shared.status.store(
Status::Stopped as usize, Status::Shutdown as usize,
std::sync::atomic::Ordering::SeqCst, std::sync::atomic::Ordering::SeqCst,
); );
if let Some(thread) = self.thread.take().await { if let Some(thread) = self.thread.take().await {

View File

@@ -10,7 +10,7 @@ use crate::status::MainStatus;
use crate::Error; use crate::Error;
/// Allocates a db handle. DO NOT CALL with a db handle already in scope /// Allocates a db handle. DO NOT CALL with a db handle already in scope
async fn synchronize_once(shared: &ManagerSharedState) -> Result<(), Error> { async fn synchronize_once(shared: &ManagerSharedState) -> Result<Status, Error> {
let mut db = shared.ctx.db.handle(); let mut db = shared.ctx.db.handle();
let mut status = crate::db::DatabaseModel::new() let mut status = crate::db::DatabaseModel::new()
.package_data() .package_data()
@@ -24,7 +24,8 @@ async fn synchronize_once(shared: &ManagerSharedState) -> Result<(), Error> {
.main() .main()
.get_mut(&mut db) .get_mut(&mut db)
.await?; .await?;
match shared.status.load(Ordering::SeqCst).try_into().unwrap() { let manager_status = shared.status.load(Ordering::SeqCst).try_into().unwrap();
match manager_status {
Status::Stopped => match &mut *status { Status::Stopped => match &mut *status {
MainStatus::Stopped => (), MainStatus::Stopped => (),
MainStatus::Stopping => { MainStatus::Stopping => {
@@ -72,28 +73,38 @@ async fn synchronize_once(shared: &ManagerSharedState) -> Result<(), Error> {
} }
MainStatus::BackingUp { .. } => (), MainStatus::BackingUp { .. } => (),
}, },
Status::Shutdown => (),
} }
status.save(&mut db).await?; status.save(&mut db).await?;
Ok(()) Ok(manager_status)
} }
pub async fn synchronizer(shared: &ManagerSharedState) { pub async fn synchronizer(shared: &ManagerSharedState) {
loop { loop {
if let Err(e) = synchronize_once(shared).await {
tracing::error!(
"Synchronizer for {}@{} failed: {}",
shared.manifest.id,
shared.manifest.version,
e
);
tracing::debug!("{:?}", e);
} else {
tracing::trace!("{} status synchronized", shared.manifest.id);
shared.synchronized.notify_waiters();
}
tokio::select! { tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(5)) => (), _ = tokio::time::sleep(Duration::from_secs(5)) => (),
_ = shared.synchronize_now.notified() => (), _ = shared.synchronize_now.notified() => (),
} }
let status = match synchronize_once(shared).await {
Err(e) => {
tracing::error!(
"Synchronizer for {}@{} failed: {}",
shared.manifest.id,
shared.manifest.version,
e
);
tracing::debug!("{:?}", e);
continue;
}
Ok(status) => status,
};
tracing::trace!("{} status synchronized", shared.manifest.id);
shared.synchronized.notify_waiters();
match status {
Status::Shutdown => {
break;
}
_ => (),
}
} }
} }