diff --git a/backend/src/bin/embassyd.rs b/backend/src/bin/embassyd.rs index 1dfb802bb..6aed68cde 100644 --- a/backend/src/bin/embassyd.rs +++ b/backend/src/bin/embassyd.rs @@ -12,6 +12,7 @@ use embassy::middleware::diagnostic::diagnostic; use embassy::net::mdns::MdnsController; use embassy::net::tor::tor_health_check; use embassy::shutdown::Shutdown; +use embassy::system::launch_metrics_task; use embassy::util::{daemon, Invoke}; use embassy::{static_server, Error, ErrorKind, ResultExt}; use futures::{FutureExt, TryFutureExt}; @@ -98,6 +99,14 @@ async fn inner_main(cfg_path: Option<&str>) -> Result, Error> { } }); + let metrics_ctx = rpc_ctx.clone(); + let metrics_task = tokio::spawn(async move { + launch_metrics_task(&metrics_ctx.metrics_cache, || { + metrics_ctx.shutdown.subscribe() + }) + .await + }); + let rev_cache_ctx = rpc_ctx.clone(); let revision_cache_task = tokio::spawn(async move { let mut sub = rev_cache_ctx.db.subscribe(); @@ -225,12 +234,18 @@ async fn inner_main(cfg_path: Option<&str>) -> Result, Error> { server .map_err(|e| Error::new(e, ErrorKind::Network)) .map_ok(|_| tracing::debug!("RPC Server Shutdown")), + metrics_task + .map_err(|e| Error::new( + eyre!("{}", e).wrap_err("Metrics daemon panicked!"), + ErrorKind::Unknown + )) + .map_ok(|_| tracing::debug!("Metrics daemon Shutdown")), revision_cache_task .map_err(|e| Error::new( eyre!("{}", e).wrap_err("Revision Cache daemon panicked!"), ErrorKind::Unknown )) - .map_ok(|_| tracing::debug!("Revision Cache Shutdown")), + .map_ok(|_| tracing::debug!("Revision Cache daemon Shutdown")), ws_server .map_err(|e| Error::new(e, ErrorKind::Network)) .map_ok(|_| tracing::debug!("WebSocket Server Shutdown")), @@ -239,10 +254,10 @@ async fn inner_main(cfg_path: Option<&str>) -> Result, Error> { .map_ok(|_| tracing::debug!("Static File Server Shutdown")), tor_health_daemon .map_err(|e| Error::new( - e.wrap_err("Tor Health Daemon panicked!"), + e.wrap_err("Tor Health daemon panicked!"), ErrorKind::Unknown )) - .map_ok(|_| tracing::debug!("Tor Health Daemon Shutdown")), + .map_ok(|_| tracing::debug!("Tor Health daemon Shutdown")), )?; let mut shutdown = shutdown_recv diff --git a/backend/src/context/rpc.rs b/backend/src/context/rpc.rs index 0f39b6307..ddb88d937 100644 --- a/backend/src/context/rpc.rs +++ b/backend/src/context/rpc.rs @@ -34,9 +34,7 @@ use crate::notifications::NotificationManager; use crate::setup::password_hash; use crate::shutdown::Shutdown; use crate::status::{MainStatus, Status}; -use crate::system::launch_metrics_task; use crate::util::io::from_toml_async_reader; -use crate::util::logger::EmbassyLogger; use crate::util::{AsyncFileExt, Invoke}; use crate::{Error, ResultExt}; @@ -191,13 +189,7 @@ impl RpcContext { rpc_stream_continuations: Mutex::new(BTreeMap::new()), wifi_manager: Arc::new(RwLock::new(WpaCli::init("wlan0".to_string()))), }); - let metrics_seed = seed.clone(); - tokio::spawn(async move { - launch_metrics_task(&metrics_seed.metrics_cache, || { - metrics_seed.shutdown.subscribe() - }) - .await - }); + let res = Self(seed); res.cleanup().await?; tracing::info!("Cleaned up transient states"); @@ -243,12 +235,6 @@ impl RpcContext { self.managers.empty().await?; self.secret_store.close().await; self.is_closed.store(true, Ordering::SeqCst); - if let Err(ctx) = Arc::try_unwrap(self.0) { - tracing::warn!( - "{} RPC Context(s) are still being held somewhere. This is likely a mistake.", - Arc::strong_count(&ctx) - 1 - ); - } Ok(()) } #[instrument(skip(self))] @@ -350,3 +336,14 @@ impl Deref for RpcContext { &*self.0 } } +impl Drop for RpcContext { + fn drop(&mut self) { + #[cfg(feature = "unstable")] + if self.0.is_closed.load(Ordering::SeqCst) { + tracing::info!( + "RpcContext dropped. {} left.", + Arc::strong_count(&self.0) - 1 + ); + } + } +} diff --git a/backend/src/setup.rs b/backend/src/setup.rs index 7a4ec6906..1846d3c89 100644 --- a/backend/src/setup.rs +++ b/backend/src/setup.rs @@ -338,6 +338,8 @@ pub async fn execute_inner( tracing::error!("Error recovering drive!: {}", e); tracing::debug!("{:?}", e); *ctx.recovery_status.write().await = Some(Err(e.into())); + } else { + tracing::info!("Recovery Complete!"); } }); (tor_addr, root_ca)