From e4a40ac32da3d46646e9735e078f9fec6d7b12fd Mon Sep 17 00:00:00 2001 From: Aiden McClelland <3732071+dr-bonez@users.noreply.github.com> Date: Wed, 3 Nov 2021 12:15:38 -0600 Subject: [PATCH] refactor: remove sync daemon (#765) * refactor: remove sync daemon * improve code organization --- appmgr/Cargo.lock | 44 ++++ appmgr/Cargo.toml | 1 + appmgr/src/backup/backup_bulk.rs | 36 ++-- appmgr/src/bin/embassyd.rs | 23 --- appmgr/src/config/mod.rs | 5 +- appmgr/src/context/rpc.rs | 1 - appmgr/src/context/setup.rs | 1 - appmgr/src/control.rs | 20 +- appmgr/src/manager/mod.rs | 335 ++++++++++++++++--------------- appmgr/src/manager/sync.rs | 79 ++++++++ appmgr/src/status/mod.rs | 98 +-------- 11 files changed, 330 insertions(+), 313 deletions(-) create mode 100644 appmgr/src/manager/sync.rs diff --git a/appmgr/Cargo.lock b/appmgr/Cargo.lock index 98ed2b177..3779e9250 100644 --- a/appmgr/Cargo.lock +++ b/appmgr/Cargo.lock @@ -702,6 +702,17 @@ dependencies = [ "syn 1.0.80", ] +[[package]] +name = "derivative" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b" +dependencies = [ + "proc-macro2 1.0.29", + "quote 1.0.10", + "syn 1.0.80", +] + [[package]] name = "derive_more" version = "0.99.16" @@ -857,6 +868,7 @@ dependencies = [ "nix 0.23.0", "nom 7.0.0", "num", + "num_enum", "openssh-keys", "openssl", "patch-db", @@ -1915,6 +1927,28 @@ dependencies = [ "libc", ] +[[package]] +name = "num_enum" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9bd055fb730c4f8f4f57d45d35cd6b3f0980535b056dc7ff119cee6a66ed6f" +dependencies = [ + "derivative", + "num_enum_derive", +] + +[[package]] +name = "num_enum_derive" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "486ea01961c4a818096de679a8b740b26d9033146ac5291b1c98557658f8cdd9" +dependencies = [ + "proc-macro-crate", + "proc-macro2 1.0.29", + "quote 1.0.10", + "syn 1.0.80", +] + [[package]] name = "object" version = "0.26.2" @@ -2202,6 +2236,16 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "proc-macro-crate" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ebace6889caf889b4d3f76becee12e90353f2b8c7d875534a71e5742f8f6f83" +dependencies = [ + "thiserror", + "toml", +] + [[package]] name = "proc-macro-hack" version = "0.5.19" diff --git a/appmgr/Cargo.toml b/appmgr/Cargo.toml index 1f2f73e2d..f926b6583 100644 --- a/appmgr/Cargo.toml +++ b/appmgr/Cargo.toml @@ -77,6 +77,7 @@ log = "0.4.14" nix = "0.23.0" nom = "7.0.0" num = "0.4.0" +num_enum = "0.5.4" openssh-keys = "0.5.0" openssl = { version = "0.10.36", features = ["vendored"] } patch-db = { version = "*", path = "../patch-db/patch-db", features = [ diff --git a/appmgr/src/backup/backup_bulk.rs b/appmgr/src/backup/backup_bulk.rs index 9a94cd2fc..a97a175dd 100644 --- a/appmgr/src/backup/backup_bulk.rs +++ b/appmgr/src/backup/backup_bulk.rs @@ -223,27 +223,10 @@ async fn perform_backup( } else { continue; }; - installed_model.lock(&mut db, LockType::Write).await; - let manifest = installed_model - .clone() - .manifest() - .get(&mut db, true) - .await?; let main_status_model = installed_model.clone().status().main(); let (started, health) = match main_status_model.get(&mut db, true).await?.into_owned() { MainStatus::Running { started, health } => (Some(started.clone()), health.clone()), MainStatus::Stopped | MainStatus::Stopping => (None, Default::default()), - MainStatus::Restoring { .. } => { - backup_report.insert( - package_id, - PackageBackupReport { - error: Some( - "Can't do backup because service is in a restoring state".to_owned(), - ), - }, - ); - continue; - } MainStatus::BackingUp { .. } => { backup_report.insert( package_id, @@ -256,16 +239,33 @@ async fn perform_backup( continue; } }; + let mut tx = db.begin().await?; // for lock scope main_status_model .put( - &mut db, + &mut tx, &MainStatus::BackingUp { started: started.clone(), health: health.clone(), }, ) .await?; + tx.save().await?; // drop locks + installed_model.lock(&mut db, LockType::Write).await; + let manifest = installed_model + .clone() + .manifest() + .get(&mut db, true) + .await?; + + ctx.managers + .get(&(manifest.id.clone(), manifest.version.clone())) + .await + .ok_or_else(|| { + Error::new(eyre!("Manager not found"), crate::ErrorKind::InvalidRequest) + })? + .synchronize() + .await; let guard = backup_guard.mount_package_backup(&package_id).await?; let res = manifest .backup diff --git a/appmgr/src/bin/embassyd.rs b/appmgr/src/bin/embassyd.rs index 4c0551883..dcd54454e 100644 --- a/appmgr/src/bin/embassyd.rs +++ b/appmgr/src/bin/embassyd.rs @@ -11,7 +11,6 @@ use embassy::middleware::diagnostic::diagnostic; use embassy::net::mdns::MdnsController; use embassy::net::tor::tor_health_check; use embassy::shutdown::Shutdown; -use embassy::status::synchronize_all; use embassy::util::{daemon, Invoke}; use embassy::{static_server, Error, ErrorKind, ResultExt}; use futures::{FutureExt, TryFutureExt}; @@ -183,22 +182,6 @@ async fn inner_main(cfg_path: Option<&str>) -> Result, Error> { }) }; - let status_ctx = rpc_ctx.clone(); - let status_daemon = daemon( - move || { - let ctx = status_ctx.clone(); - async move { - if let Err(e) = synchronize_all(&ctx).await { - tracing::error!("Error in Status Sync daemon: {}", e); - tracing::debug!("{:?}", e); - } else { - tracing::trace!("Status Sync completed successfully"); - } - } - }, - Duration::from_millis(500), - rpc_ctx.shutdown.subscribe(), - ); let tor_health_ctx = rpc_ctx.clone(); let tor_client = Client::builder() .proxy( @@ -239,12 +222,6 @@ async fn inner_main(cfg_path: Option<&str>) -> Result, Error> { file_server .map_err(|e| Error::new(e, ErrorKind::Network)) .map_ok(|_| tracing::debug!("Static File Server Shutdown")), - status_daemon - .map_err(|e| Error::new( - e.wrap_err("Status Sync Daemon panicked!"), - ErrorKind::Unknown - )) - .map_ok(|_| tracing::debug!("Status Sync Daemon Shutdown")), tor_health_daemon .map_err(|e| Error::new( e.wrap_err("Tor Health Daemon panicked!"), diff --git a/appmgr/src/config/mod.rs b/appmgr/src/config/mod.rs index 80bb82bd4..c0464e444 100644 --- a/appmgr/src/config/mod.rs +++ b/appmgr/src/config/mod.rs @@ -17,7 +17,10 @@ use crate::action::docker::DockerAction; use crate::context::RpcContext; use crate::db::model::CurrentDependencyInfo; use crate::db::util::WithRevision; -use crate::dependencies::{BreakageRes, DependencyError, DependencyErrors, TaggedDependencyError, break_transitive, heal_all_dependents_transitive, update_current_dependents}; +use crate::dependencies::{ + break_transitive, heal_all_dependents_transitive, update_current_dependents, BreakageRes, + DependencyError, DependencyErrors, TaggedDependencyError, +}; use crate::install::cleanup::remove_current_dependents; use crate::s9pk::manifest::{Manifest, PackageId}; use crate::util::{ diff --git a/appmgr/src/context/rpc.rs b/appmgr/src/context/rpc.rs index 1048611f5..7c37d0d1a 100644 --- a/appmgr/src/context/rpc.rs +++ b/appmgr/src/context/rpc.rs @@ -1,4 +1,3 @@ -use std::borrow::Cow; use std::collections::VecDeque; use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4}; use std::ops::Deref; diff --git a/appmgr/src/context/setup.rs b/appmgr/src/context/setup.rs index 15fcf3cee..78ad239a1 100644 --- a/appmgr/src/context/setup.rs +++ b/appmgr/src/context/setup.rs @@ -1,4 +1,3 @@ -use std::borrow::Cow; use std::net::{IpAddr, SocketAddr}; use std::ops::Deref; use std::path::{Path, PathBuf}; diff --git a/appmgr/src/control.rs b/appmgr/src/control.rs index bc5eda4ad..ea7a9e5d5 100644 --- a/appmgr/src/control.rs +++ b/appmgr/src/control.rs @@ -50,20 +50,20 @@ pub async fn start( started: Utc::now(), health: BTreeMap::new(), }; - status - .synchronize( - &*ctx - .managers - .get(&(id.clone(), version)) - .await - .ok_or_else(|| Error::new(eyre!("Manager not found"), crate::ErrorKind::Docker))?, - ) - .await?; status.save(&mut tx).await?; heal_all_dependents_transitive(&ctx, &mut tx, &id).await?; + let revision = tx.commit(None).await?; + + ctx.managers + .get(&(id, version)) + .await + .ok_or_else(|| Error::new(eyre!("Manager not found"), crate::ErrorKind::InvalidRequest))? + .synchronize() + .await; + Ok(WithRevision { - revision: tx.commit(None).await?, + revision, response: (), }) } diff --git a/appmgr/src/manager/mod.rs b/appmgr/src/manager/mod.rs index 3dd5e86e3..1039bb38d 100644 --- a/appmgr/src/manager/mod.rs +++ b/appmgr/src/manager/mod.rs @@ -1,23 +1,26 @@ use std::collections::BTreeMap; +use std::convert::TryInto; use std::future::Future; -use std::sync::atomic::AtomicUsize; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::task::Poll; use std::time::Duration; use bollard::container::StopContainerOptions; use color_eyre::eyre::eyre; +use num_enum::TryFromPrimitive; use patch_db::DbHandle; use sqlx::{Executor, Sqlite}; use tokio::sync::watch::error::RecvError; use tokio::sync::watch::{channel, Receiver, Sender}; -use tokio::sync::RwLock; +use tokio::sync::{Notify, RwLock}; use torut::onion::TorSecretKeyV3; use tracing::instrument; use crate::action::docker::DockerAction; use crate::action::NoOutput; use crate::context::RpcContext; +use crate::manager::sync::synchronizer; use crate::net::interface::InterfaceId; use crate::notifications::NotificationLevel; use crate::s9pk::manifest::{Manifest, PackageId}; @@ -25,6 +28,7 @@ use crate::util::{Container, NonDetachingJoinHandle, Version}; use crate::Error; pub mod health; +mod sync; #[derive(Default)] pub struct ManagerMap(RwLock>>); @@ -128,19 +132,23 @@ pub struct Manager { thread: Container>, } +#[derive(TryFromPrimitive)] +#[repr(usize)] pub enum Status { Running = 0, Stopped = 1, Paused = 2, } -struct ManagerSharedState { +pub struct ManagerSharedState { ctx: RpcContext, status: AtomicUsize, on_stop: Sender, manifest: Manifest, container_name: String, tor_keys: BTreeMap, + synchronized: Notify, + synchronize_now: Notify, } #[derive(Clone, Copy)] @@ -273,7 +281,7 @@ impl Manager { manifest: Manifest, tor_keys: BTreeMap, ) -> Result { - let (on_stop, mut recv) = channel(OnStop::Sleep); + let (on_stop, recv) = channel(OnStop::Sleep); let shared = Arc::new(ManagerSharedState { ctx, status: AtomicUsize::new(Status::Stopped as usize), @@ -281,83 +289,14 @@ impl Manager { container_name: DockerAction::container_name(&manifest.id, None), manifest, tor_keys, + synchronized: Notify::new(), + synchronize_now: Notify::new(), }); let thread_shared = shared.clone(); let thread = tokio::spawn(async move { - loop { - fn handle_stop_action<'a>( - recv: &'a mut Receiver, - ) -> ( - OnStop, - Option> + 'a>, - ) { - let val = *recv.borrow_and_update(); - match val { - OnStop::Sleep => (OnStop::Sleep, Some(recv.changed())), - a => (a, None), - } - } - let (stop_action, fut) = handle_stop_action(&mut recv); - match stop_action { - OnStop::Sleep => { - if let Some(fut) = fut { - thread_shared.status.store( - Status::Stopped as usize, - std::sync::atomic::Ordering::SeqCst, - ); - fut.await.unwrap(); - continue; - } - } - OnStop::Exit => { - thread_shared.status.store( - Status::Stopped as usize, - std::sync::atomic::Ordering::SeqCst, - ); - break; - } - OnStop::Restart => { - thread_shared.status.store( - Status::Running as usize, - std::sync::atomic::Ordering::SeqCst, - ); - } - } - match run_main(&thread_shared).await { - Ok(Ok(NoOutput)) => { - thread_shared - .on_stop - .send(OnStop::Sleep) - .map_err(|_| ()) - .unwrap(); // recv is still in scope, cannot fail - } - Ok(Err(e)) => { - let res = thread_shared.ctx.notification_manager - .notify( - &mut thread_shared.ctx.db.handle(), - Some(thread_shared.manifest.id.clone()), - NotificationLevel::Warning, - String::from("Service Crashed"), - format!("The service {} has crashed with the following exit code: {}\nDetails: {}", thread_shared.manifest.id.clone(), e.0, e.1), - (), - Some(900) // 15 minutes - ) - .await; - match res { - Err(e) => { - // TODO for code review: Do we return this error or just log it? - tracing::error!("Failed to issue notification: {}", e); - tracing::debug!("{:?}", e); - } - Ok(()) => {} - } - tracing::error!("service crashed: {}: {}", e.0, e.1); - } - Err(e) => { - tracing::error!("failed to start service: {}", e); - tracing::debug!("{:?}", e); - } - } + tokio::select! { + _ = manager_thread_loop(recv, &thread_shared) => (), + _ = synchronizer(&*thread_shared) => (), } }); Ok(Manager { @@ -366,90 +305,6 @@ impl Manager { }) } - pub fn status(&self) -> Status { - match self.shared.status.load(std::sync::atomic::Ordering::SeqCst) { - 0 => Status::Running, - 1 => Status::Stopped, - 2 => Status::Paused, - _ => unreachable!(), - } - } - - #[instrument(skip(self))] - pub async fn stop(&self) -> Result<(), Error> { - self.shared.on_stop.send(OnStop::Sleep).map_err(|_| { - Error::new( - eyre!("Manager has already been shutdown"), - crate::ErrorKind::Docker, - ) - })?; - if matches!(self.status(), Status::Paused) { - self.resume().await?; - } - match self - .shared - .ctx - .docker - .stop_container( - &self.shared.container_name, - Some(StopContainerOptions { t: 30 }), - ) - .await - { - Err(bollard::errors::Error::DockerResponseNotFoundError { .. }) - | Err(bollard::errors::Error::DockerResponseConflictError { .. }) - | Err(bollard::errors::Error::DockerResponseNotModifiedError { .. }) => (), // Already stopped - a => a?, - }; - self.shared.status.store( - Status::Stopped as usize, - std::sync::atomic::Ordering::SeqCst, - ); - Ok(()) - } - - #[instrument(skip(self))] - pub async fn start(&self) -> Result<(), Error> { - self.shared.on_stop.send(OnStop::Restart).map_err(|_| { - Error::new( - eyre!("Manager has already been shutdown"), - crate::ErrorKind::Docker, - ) - })?; - self.shared.status.store( - Status::Running as usize, - std::sync::atomic::Ordering::SeqCst, - ); - Ok(()) - } - - #[instrument(skip(self))] - pub async fn pause(&self) -> Result<(), Error> { - self.shared - .ctx - .docker - .pause_container(&self.shared.container_name) - .await?; - self.shared - .status - .store(Status::Paused as usize, std::sync::atomic::Ordering::SeqCst); - Ok(()) - } - - #[instrument(skip(self))] - pub async fn resume(&self) -> Result<(), Error> { - self.shared - .ctx - .docker - .unpause_container(&self.shared.container_name) - .await?; - self.shared.status.store( - Status::Running as usize, - std::sync::atomic::Ordering::SeqCst, - ); - Ok(()) - } - #[instrument(skip(self))] async fn exit(&self) -> Result<(), Error> { let _ = self.shared.on_stop.send(OnStop::Exit); @@ -482,4 +337,160 @@ impl Manager { } Ok(()) } + + pub async fn synchronize(&self) { + self.shared.synchronize_now.notify_waiters(); + self.shared.synchronized.notified().await + } +} + +async fn manager_thread_loop(mut recv: Receiver, thread_shared: &Arc) { + loop { + fn handle_stop_action<'a>( + recv: &'a mut Receiver, + ) -> ( + OnStop, + Option> + 'a>, + ) { + let val = *recv.borrow_and_update(); + match val { + OnStop::Sleep => (OnStop::Sleep, Some(recv.changed())), + a => (a, None), + } + } + let (stop_action, fut) = handle_stop_action(&mut recv); + match stop_action { + OnStop::Sleep => { + if let Some(fut) = fut { + thread_shared.status.store( + Status::Stopped as usize, + std::sync::atomic::Ordering::SeqCst, + ); + fut.await.unwrap(); + continue; + } + } + OnStop::Exit => { + thread_shared.status.store( + Status::Stopped as usize, + std::sync::atomic::Ordering::SeqCst, + ); + break; + } + OnStop::Restart => { + thread_shared.status.store( + Status::Running as usize, + std::sync::atomic::Ordering::SeqCst, + ); + } + } + match run_main(&thread_shared).await { + Ok(Ok(NoOutput)) => { + thread_shared + .on_stop + .send(OnStop::Sleep) + .map_err(|_| ()) + .unwrap(); // recv is still in scope, cannot fail + } + Ok(Err(e)) => { + let res = thread_shared.ctx.notification_manager + .notify( + &mut thread_shared.ctx.db.handle(), + Some(thread_shared.manifest.id.clone()), + NotificationLevel::Warning, + String::from("Service Crashed"), + format!("The service {} has crashed with the following exit code: {}\nDetails: {}", thread_shared.manifest.id.clone(), e.0, e.1), + (), + Some(900) // 15 minutes + ) + .await; + match res { + Err(e) => { + tracing::error!("Failed to issue notification: {}", e); + tracing::debug!("{:?}", e); + } + Ok(()) => {} + } + tracing::error!("service crashed: {}: {}", e.0, e.1); + } + Err(e) => { + tracing::error!("failed to start service: {}", e); + tracing::debug!("{:?}", e); + } + } + } +} + +#[instrument(skip(shared))] +async fn stop(shared: &ManagerSharedState) -> Result<(), Error> { + shared.on_stop.send(OnStop::Sleep).map_err(|_| { + Error::new( + eyre!("Manager has already been shutdown"), + crate::ErrorKind::Docker, + ) + })?; + if matches!( + shared.status.load(Ordering::SeqCst).try_into().unwrap(), + Status::Paused + ) { + resume(shared).await?; + } + match shared + .ctx + .docker + .stop_container(&shared.container_name, Some(StopContainerOptions { t: 30 })) + .await + { + Err(bollard::errors::Error::DockerResponseNotFoundError { .. }) + | Err(bollard::errors::Error::DockerResponseConflictError { .. }) + | Err(bollard::errors::Error::DockerResponseNotModifiedError { .. }) => (), // Already stopped + a => a?, + }; + shared.status.store( + Status::Stopped as usize, + std::sync::atomic::Ordering::SeqCst, + ); + Ok(()) +} + +#[instrument(skip(shared))] +async fn start(shared: &ManagerSharedState) -> Result<(), Error> { + shared.on_stop.send(OnStop::Restart).map_err(|_| { + Error::new( + eyre!("Manager has already been shutdown"), + crate::ErrorKind::Docker, + ) + })?; + shared.status.store( + Status::Running as usize, + std::sync::atomic::Ordering::SeqCst, + ); + Ok(()) +} + +#[instrument(skip(shared))] +async fn pause(shared: &ManagerSharedState) -> Result<(), Error> { + shared + .ctx + .docker + .pause_container(&shared.container_name) + .await?; + shared + .status + .store(Status::Paused as usize, std::sync::atomic::Ordering::SeqCst); + Ok(()) +} + +#[instrument(skip(shared))] +async fn resume(shared: &ManagerSharedState) -> Result<(), Error> { + shared + .ctx + .docker + .unpause_container(&shared.container_name) + .await?; + shared.status.store( + Status::Running as usize, + std::sync::atomic::Ordering::SeqCst, + ); + Ok(()) } diff --git a/appmgr/src/manager/sync.rs b/appmgr/src/manager/sync.rs new file mode 100644 index 000000000..65800913f --- /dev/null +++ b/appmgr/src/manager/sync.rs @@ -0,0 +1,79 @@ +use std::convert::TryInto; +use std::sync::atomic::Ordering; +use std::time::Duration; + +use chrono::Utc; + +use super::{pause, resume, start, stop, ManagerSharedState, Status}; +use crate::status::MainStatus; +use crate::Error; + +/// Allocates a db handle. DO NOT CALL with a db handle already in scope +async fn synchronize_once(shared: &ManagerSharedState) -> Result<(), Error> { + let mut db = shared.ctx.db.handle(); + let mut status = crate::db::DatabaseModel::new() + .package_data() + .idx_model(&shared.manifest.id) + .expect(&mut db) + .await? + .installed() + .expect(&mut db) + .await? + .status() + .main() + .get_mut(&mut db) + .await?; + match shared.status.load(Ordering::SeqCst).try_into().unwrap() { + Status::Stopped => match &mut *status { + MainStatus::Stopped => (), + MainStatus::Stopping => { + *status = MainStatus::Stopped; + } + MainStatus::Running { started, .. } => { + *started = Utc::now(); + start(shared).await?; + } + MainStatus::BackingUp { .. } => (), + }, + Status::Running => match *status { + MainStatus::Stopped | MainStatus::Stopping => { + stop(shared).await?; + } + MainStatus::Running { .. } => (), + MainStatus::BackingUp { .. } => { + pause(shared).await?; + } + }, + Status::Paused => match *status { + MainStatus::Stopped | MainStatus::Stopping => { + stop(shared).await?; + } + MainStatus::Running { .. } => { + resume(shared).await?; + } + MainStatus::BackingUp { .. } => (), + }, + } + status.save(&mut db).await?; + Ok(()) +} + +pub async fn synchronizer(shared: &ManagerSharedState) { + loop { + if let Err(e) = synchronize_once(shared).await { + tracing::error!( + "Synchronizer for {}@{} failed: {}", + shared.manifest.id, + shared.manifest.version, + e + ); + tracing::debug!("{:?}", e); + } else { + shared.synchronized.notify_waiters(); + } + tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(5)) => (), + _ = shared.synchronize_now.notified() => (), + } + } +} diff --git a/appmgr/src/status/mod.rs b/appmgr/src/status/mod.rs index 4bacd6a31..ee9fcb42e 100644 --- a/appmgr/src/status/mod.rs +++ b/appmgr/src/status/mod.rs @@ -1,7 +1,6 @@ use std::collections::BTreeMap; use chrono::{DateTime, Utc}; -use color_eyre::eyre::eyre; use patch_db::{DbHandle, HasModel}; use serde::{Deserialize, Serialize}; use tracing::instrument; @@ -9,64 +8,12 @@ use tracing::instrument; use self::health_check::HealthCheckId; use crate::context::RpcContext; use crate::dependencies::DependencyErrors; -use crate::manager::{Manager, Status as ManagerStatus}; use crate::notifications::NotificationLevel; use crate::s9pk::manifest::Manifest; use crate::status::health_check::HealthCheckResult; use crate::Error; pub mod health_check; - -#[instrument(skip(ctx))] -pub async fn synchronize_all(ctx: &RpcContext) -> Result<(), Error> { - let mut db = ctx.db.handle(); - let pkg_ids = crate::db::DatabaseModel::new() - .package_data() - .keys(&mut db, false) - .await?; - for id in pkg_ids { - if let Err(e) = async { - let (mut status, manager) = if let Some(installed) = crate::db::DatabaseModel::new() - .package_data() - .idx_model(&id) - .and_then(|m| m.installed()) - .check(&mut db) - .await? - { - ( - installed.clone().status().get_mut(&mut db).await?, - ctx.managers - .get(&( - id.clone(), - installed - .manifest() - .version() - .get(&mut db, true) - .await? - .to_owned(), - )) - .await - .ok_or_else(|| Error::new(eyre!("No Manager"), crate::ErrorKind::Docker))?, - ) - } else { - return Ok::<_, Error>(()); - }; - - let res = status.main.synchronize(&manager).await?; - - status.save(&mut db).await?; - - Ok(res) - } - .await - { - tracing::error!("Error syncronizing status of {}: {}", id, e); - tracing::debug!("{:?}", e); - } - } - Ok(()) -} - #[derive(Clone, Debug, Deserialize, Serialize, HasModel)] #[serde(rename_all = "kebab-case")] pub struct Status { @@ -90,47 +37,8 @@ pub enum MainStatus { started: Option>, health: BTreeMap, }, - Restoring { - running: bool, - }, } impl MainStatus { - #[instrument(skip(manager))] - pub async fn synchronize(&mut self, manager: &Manager) -> Result<(), Error> { - match manager.status() { - ManagerStatus::Stopped => match self { - MainStatus::Stopped => (), - MainStatus::Stopping => { - *self = MainStatus::Stopped; - } - MainStatus::Running { started, .. } => { - *started = Utc::now(); - manager.start().await?; - } - MainStatus::BackingUp { .. } => (), - MainStatus::Restoring { .. } => (), - }, - ManagerStatus::Running => match self { - MainStatus::Stopped | MainStatus::Stopping | MainStatus::Restoring { .. } => { - manager.stop().await?; - } - MainStatus::Running { .. } => (), - MainStatus::BackingUp { .. } => { - manager.pause().await?; - } - }, - ManagerStatus::Paused => match self { - MainStatus::Stopped | MainStatus::Stopping | MainStatus::Restoring { .. } => { - manager.stop().await?; - } - MainStatus::Running { .. } => { - manager.resume().await?; - } - MainStatus::BackingUp { .. } => (), - }, - } - Ok(()) - } #[instrument(skip(ctx, db, manifest))] pub async fn check( &mut self, @@ -189,8 +97,7 @@ impl MainStatus { MainStatus::Running { .. } | MainStatus::BackingUp { started: Some(_), .. - } - | MainStatus::Restoring { running: true } => true, + } => true, _ => false, } } @@ -202,9 +109,6 @@ impl MainStatus { MainStatus::BackingUp { started, .. } => { *started = None; } - MainStatus::Restoring { running } => { - *running = false; - } _ => (), } }