Fixes/recover dying during starting (#1224)

* fix: Fix the panic becaues of the double install logging

Co-authored-by: Aiden McClelland <dr-bonez@users.noreply.github.com>

* fix: Clippy fix for docker

* chore: Add notification on top of error

Co-authored-by: Aiden McClelland <dr-bonez@users.noreply.github.com>
This commit is contained in:
J M
2022-02-16 16:38:59 -07:00
committed by GitHub
parent d204c1dfba
commit 4a69b1138d
4 changed files with 54 additions and 24 deletions

View File

@@ -152,7 +152,7 @@ impl DockerAction {
} else { } else {
EitherFuture::Left(futures::future::pending::<Result<_, Error>>()) EitherFuture::Left(futures::future::pending::<Result<_, Error>>())
}; };
if let (Some(input), Some(stdin)) = (&input_buf, &mut handle.stdin) { if let (Some(input), Some(mut stdin)) = (&input_buf, handle.stdin.take()) {
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
stdin stdin
.write_all(input) .write_all(input)

View File

@@ -17,7 +17,6 @@ use torut::onion::OnionAddressV3;
use tracing::instrument; use tracing::instrument;
use super::target::BackupTargetId; use super::target::BackupTargetId;
use crate::auth::check_password_against_db;
use crate::backup::backup_bulk::OsBackup; use crate::backup::backup_bulk::OsBackup;
use crate::context::{RpcContext, SetupContext}; use crate::context::{RpcContext, SetupContext};
use crate::db::model::{PackageDataEntry, StaticFiles}; use crate::db::model::{PackageDataEntry, StaticFiles};
@@ -34,10 +33,11 @@ use crate::util::display_none;
use crate::util::io::dir_size; use crate::util::io::dir_size;
use crate::util::serde::IoFormat; use crate::util::serde::IoFormat;
use crate::volume::{backup_dir, BACKUP_DIR, PKG_VOLUME_DIR}; use crate::volume::{backup_dir, BACKUP_DIR, PKG_VOLUME_DIR};
use crate::{auth::check_password_against_db, notifications::NotificationLevel};
use crate::{Error, ResultExt}; use crate::{Error, ResultExt};
fn parse_comma_separated(arg: &str, _: &ArgMatches<'_>) -> Result<Vec<PackageId>, Error> { fn parse_comma_separated(arg: &str, _: &ArgMatches<'_>) -> Result<Vec<PackageId>, Error> {
arg.split(",") arg.split(',')
.map(|s| s.trim().parse().map_err(Error::from)) .map(|s| s.trim().parse().map_err(Error::from))
.collect() .collect()
} }
@@ -124,18 +124,18 @@ impl ProgressInfo {
let mut total_bytes = 0; let mut total_bytes = 0;
let mut bytes_transferred = 0; let mut bytes_transferred = 0;
for (_, progress) in &self.package_installs { for progress in self.package_installs.values() {
total_bytes += ((progress.size.unwrap_or(0) as f64) * 2.2) as u64; total_bytes += ((progress.size.unwrap_or(0) as f64) * 2.2) as u64;
bytes_transferred += progress.downloaded.load(Ordering::SeqCst); bytes_transferred += progress.downloaded.load(Ordering::SeqCst);
bytes_transferred += ((progress.validated.load(Ordering::SeqCst) as f64) * 0.2) as u64; bytes_transferred += ((progress.validated.load(Ordering::SeqCst) as f64) * 0.2) as u64;
bytes_transferred += progress.unpacked.load(Ordering::SeqCst); bytes_transferred += progress.unpacked.load(Ordering::SeqCst);
} }
for (_, size) in &self.src_volume_size { for size in self.src_volume_size.values() {
total_bytes += *size; total_bytes += *size;
} }
for (_, size) in &self.target_volume_size { for size in self.target_volume_size.values() {
bytes_transferred += *size; bytes_transferred += *size;
} }
@@ -221,7 +221,39 @@ pub async fn recover_full_embassy(
.await?; .await?;
tokio::select! { tokio::select! {
res = futures::future::join_all(tasks) => res.into_iter().map(|res| res.with_kind(crate::ErrorKind::Unknown).and_then(|a|a)).collect::<Result<(), Error>>()?, res = futures::future::join_all(tasks) => {
for res in res {
match res.with_kind(crate::ErrorKind::Unknown) {
Ok((Ok(_), _)) => (),
Ok((Err(err), package_id)) => {
if let Err(err) = rpc_ctx.notification_manager.notify(
&mut db,
Some(package_id.clone()),
NotificationLevel::Error,
"Restoration Failure".to_string(), format!("Error restoring package {}: {}", package_id,err), (), None).await{
tracing::error!("Failed to notify: {}", err);
tracing::debug!("{:?}", err);
};
tracing::error!("Error restoring package {}: {}", package_id, err);
tracing::debug!("{:?}", err);
},
Err(e) => {
if let Err(err) = rpc_ctx.notification_manager.notify(
&mut db,
None,
NotificationLevel::Error,
"Restoration Failure".to_string(), format!("Error restoring ?: {}", e), (), None).await {
tracing::error!("Failed to notify: {}", err);
tracing::debug!("{:?}", err);
}
tracing::error!("Error restoring packages: {}", e);
tracing::debug!("{:?}", e);
},
}
}
},
_ = approximate_progress_loop(&ctx, &rpc_ctx, progress_info) => unreachable!(concat!(module_path!(), "::approximate_progress_loop should not terminate")), _ = approximate_progress_loop(&ctx, &rpc_ctx, progress_info) => unreachable!(concat!(module_path!(), "::approximate_progress_loop should not terminate")),
} }
@@ -240,12 +272,12 @@ async fn restore_packages(
( (
Option<Arc<Revision>>, Option<Arc<Revision>>,
BackupMountGuard<TmpMountGuard>, BackupMountGuard<TmpMountGuard>,
Vec<JoinHandle<Result<(), Error>>>, Vec<JoinHandle<(Result<(), Error>, PackageId)>>,
ProgressInfo, ProgressInfo,
), ),
Error, Error,
> { > {
let (revision, guards) = assure_restoring(&ctx, db, ids, &backup_guard).await?; let (revision, guards) = assure_restoring(ctx, db, ids, &backup_guard).await?;
let mut progress_info = ProgressInfo::default(); let mut progress_info = ProgressInfo::default();
@@ -258,15 +290,19 @@ async fn restore_packages(
.src_volume_size .src_volume_size
.insert(id.clone(), dir_size(backup_dir(&id)).await?); .insert(id.clone(), dir_size(backup_dir(&id)).await?);
progress_info.target_volume_size.insert(id.clone(), 0); progress_info.target_volume_size.insert(id.clone(), 0);
tasks.push(tokio::spawn(async move { let package_id = id.clone();
if let Err(e) = task.await { tasks.push(tokio::spawn(
tracing::error!("Error restoring package {}: {}", id, e); async move {
tracing::debug!("{:?}", e); if let Err(e) = task.await {
Err(e) tracing::error!("Error restoring package {}: {}", id, e);
} else { tracing::debug!("{:?}", e);
Ok(()) Err(e)
} else {
Ok(())
}
} }
})); .map(|x| (x, package_id)),
));
} }
Ok((revision, backup_guard, tasks, progress_info)) Ok((revision, backup_guard, tasks, progress_info))

View File

@@ -127,8 +127,6 @@ pub struct RpcContextSeed {
pub revision_cache: RwLock<VecDeque<Arc<Revision>>>, pub revision_cache: RwLock<VecDeque<Arc<Revision>>>,
pub metrics_cache: RwLock<Option<crate::system::Metrics>>, pub metrics_cache: RwLock<Option<crate::system::Metrics>>,
pub shutdown: broadcast::Sender<Option<Shutdown>>, pub shutdown: broadcast::Sender<Option<Shutdown>>,
pub websocket_count: AtomicUsize,
pub logger: EmbassyLogger,
pub tor_socks: SocketAddr, pub tor_socks: SocketAddr,
pub notification_manager: NotificationManager, pub notification_manager: NotificationManager,
pub open_authed_websockets: Mutex<BTreeMap<HashSessionToken, Vec<oneshot::Sender<()>>>>, pub open_authed_websockets: Mutex<BTreeMap<HashSessionToken, Vec<oneshot::Sender<()>>>>,
@@ -150,8 +148,6 @@ impl RpcContext {
Ipv4Addr::new(127, 0, 0, 1), Ipv4Addr::new(127, 0, 0, 1),
9050, 9050,
))); )));
let logger = EmbassyLogger::init();
tracing::info!("Set Logger");
let (shutdown, _) = tokio::sync::broadcast::channel(1); let (shutdown, _) = tokio::sync::broadcast::channel(1);
let secret_store = base.secret_store().await?; let secret_store = base.secret_store().await?;
tracing::info!("Opened Sqlite DB"); tracing::info!("Opened Sqlite DB");
@@ -189,8 +185,6 @@ impl RpcContext {
revision_cache: RwLock::new(VecDeque::new()), revision_cache: RwLock::new(VecDeque::new()),
metrics_cache, metrics_cache,
shutdown, shutdown,
websocket_count: AtomicUsize::new(0),
logger,
tor_socks: tor_proxy, tor_socks: tor_proxy,
notification_manager, notification_manager,
open_authed_websockets: Mutex::new(BTreeMap::new()), open_authed_websockets: Mutex::new(BTreeMap::new()),

View File

@@ -20,7 +20,7 @@ impl EmbassyLogger {
} }
pub fn init() -> Self { pub fn init() -> Self {
Self::base_subscriber().init(); Self::base_subscriber().init();
color_eyre::install().expect("Color Eyre Init"); color_eyre::install().unwrap_or_else(|_| tracing::warn!("tracing too many times"));
EmbassyLogger {} EmbassyLogger {}
} }