fix bug that prevents rpc context from dropping without server shutdown

This commit is contained in:
Aiden McClelland
2022-02-17 14:50:53 -07:00
committed by Aiden McClelland
parent 1c32e846db
commit 075e68c254
3 changed files with 32 additions and 18 deletions

View File

@@ -12,6 +12,7 @@ use embassy::middleware::diagnostic::diagnostic;
use embassy::net::mdns::MdnsController;
use embassy::net::tor::tor_health_check;
use embassy::shutdown::Shutdown;
use embassy::system::launch_metrics_task;
use embassy::util::{daemon, Invoke};
use embassy::{static_server, Error, ErrorKind, ResultExt};
use futures::{FutureExt, TryFutureExt};
@@ -98,6 +99,14 @@ async fn inner_main(cfg_path: Option<&str>) -> Result<Option<Shutdown>, Error> {
}
});
let metrics_ctx = rpc_ctx.clone();
let metrics_task = tokio::spawn(async move {
launch_metrics_task(&metrics_ctx.metrics_cache, || {
metrics_ctx.shutdown.subscribe()
})
.await
});
let rev_cache_ctx = rpc_ctx.clone();
let revision_cache_task = tokio::spawn(async move {
let mut sub = rev_cache_ctx.db.subscribe();
@@ -225,12 +234,18 @@ async fn inner_main(cfg_path: Option<&str>) -> Result<Option<Shutdown>, Error> {
server
.map_err(|e| Error::new(e, ErrorKind::Network))
.map_ok(|_| tracing::debug!("RPC Server Shutdown")),
metrics_task
.map_err(|e| Error::new(
eyre!("{}", e).wrap_err("Metrics daemon panicked!"),
ErrorKind::Unknown
))
.map_ok(|_| tracing::debug!("Metrics daemon Shutdown")),
revision_cache_task
.map_err(|e| Error::new(
eyre!("{}", e).wrap_err("Revision Cache daemon panicked!"),
ErrorKind::Unknown
))
.map_ok(|_| tracing::debug!("Revision Cache Shutdown")),
.map_ok(|_| tracing::debug!("Revision Cache daemon Shutdown")),
ws_server
.map_err(|e| Error::new(e, ErrorKind::Network))
.map_ok(|_| tracing::debug!("WebSocket Server Shutdown")),
@@ -239,10 +254,10 @@ async fn inner_main(cfg_path: Option<&str>) -> Result<Option<Shutdown>, Error> {
.map_ok(|_| tracing::debug!("Static File Server Shutdown")),
tor_health_daemon
.map_err(|e| Error::new(
e.wrap_err("Tor Health Daemon panicked!"),
e.wrap_err("Tor Health daemon panicked!"),
ErrorKind::Unknown
))
.map_ok(|_| tracing::debug!("Tor Health Daemon Shutdown")),
.map_ok(|_| tracing::debug!("Tor Health daemon Shutdown")),
)?;
let mut shutdown = shutdown_recv

View File

@@ -34,9 +34,7 @@ use crate::notifications::NotificationManager;
use crate::setup::password_hash;
use crate::shutdown::Shutdown;
use crate::status::{MainStatus, Status};
use crate::system::launch_metrics_task;
use crate::util::io::from_toml_async_reader;
use crate::util::logger::EmbassyLogger;
use crate::util::{AsyncFileExt, Invoke};
use crate::{Error, ResultExt};
@@ -191,13 +189,7 @@ impl RpcContext {
rpc_stream_continuations: Mutex::new(BTreeMap::new()),
wifi_manager: Arc::new(RwLock::new(WpaCli::init("wlan0".to_string()))),
});
let metrics_seed = seed.clone();
tokio::spawn(async move {
launch_metrics_task(&metrics_seed.metrics_cache, || {
metrics_seed.shutdown.subscribe()
})
.await
});
let res = Self(seed);
res.cleanup().await?;
tracing::info!("Cleaned up transient states");
@@ -243,12 +235,6 @@ impl RpcContext {
self.managers.empty().await?;
self.secret_store.close().await;
self.is_closed.store(true, Ordering::SeqCst);
if let Err(ctx) = Arc::try_unwrap(self.0) {
tracing::warn!(
"{} RPC Context(s) are still being held somewhere. This is likely a mistake.",
Arc::strong_count(&ctx) - 1
);
}
Ok(())
}
#[instrument(skip(self))]
@@ -350,3 +336,14 @@ impl Deref for RpcContext {
&*self.0
}
}
impl Drop for RpcContext {
fn drop(&mut self) {
#[cfg(feature = "unstable")]
if self.0.is_closed.load(Ordering::SeqCst) {
tracing::info!(
"RpcContext dropped. {} left.",
Arc::strong_count(&self.0) - 1
);
}
}
}

View File

@@ -338,6 +338,8 @@ pub async fn execute_inner(
tracing::error!("Error recovering drive!: {}", e);
tracing::debug!("{:?}", e);
*ctx.recovery_status.write().await = Some(Err(e.into()));
} else {
tracing::info!("Recovery Complete!");
}
});
(tor_addr, root_ca)