use std::collections::BTreeMap; use std::sync::Arc; use color_eyre::eyre::eyre; use patch_db::DbHandle; use sqlx::{Executor, Postgres}; use tokio::sync::RwLock; use tracing::instrument; use super::Manager; use crate::context::RpcContext; use crate::s9pk::manifest::{Manifest, PackageId}; use crate::util::Version; use crate::Error; #[derive(Default)] pub struct ManagerMap(RwLock>>); impl ManagerMap { #[instrument(skip_all)] pub async fn init( &self, ctx: &RpcContext, db: &mut Db, secrets: &mut Ex, ) -> Result<(), Error> where for<'a> &'a mut Ex: Executor<'a, Database = Postgres>, { let mut res = BTreeMap::new(); for package in crate::db::DatabaseModel::new() .package_data() .keys(db) .await? { let man: Manifest = if let Some(manifest) = crate::db::DatabaseModel::new() .package_data() .idx_model(&package) .and_then(|pkg| pkg.installed()) .map(|m| m.manifest()) .get(db) .await? .to_owned() { manifest } else { continue; }; res.insert( (package, man.version.clone()), Arc::new(Manager::new(ctx.clone(), man).await?), ); } *self.0.write().await = res; Ok(()) } #[instrument(skip_all)] pub async fn add(&self, ctx: RpcContext, manifest: Manifest) -> Result<(), Error> { let mut lock = self.0.write().await; let id = (manifest.id.clone(), manifest.version.clone()); if let Some(man) = lock.remove(&id) { man.exit().await; } lock.insert(id, Arc::new(Manager::new(ctx, manifest).await?)); Ok(()) } #[instrument(skip_all)] pub async fn remove(&self, id: &(PackageId, Version)) { if let Some(man) = self.0.write().await.remove(id) { man.exit().await; } } #[instrument(skip_all)] pub async fn empty(&self) -> Result<(), Error> { let res = futures::future::join_all(std::mem::take(&mut *self.0.write().await).into_iter().map( |((id, version), man)| async move { tracing::debug!("Manager for {}@{} shutting down", id, version); man.exit().await; tracing::debug!("Manager for {}@{} is shutdown", id, version); if let Err(e) = Arc::try_unwrap(man) { tracing::trace!( "Manager for {}@{} still has {} other open references", id, version, Arc::strong_count(&e) - 1 ); } Ok::<_, Error>(()) }, )) .await; res.into_iter().fold(Ok(()), |res, x| match (res, x) { (Ok(()), x) => x, (Err(e), Ok(())) => Err(e), (Err(e1), Err(e2)) => Err(Error::new(eyre!("{}, {}", e1.source, e2.source), e1.kind)), }) } #[instrument(skip_all)] pub async fn get(&self, id: &(PackageId, Version)) -> Option> { self.0.read().await.get(id).cloned() } }