diff --git a/appmgr/src/manager/mod.rs b/appmgr/src/manager/mod.rs index 1e2ef8828..660f251cf 100644 --- a/appmgr/src/manager/mod.rs +++ b/appmgr/src/manager/mod.rs @@ -112,8 +112,9 @@ impl ManagerMap { let res = futures::future::join_all(std::mem::take(&mut *self.0.write().await).into_iter().map( |((id, version), man)| async move { + tracing::debug!("Manager for {}@{} shutting down", id, version); man.exit().await?; - tracing::debug!("Manager for {}@{} shutdown", id, version); + tracing::debug!("Manager for {}@{} is shutdown", id, version); if let Err(e) = Arc::try_unwrap(man) { tracing::trace!( "Manager for {}@{} still has {} other open references", @@ -151,6 +152,7 @@ pub enum Status { Running = 1, Stopped = 2, Paused = 3, + Shutdown = 4, } pub struct ManagerSharedState { @@ -165,7 +167,7 @@ pub struct ManagerSharedState { commit_health_check_results: AtomicBool, } -#[derive(Clone, Copy)] +#[derive(Debug, Clone, Copy)] pub enum OnStop { Restart, Sleep, @@ -336,6 +338,7 @@ impl Manager { synchronize_now: Notify::new(), commit_health_check_results: AtomicBool::new(true), }); + shared.synchronize_now.notify_one(); let thread_shared = shared.clone(); let thread = tokio::spawn(async move { tokio::select! { @@ -410,7 +413,7 @@ impl Manager { a => a?, }; self.shared.status.store( - Status::Stopped as usize, + Status::Shutdown as usize, std::sync::atomic::Ordering::SeqCst, ); if let Some(thread) = self.thread.take().await { diff --git a/appmgr/src/manager/sync.rs b/appmgr/src/manager/sync.rs index c2d1d1299..841e52f5f 100644 --- a/appmgr/src/manager/sync.rs +++ b/appmgr/src/manager/sync.rs @@ -10,7 +10,7 @@ use crate::status::MainStatus; use crate::Error; /// Allocates a db handle. DO NOT CALL with a db handle already in scope -async fn synchronize_once(shared: &ManagerSharedState) -> Result<(), Error> { +async fn synchronize_once(shared: &ManagerSharedState) -> Result { let mut db = shared.ctx.db.handle(); let mut status = crate::db::DatabaseModel::new() .package_data() @@ -24,7 +24,8 @@ async fn synchronize_once(shared: &ManagerSharedState) -> Result<(), Error> { .main() .get_mut(&mut db) .await?; - match shared.status.load(Ordering::SeqCst).try_into().unwrap() { + let manager_status = shared.status.load(Ordering::SeqCst).try_into().unwrap(); + match manager_status { Status::Stopped => match &mut *status { MainStatus::Stopped => (), MainStatus::Stopping => { @@ -72,28 +73,38 @@ async fn synchronize_once(shared: &ManagerSharedState) -> Result<(), Error> { } MainStatus::BackingUp { .. } => (), }, + Status::Shutdown => (), } status.save(&mut db).await?; - Ok(()) + Ok(manager_status) } pub async fn synchronizer(shared: &ManagerSharedState) { loop { - if let Err(e) = synchronize_once(shared).await { - tracing::error!( - "Synchronizer for {}@{} failed: {}", - shared.manifest.id, - shared.manifest.version, - e - ); - tracing::debug!("{:?}", e); - } else { - tracing::trace!("{} status synchronized", shared.manifest.id); - shared.synchronized.notify_waiters(); - } tokio::select! { _ = tokio::time::sleep(Duration::from_secs(5)) => (), _ = shared.synchronize_now.notified() => (), } + let status = match synchronize_once(shared).await { + Err(e) => { + tracing::error!( + "Synchronizer for {}@{} failed: {}", + shared.manifest.id, + shared.manifest.version, + e + ); + tracing::debug!("{:?}", e); + continue; + } + Ok(status) => status, + }; + tracing::trace!("{} status synchronized", shared.manifest.id); + shared.synchronized.notify_waiters(); + match status { + Status::Shutdown => { + break; + } + _ => (), + } } }