diff --git a/appmgr/src/config/mod.rs b/appmgr/src/config/mod.rs index 49a587e87..93345c1f8 100644 --- a/appmgr/src/config/mod.rs +++ b/appmgr/src/config/mod.rs @@ -161,7 +161,7 @@ pub async fn get( .clone() .manifest() .config() - .get(&mut db) + .get(&mut db, true) .await? .to_owned() .ok_or_else(|| { @@ -170,8 +170,13 @@ pub async fn get( crate::ErrorKind::NotFound, ) })?; - let version = pkg_model.clone().manifest().version().get(&mut db).await?; - let volumes = pkg_model.manifest().volumes().get(&mut db).await?; + let version = pkg_model + .clone() + .manifest() + .version() + .get(&mut db, true) + .await?; + let volumes = pkg_model.manifest().volumes().get(&mut db, true).await?; action.get(ctx.extension(), &*version, &*volumes).await } @@ -295,15 +300,20 @@ pub fn configure<'a, Db: DbHandle>( .clone() .manifest() .config() - .get(db) + .get(db, true) .await? .to_owned() .ok_or_else(|| { Error::new(anyhow!("{} has no config", id), crate::ErrorKind::NotFound) })?; - let version = pkg_model.clone().manifest().version().get(db).await?; - let dependencies = pkg_model.clone().manifest().dependencies().get(db).await?; - let volumes = pkg_model.clone().manifest().volumes().get(db).await?; + let version = pkg_model.clone().manifest().version().get(db, true).await?; + let dependencies = pkg_model + .clone() + .manifest() + .dependencies() + .get(db, true) + .await?; + let volumes = pkg_model.clone().manifest().volumes().get(db, true).await?; // get current config and current spec let ConfigRes { @@ -406,7 +416,7 @@ pub fn configure<'a, Db: DbHandle>( overrides.insert(id.clone(), config.clone()); // handle dependents - let dependents = pkg_model.clone().current_dependents().get(db).await?; + let dependents = pkg_model.clone().current_dependents().get(db, true).await?; let prev = old_config.map(Value::Object).unwrap_or_default(); let next = Value::Object(config.clone()); for (dependent, dep_info) in &*dependents { @@ -447,12 +457,12 @@ pub fn configure<'a, Db: DbHandle>( .idx_model(dependency) .expect(db) .await? - .get(db) + .get(db, true) .await? .critical { status.main.stop(); - let dependents = model.current_dependents().get(db).await?; + let dependents = model.current_dependents().get(db, true).await?; for (dependent, _) in &*dependents { let dependent_model = crate::db::DatabaseModel::new() .package_data() @@ -496,10 +506,15 @@ pub fn configure<'a, Db: DbHandle>( .expect(db) .await? .config() - .get(db) + .get(db, true) .await? { - let version = dependent_model.clone().manifest().version().get(db).await?; + let version = dependent_model + .clone() + .manifest() + .version() + .get(db, true) + .await?; if let Err(error) = cfg.check(dependent, &*version, &config).await? { let dep_err = DependencyError::ConfigUnsatisfied { error }; handle_broken_dependents( diff --git a/appmgr/src/config/spec.rs b/appmgr/src/config/spec.rs index ee3ccc258..fac60719c 100644 --- a/appmgr/src/config/spec.rs +++ b/appmgr/src/config/spec.rs @@ -1564,7 +1564,7 @@ impl PackagePointerSpec { .and_then(|pde| pde.installed()) .and_then(|installed| installed.interface_addresses().idx_model(interface)) .and_then(|addresses| addresses.tor_address()) - .get(db) + .get(db, true) .await .map_err(|e| ConfigurationError::SystemError(Error::from(e)))?; Ok(addr.to_owned().map(Value::String).unwrap_or(Value::Null)) @@ -1576,7 +1576,7 @@ impl PackagePointerSpec { .and_then(|pde| pde.installed()) .and_then(|installed| installed.interface_addresses().idx_model(interface)) .and_then(|addresses| addresses.lan_address()) - .get(db) + .get(db, true) .await .map_err(|e| ConfigurationError::SystemError(Error::from(e)))?; Ok(addr.to_owned().map(Value::String).unwrap_or(Value::Null)) @@ -1594,18 +1594,18 @@ impl PackagePointerSpec { let version = manifest_model .clone() .map(|manifest| manifest.version()) - .get(db) + .get(db, true) .await .map_err(|e| ConfigurationError::SystemError(Error::from(e)))?; let cfg_actions = manifest_model .clone() .and_then(|manifest| manifest.config()) - .get(db) + .get(db, true) .await .map_err(|e| ConfigurationError::SystemError(Error::from(e)))?; let volumes = manifest_model .map(|manifest| manifest.volumes()) - .get(db) + .get(db, true) .await .map_err(|e| ConfigurationError::SystemError(Error::from(e)))?; if let (Some(version), Some(cfg_actions), Some(volumes)) = diff --git a/appmgr/src/context/rpc.rs b/appmgr/src/context/rpc.rs index 2f7b3f5e5..cd2a2a8f5 100644 --- a/appmgr/src/context/rpc.rs +++ b/appmgr/src/context/rpc.rs @@ -93,7 +93,7 @@ impl RpcContext { Ok(crate::db::DatabaseModel::new() .server_info() .registry() - .get(&mut self.db.handle()) + .get(&mut self.db.handle(), false) .await? .to_owned()) } diff --git a/appmgr/src/dependencies.rs b/appmgr/src/dependencies.rs index f2504ccfa..14651a349 100644 --- a/appmgr/src/dependencies.rs +++ b/appmgr/src/dependencies.rs @@ -144,8 +144,8 @@ impl DepInfo { .await? { ( - dep_model.clone().manifest().get(db).await?, - dep_model.get(db).await?, + dep_model.clone().manifest().get(db, true).await?, + dep_model.get(db, true).await?, ) } else { return Ok(Err(DependencyError::NotInstalled)); diff --git a/appmgr/src/install/mod.rs b/appmgr/src/install/mod.rs index 2e80dad2c..d52b67198 100644 --- a/appmgr/src/install/mod.rs +++ b/appmgr/src/install/mod.rs @@ -74,8 +74,6 @@ pub async fn download_install_s9pk( let pkg_id = &temp_manifest.id; let version = &temp_manifest.version; - let mut db = ctx.db.handle(); - let pkg_cache_dir = Path::new(PKG_CACHE).join(pkg_id).join(version.as_str()); tokio::fs::create_dir_all(&pkg_cache_dir).await?; let pkg_cache = AsRef::::as_ref(pkg_id).with_extension("s9pk"); @@ -87,7 +85,8 @@ pub async fn download_install_s9pk( let res = (|| async { let progress = InstallProgress::new(s9pk.content_length()); let static_files = StaticFiles::remote(pkg_id, version, temp_manifest.assets.icon_type()); - let mut pde = pkg_data_entry.get_mut(&mut db).await?; + let mut db_handle = ctx.db.handle(); + let mut pde = pkg_data_entry.get_mut(&mut db_handle).await?; match pde.take() { Some(PackageDataEntry::Installed { installed, @@ -115,7 +114,8 @@ pub async fn download_install_s9pk( )) } } - pde.save(&mut db).await?; + pde.save(&mut db_handle).await?; + drop(db_handle); let progress_model = pkg_data_entry.and_then(|pde| pde.install_progress()); async fn check_cache( @@ -153,9 +153,7 @@ pub async fn download_install_s9pk( pkg_id, version, progress - .track_read_during(model, &ctx.db, db, || { - S9pkReader::from_reader(progress_reader) - }) + .track_read_during(model, &ctx.db, || S9pkReader::from_reader(progress_reader)) .await, )?; if hash.as_bytes() == rdr.hash_str().as_bytes() { @@ -172,7 +170,7 @@ pub async fn download_install_s9pk( &progress, progress_model.clone(), &ctx, - &mut db, + &mut ctx.db.handle(), ) .await; @@ -188,7 +186,7 @@ pub async fn download_install_s9pk( .await?; progress - .track_download_during(progress_model.clone(), &ctx.db, &mut db, || async { + .track_download_during(progress_model.clone(), &ctx.db, || async { let mut progress_writer = InstallProgressTracker::new(&mut dst, progress.clone()); tokio::io::copy( @@ -216,25 +214,26 @@ pub async fn download_install_s9pk( let progress_reader = InstallProgressTracker::new(dst, progress.clone()); let rdr = progress - .track_read_during(progress_model.clone(), &ctx.db, &mut db, || { + .track_read_during(progress_model.clone(), &ctx.db, || { S9pkReader::from_reader(progress_reader) }) .await?; rdr }; - install_s9pk(&ctx, &mut db, pkg_id, version, &mut s9pk_reader, progress).await?; + install_s9pk(&ctx, pkg_id, version, &mut s9pk_reader, progress).await?; Ok(()) })() .await; if let Err(e) = res { + let mut handle = ctx.db.handle(); let mut broken = crate::db::DatabaseModel::new() .broken_packages() - .get_mut(&mut db) + .get_mut(&mut handle) .await?; broken.push(pkg_id.clone()); - broken.save(&mut db).await?; + broken.save(&mut handle).await?; Err(e) } else { Ok(()) @@ -243,7 +242,6 @@ pub async fn download_install_s9pk( pub async fn install_s9pk( ctx: &RpcContext, - mut db: &mut PatchDbHandle, pkg_id: &PackageId, version: &Version, rdr: &mut S9pkReader>, @@ -254,7 +252,7 @@ pub async fn install_s9pk( let model = crate::db::DatabaseModel::new() .package_data() .idx_model(pkg_id) - .check(db) + .check(&mut ctx.db.handle()) .await? .ok_or_else(|| { Error::new( @@ -266,7 +264,7 @@ pub async fn install_s9pk( log::info!("Install {}@{}: Unpacking Manifest", pkg_id, version); let manifest = progress - .track_read_during(progress_model.clone(), &ctx.db, db, || rdr.manifest()) + .track_read_during(progress_model.clone(), &ctx.db, || rdr.manifest()) .await?; log::info!("Install {}@{}: Unpacked Manifest", pkg_id, version); @@ -277,7 +275,7 @@ pub async fn install_s9pk( log::info!("Install {}@{}: Unpacking LICENSE.md", pkg_id, version); progress - .track_read_during(progress_model.clone(), &ctx.db, db, || async { + .track_read_during(progress_model.clone(), &ctx.db, || async { let license_path = public_dir_path.join("LICENSE.md"); let mut dst = File::create(&license_path).await?; tokio::io::copy(&mut rdr.license().await?, &mut dst).await?; @@ -289,7 +287,7 @@ pub async fn install_s9pk( log::info!("Install {}@{}: Unpacking INSTRUCTIONS.md", pkg_id, version); progress - .track_read_during(progress_model.clone(), &ctx.db, db, || async { + .track_read_during(progress_model.clone(), &ctx.db, || async { let instructions_path = public_dir_path.join("INSTRUCTIONS.md"); let mut dst = File::create(&instructions_path).await?; tokio::io::copy(&mut rdr.instructions().await?, &mut dst).await?; @@ -307,7 +305,7 @@ pub async fn install_s9pk( icon_path.display() ); progress - .track_read_during(progress_model.clone(), &ctx.db, db, || async { + .track_read_during(progress_model.clone(), &ctx.db, || async { let icon_path = public_dir_path.join(&icon_path); let mut dst = File::create(&icon_path).await?; tokio::io::copy(&mut rdr.icon().await?, &mut dst).await?; @@ -324,7 +322,7 @@ pub async fn install_s9pk( log::info!("Install {}@{}: Unpacking Docker Images", pkg_id, version); progress - .track_read_during(progress_model.clone(), &ctx.db, db, || async { + .track_read_during(progress_model.clone(), &ctx.db, || async { let mut load = tokio::process::Command::new("docker") .arg("load") .stdin(Stdio::piped()) @@ -359,9 +357,10 @@ pub async fn install_s9pk( progress.read_complete.store(true, Ordering::SeqCst); - progress_model.put(&mut db, &progress).await?; + progress_model.put(&mut ctx.db.handle(), &progress).await?; - let mut tx = db.begin().await?; + let mut handle = ctx.db.handle(); + let mut tx = handle.begin().await?; let mut sql_tx = ctx.secret_store.begin().await?; log::info!("Install {}@{}: Creating manager", pkg_id, version); @@ -398,7 +397,7 @@ pub async fn install_s9pk( let mut deps = IndexMap::new(); for package in crate::db::DatabaseModel::new() .package_data() - .keys(&mut tx) + .keys(&mut tx, true) .await? { if let Some(dep) = crate::db::DatabaseModel::new() @@ -408,7 +407,7 @@ pub async fn install_s9pk( .await? .installed() .and_then(|i| i.current_dependencies().idx_model(pkg_id)) - .get(&mut tx) + .get(&mut tx, true) .await? .to_owned() { diff --git a/appmgr/src/install/progress.rs b/appmgr/src/install/progress.rs index d0b5b6bd2..42ef58700 100644 --- a/appmgr/src/install/progress.rs +++ b/appmgr/src/install/progress.rs @@ -43,18 +43,13 @@ impl InstallProgress { self: Arc, model: OptionModel, mut db: Db, - ) -> (Db, Result<(), Error>) { + ) -> Result<(), Error> { while !self.download_complete.load(Ordering::SeqCst) { - if let Err(e) = model.put(&mut db, &self).await { - return (db, Err(e.into())); - } + model.put(&mut db, &self).await?; tokio::time::sleep(Duration::from_secs(1)).await; } - if let Err(e) = model.put(&mut db, &self).await { - (db, Err(e.into())) - } else { - (db, Ok(())) - } + model.put(&mut db, &self).await?; + Ok(()) } pub async fn track_download_during< F: FnOnce() -> Fut, @@ -64,16 +59,13 @@ impl InstallProgress { self: &Arc, model: OptionModel, db: &PatchDb, - handle: &mut PatchDbHandle, f: F, ) -> Result { - let local_db = std::mem::replace(handle, db.handle()); + let local_db = db.handle(); let tracker = tokio::spawn(self.clone().track_download(model.clone(), local_db)); let res = f().await; self.download_complete.store(true, Ordering::SeqCst); - let (local_db, tracker_res) = tracker.await.unwrap(); - let _ = std::mem::replace(handle, local_db); - tracker_res?; + tracker.await.unwrap()?; res } pub async fn track_read( @@ -81,14 +73,13 @@ impl InstallProgress { model: OptionModel, mut db: Db, complete: Arc, - ) -> (Db, Result<(), Error>) { + ) -> Result<(), Error> { while !complete.load(Ordering::SeqCst) { - if let Err(e) = model.put(&mut db, &self).await { - return (db, Err(e.into())); - } + model.put(&mut db, &self).await?; tokio::time::sleep(Duration::from_secs(1)).await; } - (db, Ok(())) + model.put(&mut db, &self).await?; + Ok(()) } pub async fn track_read_during< F: FnOnce() -> Fut, @@ -98,10 +89,9 @@ impl InstallProgress { self: &Arc, model: OptionModel, db: &PatchDb, - handle: &mut PatchDbHandle, f: F, ) -> Result { - let local_db = std::mem::replace(handle, db.handle()); + let local_db = db.handle(); let complete = Arc::new(AtomicBool::new(false)); let tracker = tokio::spawn(self.clone().track_read( model.clone(), @@ -110,9 +100,7 @@ impl InstallProgress { )); let res = f().await; complete.store(true, Ordering::SeqCst); - let (local_db, tracker_res) = tracker.await.unwrap(); - let _ = std::mem::replace(handle, local_db); - tracker_res?; + tracker.await.unwrap()?; res } } diff --git a/appmgr/src/manager/mod.rs b/appmgr/src/manager/mod.rs index e59456366..3223b83aa 100644 --- a/appmgr/src/manager/mod.rs +++ b/appmgr/src/manager/mod.rs @@ -1,12 +1,12 @@ use std::collections::HashMap; use std::future::Future; use std::net::Ipv4Addr; -use std::sync::atomic::AtomicBool; +use std::sync::atomic::AtomicUsize; use std::sync::Arc; use std::task::Poll; -use std::time::Duration; use anyhow::anyhow; +use bollard::container::StopContainerOptions; use bollard::Docker; use patch_db::{DbHandle, PatchDbHandle}; use sqlx::{Executor, Sqlite}; @@ -14,17 +14,13 @@ use tokio::sync::watch::error::RecvError; use tokio::sync::watch::{channel, Receiver, Sender}; use tokio::sync::RwLock; use tokio::task::JoinHandle; -use tokio_stream::wrappers::WatchStream; use torut::onion::TorSecretKeyV3; use crate::action::docker::DockerAction; -use crate::context::RpcContext; use crate::net::interface::InterfaceId; -use crate::net::mdns::MdnsController; -use crate::net::tor::TorController; use crate::net::NetController; use crate::s9pk::manifest::{Manifest, PackageId}; -use crate::util::Version; +use crate::util::{Container, Version}; use crate::{Error, ResultExt}; pub struct ManagerMap(RwLock>>); @@ -49,8 +45,10 @@ impl ManagerMap { ) -> Result<(), Error> { let mut lock = self.0.write().await; let id = (manifest.id.clone(), manifest.version.clone()); - if lock.contains_key(&id) { - return Ok(()); + if let Some(man) = lock.get(&id) { + if !man.thread.is_empty().await { + return Ok(()); + } } lock.insert( id, @@ -60,7 +58,11 @@ impl ManagerMap { } pub async fn remove(&self, id: &(PackageId, Version)) { - self.0.write().await.remove(id); + if let Some(man) = self.0.write().await.remove(id) { + if let Err(e) = man.exit().await { + log::error!("Error shutting down manager: {}", e); + } + } } pub async fn get(&self, id: &(PackageId, Version)) -> Option> { @@ -69,8 +71,24 @@ impl ManagerMap { } pub struct Manager { + shared: Arc, + thread: Container>, +} + +pub enum Status { + Running = 0, + Stopped = 1, + Paused = 2, +} + +struct ManagerSharedState { + status: AtomicUsize, on_stop: Sender, - thread: JoinHandle<()>, + docker: Docker, + net_ctl: Arc, + manifest: Manifest, + container_name: String, + tor_keys: HashMap, } #[derive(Clone, Copy)] @@ -80,21 +98,17 @@ pub enum OnStop { Exit, } -async fn run_main( - docker: &Docker, - net_ctl: &NetController, - manifest: &Manifest, - tor_keys: &HashMap, -) -> Result, Error> { - let rt_manifest = manifest.clone(); +async fn run_main(state: &Arc) -> Result, Error> { + let rt_state = state.clone(); let mut runtime = tokio::spawn(async move { - rt_manifest + rt_state + .manifest .main .execute::<(), ()>( - &rt_manifest.id, - &rt_manifest.version, + &rt_state.manifest.id, + &rt_state.manifest.version, None, - &rt_manifest.volumes, + &rt_state.manifest.volumes, None, false, ) @@ -102,8 +116,9 @@ async fn run_main( }); let mut ip = None::; loop { - match docker - .inspect_container(&DockerAction::container_name(&manifest.id, None), None) + match state + .docker + .inspect_container(&state.container_name, None) .await { Ok(res) => { @@ -140,11 +155,13 @@ async fn run_main( ) })?; - net_ctl + state + .net_ctl .add( - &manifest.id, + &state.manifest.id, ip, - manifest + state + .manifest .interfaces .0 .iter() @@ -152,7 +169,8 @@ async fn run_main( Ok(( id.clone(), info, - tor_keys + state + .tor_keys .get(id) .ok_or_else(|| { Error::new( @@ -175,7 +193,10 @@ async fn run_main( ) }) .and_then(|a| a); - net_ctl.remove(&manifest.id, manifest.interfaces.0.keys().cloned()); + state.net_ctl.remove( + &state.manifest.id, + state.manifest.interfaces.0.keys().cloned(), + ); res } @@ -187,6 +208,16 @@ impl Manager { tor_keys: HashMap, ) -> Result { let (on_stop, mut recv) = channel(OnStop::Sleep); + let shared = Arc::new(ManagerSharedState { + status: AtomicUsize::new(Status::Stopped as usize), + on_stop, + docker, + net_ctl, + container_name: DockerAction::container_name(&manifest.id, None), + manifest, + tor_keys, + }); + let thread_shared = shared.clone(); let thread = tokio::spawn(async move { loop { fn handle_stop_action<'a>( @@ -205,17 +236,36 @@ impl Manager { match stop_action { OnStop::Sleep => { if let Some(fut) = fut { + thread_shared.status.store( + Status::Stopped as usize, + std::sync::atomic::Ordering::SeqCst, + ); fut.await.unwrap(); continue; } } OnStop::Exit => { + thread_shared.status.store( + Status::Stopped as usize, + std::sync::atomic::Ordering::SeqCst, + ); break; } - OnStop::Restart => (), + OnStop::Restart => { + thread_shared.status.store( + Status::Running as usize, + std::sync::atomic::Ordering::SeqCst, + ); + } } - match run_main(&docker, &*net_ctl, &manifest, &tor_keys).await { - Ok(Ok(())) => break, + match run_main(&thread_shared).await { + Ok(Ok(())) => { + thread_shared + .on_stop + .send(OnStop::Sleep) + .map_err(|_| ()) + .unwrap(); // recv is still in scope, cannot fail + } Ok(Err(e)) => { todo!("application crashed") } @@ -225,12 +275,115 @@ impl Manager { } } }); - Ok(Manager { on_stop, thread }) + Ok(Manager { + shared, + thread: Container::new(Some(thread)), + }) } -} -impl Drop for Manager { - fn drop(&mut self) { - let _ = self.on_stop.send(OnStop::Exit); + pub fn status(&self) -> Status { + match self.shared.status.load(std::sync::atomic::Ordering::SeqCst) { + 0 => Status::Running, + 1 => Status::Stopped, + 2 => Status::Paused, + _ => unreachable!(), + } + } + + pub async fn stop(&self) -> Result<(), Error> { + self.shared.on_stop.send(OnStop::Sleep).map_err(|_| { + Error::new( + anyhow!("Manager has already been shutdown"), + crate::ErrorKind::Docker, + ) + })?; + if matches!(self.status(), Status::Paused) { + self.resume().await?; + } + match self + .shared + .docker + .stop_container( + &self.shared.container_name, + Some(StopContainerOptions { t: 30 }), + ) + .await + { + Err(bollard::errors::Error::DockerResponseNotFoundError { .. }) + | Err(bollard::errors::Error::DockerResponseConflictError { .. }) => (), // Already stopped + a => a?, + }; + self.shared.status.store( + Status::Stopped as usize, + std::sync::atomic::Ordering::SeqCst, + ); + Ok(()) + } + + pub async fn start(&self) -> Result<(), Error> { + self.shared.on_stop.send(OnStop::Restart).map_err(|_| { + Error::new( + anyhow!("Manager has already been shutdown"), + crate::ErrorKind::Docker, + ) + })?; + self.shared.status.store( + Status::Running as usize, + std::sync::atomic::Ordering::SeqCst, + ); + Ok(()) + } + + pub async fn pause(&self) -> Result<(), Error> { + self.shared + .docker + .pause_container(&self.shared.container_name) + .await?; + self.shared + .status + .store(Status::Paused as usize, std::sync::atomic::Ordering::SeqCst); + Ok(()) + } + + pub async fn resume(&self) -> Result<(), Error> { + self.shared + .docker + .unpause_container(&self.shared.container_name) + .await?; + self.shared.status.store( + Status::Running as usize, + std::sync::atomic::Ordering::SeqCst, + ); + Ok(()) + } + + async fn exit(&self) -> Result<(), Error> { + let _ = self.shared.on_stop.send(OnStop::Exit); + match self + .shared + .docker + .stop_container( + &self.shared.container_name, + Some(StopContainerOptions { t: 30 }), + ) + .await + { + Err(bollard::errors::Error::DockerResponseNotFoundError { .. }) + | Err(bollard::errors::Error::DockerResponseConflictError { .. }) => (), + a => a?, + }; + self.shared.status.store( + Status::Stopped as usize, + std::sync::atomic::Ordering::SeqCst, + ); + if let Some(thread) = self.thread.take().await { + thread.await.map_err(|e| { + Error::new( + anyhow!("Manager thread panicked: {}", e), + crate::ErrorKind::Docker, + ) + })?; + } + Ok(()) } } diff --git a/appmgr/src/net/mdns.rs b/appmgr/src/net/mdns.rs index 63d702e00..00fff5acf 100644 --- a/appmgr/src/net/mdns.rs +++ b/appmgr/src/net/mdns.rs @@ -119,7 +119,7 @@ impl MdnsControllerInner { } } fn add<'a, I: IntoIterator>( - &self, + &mut self, pkg_id: &PackageId, interfaces: I, ) { @@ -130,7 +130,7 @@ impl MdnsControllerInner { ); self.sync(); } - fn remove>(&self, pkg_id: &PackageId, interfaces: I) { + fn remove>(&mut self, pkg_id: &PackageId, interfaces: I) { for interface_id in interfaces { self.services.remove(&(pkg_id.clone(), interface_id)); } diff --git a/appmgr/src/status/mod.rs b/appmgr/src/status/mod.rs index 2c373852c..b39b05879 100644 --- a/appmgr/src/status/mod.rs +++ b/appmgr/src/status/mod.rs @@ -18,6 +18,7 @@ use crate::db::model::{ CurrentDependencyInfo, InstalledPackageDataEntryModel, PackageDataEntryModel, }; use crate::dependencies::{Dependencies, DependencyError}; +use crate::manager::{Manager, Status as ManagerStatus}; use crate::net::interface::InterfaceId; use crate::s9pk::manifest::{Manifest, PackageId}; use crate::status::health_check::HealthCheckResultVariant; @@ -31,7 +32,7 @@ pub async fn synchronize_all(ctx: &RpcContext) -> Result<(), Error> { let mut db = ctx.db.handle(); let mut pkg_ids = crate::db::DatabaseModel::new() .package_data() - .keys(&mut db) + .keys(&mut db, true) .await?; let mut container_names = Vec::with_capacity(pkg_ids.len()); for id in pkg_ids.clone().into_iter() { @@ -42,7 +43,7 @@ pub async fn synchronize_all(ctx: &RpcContext) -> Result<(), Error> { .await? .installed() .map(|i| i.manifest().version()) - .get(&mut db) + .get(&mut db, true) .await? { container_names.push(DockerAction::container_name(id.as_ref(), None)); @@ -61,10 +62,10 @@ pub async fn synchronize_all(ctx: &RpcContext) -> Result<(), Error> { filters, })) .await?; - let mut fuckening = false; for summary in info { let id = if let Some(id) = summary.names.iter().flatten().find_map(|s| { - DockerAction::uncontainer_name(s.as_str()).and_then(|(id, _)| pkg_ids.take(id)) + // DockerAction::uncontainer_name(s.as_str()).and_then(|(id, _)| pkg_ids.take(&id)) + todo!() }) { id } else { @@ -75,7 +76,7 @@ pub async fn synchronize_all(ctx: &RpcContext) -> Result<(), Error> { id: &PackageId, db: &mut Db, summary: &ContainerSummaryInner, - ) -> Result { + ) -> Result<(), Error> { let pkg_data = crate::db::DatabaseModel::new() .package_data() .idx_model(id) @@ -91,32 +92,23 @@ pub async fn synchronize_all(ctx: &RpcContext) -> Result<(), Error> { if let Some(installed) = pkg_data.installed().check(db).await? { ( installed.clone().status().get_mut(db).await?, - installed.manifest().get(db).await?, + installed.manifest().get(db, true).await?, ) } else { - return Ok(false); + return Ok(()); }; - let res = status.main.synchronize(docker, &*manifest, summary).await?; + let res = status.main.synchronize(todo!()).await?; status.save(db).await?; Ok(res) } - match status(&ctx.docker, &id, &mut db, &summary).await { - Ok(a) => fuckening |= a, - Err(e) => log::error!("Error syncronizing status of {}: {}", id, e), + if let Err(e) = status(&ctx.docker, &id, &mut db, &summary).await { + log::error!("Error syncronizing status of {}: {}", id, e); } } - if fuckening { - tokio::process::Command::new("service") - .arg("docker") - .arg("restart") - .invoke(crate::ErrorKind::Docker) - .await?; - } - for id in pkg_ids { log::warn!("No container for {}", id); } @@ -126,17 +118,9 @@ pub async fn synchronize_all(ctx: &RpcContext) -> Result<(), Error> { pub async fn check_all(ctx: &RpcContext) -> Result<(), Error> { let mut db = ctx.db.handle(); - let hosts = Arc::new( - crate::db::DatabaseModel::new() - .network() - .hosts() - .get(&mut db) - .await? - .to_owned(), - ); let pkg_ids = crate::db::DatabaseModel::new() .package_data() - .keys(&mut db) + .keys(&mut db, true) .await?; let mut status_manifest = Vec::with_capacity(pkg_ids.len()); let mut status_deps = Vec::with_capacity(pkg_ids.len()); @@ -155,11 +139,11 @@ pub async fn check_all(ctx: &RpcContext) -> Result<(), Error> { if let Some(installed) = model.installed().check(&mut db).await? { status_manifest.push(( installed.clone().status(), - Arc::new(installed.clone().manifest().get(&mut db).await?), + Arc::new(installed.clone().manifest().get(&mut db, true).await?), )); status_deps.push(( installed.clone().status(), - Arc::new(installed.current_dependencies().get(&mut db).await?), + Arc::new(installed.current_dependencies().get(&mut db, true).await?), )); } } @@ -181,30 +165,25 @@ pub async fn check_all(ctx: &RpcContext) -> Result<(), Error> { } let (status_sender, mut statuses_recv) = tokio::sync::mpsc::channel(status_manifest.len() + 1); let mut statuses = HashMap::with_capacity(status_manifest.len()); - futures::stream::iter( - status_manifest - .into_iter() - .zip(pkg_ids.clone()) - .zip(std::iter::repeat(hosts)), - ) - .for_each_concurrent(None, move |(((status, manifest), id), hosts)| { - let status_sender = status_sender.clone(); - async move { - match tokio::spawn(main_status(status, manifest, ctx.db.handle())) - .await - .unwrap() - { - Err(e) => { - log::error!("Error running main health check for {}: {}", id, e); - log::debug!("{:?}", e); - } - Ok(status) => { - status_sender.send((id, status)).await.expect("unreachable"); + futures::stream::iter(status_manifest.into_iter().zip(pkg_ids.clone())) + .for_each_concurrent(None, move |((status, manifest), id)| { + let status_sender = status_sender.clone(); + async move { + match tokio::spawn(main_status(status, manifest, ctx.db.handle())) + .await + .unwrap() + { + Err(e) => { + log::error!("Error running main health check for {}: {}", id, e); + log::debug!("{:?}", e); + } + Ok(status) => { + status_sender.send((id, status)).await.expect("unreachable"); + } } } - } - }) - .await; + }) + .await; while let Some((id, status)) = statuses_recv.recv().await { statuses.insert(id, status); } @@ -271,75 +250,40 @@ pub enum MainStatus { }, } impl MainStatus { - pub async fn synchronize( - &mut self, - docker: &Docker, - manifest: &Manifest, - summary: &ContainerSummaryInner, - ) -> Result { - // true if Docker Fuckening - async fn check_fuckening(docker: &Docker, manifest: &Manifest) -> Result { - Ok(docker - .inspect_container( - &DockerAction::container_name(&manifest.id, &manifest.version), - None, - ) - .await? - .state - .as_ref() - .and_then(|s| s.status) - == Some(ContainerStateStatusEnum::RUNNING)) - } - let name = DockerAction::container_name(&manifest.id, None); - let state = summary.state.as_ref().map(|s| s.as_str()); - match state { - Some("created") | Some("exited") => match self { + pub async fn synchronize(&mut self, manager: &Manager) -> Result<(), Error> { + match manager.status() { + ManagerStatus::Stopped => match self { MainStatus::Stopped => (), MainStatus::Stopping => { *self = MainStatus::Stopped; } MainStatus::Running { started, .. } => { *started = Utc::now(); - docker - .start_container(&name, None::>) - .await?; + manager.start().await?; } MainStatus::BackingUp { .. } => (), MainStatus::Restoring { .. } => (), }, - Some("running") | Some("restarting") => match self { + ManagerStatus::Running => match self { MainStatus::Stopped | MainStatus::Stopping | MainStatus::Restoring { .. } => { - docker - .stop_container(&name, Some(StopContainerOptions { t: 30 })) - .await?; - return check_fuckening(docker, manifest).await; + manager.stop().await?; } MainStatus::Running { .. } => (), MainStatus::BackingUp { .. } => { - docker.pause_container(&name).await?; + manager.pause().await?; } }, - Some("paused") => match self { + ManagerStatus::Paused => match self { MainStatus::Stopped | MainStatus::Stopping | MainStatus::Restoring { .. } => { - docker.unpause_container(&name).await?; - docker - .stop_container(&name, Some(StopContainerOptions { t: 30 })) - .await?; - return check_fuckening(docker, manifest).await; + manager.stop().await?; } MainStatus::Running { .. } => { - docker.unpause_container(&name).await?; + manager.resume().await?; } MainStatus::BackingUp { .. } => (), }, - unknown => { - return Err(Error::new( - anyhow!("Unexpected Docker Status: {:?}", unknown), - crate::ErrorKind::Docker, - )); - } } - Ok(false) + Ok(()) } pub async fn check(&mut self, manifest: &Manifest) -> Result<(), Error> { match self { diff --git a/appmgr/src/util.rs b/appmgr/src/util.rs index 1c7434723..a3883be69 100644 --- a/appmgr/src/util.rs +++ b/appmgr/src/util.rs @@ -16,7 +16,7 @@ use serde_json::Value; use sqlx::{Executor, Sqlite}; use tokio::fs::File; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf}; -use tokio::sync::RwLock; +use tokio::sync::{Mutex, RwLock}; use crate::{Error, ResultExt as _}; @@ -826,11 +826,17 @@ pub fn parse_duration(arg: &str, matches: &ArgMatches<'_>) -> Result(RwLock>); impl Container { - pub fn new() -> Self { - Container(RwLock::new(None)) + pub fn new(value: Option) -> Self { + Container(RwLock::new(value)) } - pub async fn set(&self, value: T) { - *self.0.write().await = Some(value); + pub async fn set(&self, value: T) -> Option { + std::mem::replace(&mut *self.0.write().await, Some(value)) + } + pub async fn take(&self) -> Option { + std::mem::replace(&mut *self.0.write().await, None) + } + pub async fn is_empty(&self) -> bool { + self.0.read().await.is_none() } pub async fn drop(&self) { *self.0.write().await = None;