From d4297b16d23aa7a1c97f8230ecc20989b42513b5 Mon Sep 17 00:00:00 2001 From: Aiden McClelland <3732071+dr-bonez@users.noreply.github.com> Date: Mon, 8 Nov 2021 14:14:47 -0700 Subject: [PATCH] Feature/full embassy recovery (#775) * kinda working * recovery working * Update appmgr/src/manager/mod.rs Co-authored-by: Keagan McClelland --- appmgr/sqlx-data.json | 40 +-- appmgr/src/backup/backup_bulk.rs | 35 ++- appmgr/src/backup/mod.rs | 6 +- appmgr/src/backup/restore.rs | 299 ++++++++++++++---- appmgr/src/bin/embassy-init.rs | 105 +------ appmgr/src/bin/embassyd.rs | 397 ++++++++++++------------ appmgr/src/context/rpc.rs | 30 +- appmgr/src/context/setup.rs | 4 +- appmgr/src/disk/main.rs | 8 + appmgr/src/disk/util.rs | 1 + appmgr/src/init.rs | 88 ++++++ appmgr/src/install/mod.rs | 54 ++-- appmgr/src/lib.rs | 2 +- appmgr/src/manager/mod.rs | 8 + appmgr/src/manager/sync.rs | 1 + appmgr/src/middleware/auth.rs | 6 +- appmgr/src/setup.rs | 220 +++++++------ appmgr/src/shutdown.rs | 63 ++-- appmgr/src/sound.rs | 8 + appmgr/src/update/latest_information.rs | 3 +- appmgr/src/util/io.rs | 25 ++ appmgr/src/util/logger.rs | 50 +-- 22 files changed, 899 insertions(+), 554 deletions(-) create mode 100644 appmgr/src/init.rs diff --git a/appmgr/sqlx-data.json b/appmgr/sqlx-data.json index 2c4d42912..e606f580a 100644 --- a/appmgr/sqlx-data.json +++ b/appmgr/sqlx-data.json @@ -30,16 +30,6 @@ "nullable": [] } }, - "165daa7d6a60cb42122373b2c5ac7d39399bcc99992f0002ee7bfef50a8daceb": { - "query": "DELETE FROM certificates WHERE id = 0 OR id = 1;", - "describe": { - "columns": [], - "parameters": { - "Right": 0 - }, - "nullable": [] - } - }, "177c4b9cc7901a3b906e5969b86b1c11e6acbfb8e86e98f197d7333030b17964": { "query": "DELETE FROM notifications WHERE id = ?", "describe": { @@ -50,6 +40,16 @@ "nullable": [] } }, + "1eee1fdc793919c391008854407143d7a11b4668486c11a760b49af49992f9f8": { + "query": "REPLACE INTO tor (package, interface, key) VALUES (?, 'main', ?)", + "describe": { + "columns": [], + "parameters": { + "Right": 2 + }, + "nullable": [] + } + }, "3502e58f2ab48fb4566d21c920c096f81acfa3ff0d02f970626a4dcd67bac71d": { "query": "SELECT tor_key FROM account", "describe": { @@ -296,16 +296,6 @@ ] } }, - "70a100abc8ca04ffc559ca16460d4fd4c65aa34952ade2681491f0b44dc1aaa1": { - "query": "INSERT OR REPLACE INTO account (id, password, tor_key) VALUES (?, ?, ?)", - "describe": { - "columns": [], - "parameters": { - "Right": 3 - }, - "nullable": [] - } - }, "7d548d2472fa3707bd17364b4800e229b9c2b1c0a22e245bf4e635b9b16b8c24": { "query": "INSERT INTO certificates (priv_key_pem, certificate_pem, lookup_string, created_at, updated_at) VALUES (?, ?, ?, datetime('now'), datetime('now'))", "describe": { @@ -364,6 +354,16 @@ ] } }, + "9fcedab1ba34daa2c6ae97c5953c09821b35b55be75b0c66045ab31a2cf4553e": { + "query": "REPLACE INTO account (id, password, tor_key) VALUES (?, ?, ?)", + "describe": { + "columns": [], + "parameters": { + "Right": 3 + }, + "nullable": [] + } + }, "a1cbaac36d8e14c8c3e7276237c4824bff18861f91b0b08aa5791704c492acb7": { "query": "INSERT INTO certificates (id, priv_key_pem, certificate_pem, lookup_string, created_at, updated_at) VALUES (1, ?, ?, NULL, datetime('now'), datetime('now'))", "describe": { diff --git a/appmgr/src/backup/backup_bulk.rs b/appmgr/src/backup/backup_bulk.rs index a97a175dd..ed3acc243 100644 --- a/appmgr/src/backup/backup_bulk.rs +++ b/appmgr/src/backup/backup_bulk.rs @@ -41,15 +41,29 @@ impl<'de> Deserialize<'de> for OsBackup { D: serde::Deserializer<'de>, { #[derive(Deserialize)] + #[serde(rename = "kebab-case")] struct OsBackupDe { - tor_key: TorSecretKeyV3, + tor_key: String, root_ca_key: String, root_ca_cert: String, ui: Value, } let int = OsBackupDe::deserialize(deserializer)?; + let key_vec = base32::decode(base32::Alphabet::RFC4648 { padding: true }, &int.tor_key) + .ok_or(serde::de::Error::invalid_value( + serde::de::Unexpected::Str(&int.tor_key), + &"an RFC4648 encoded string", + ))?; + if key_vec.len() != 64 { + return Err(serde::de::Error::invalid_value( + serde::de::Unexpected::Str(&int.tor_key), + &"a 64 byte value encoded as an RFC4648 string", + )); + } + let mut key_slice = [0; 64]; + key_slice.clone_from_slice(&key_vec); Ok(OsBackup { - tor_key: int.tor_key, + tor_key: TorSecretKeyV3::from(key_slice), root_ca_key: PKey::::private_key_from_pem(int.root_ca_key.as_bytes()) .map_err(serde::de::Error::custom)?, root_ca_cert: X509::from_pem(int.root_ca_cert.as_bytes()) @@ -64,14 +78,18 @@ impl Serialize for OsBackup { S: serde::Serializer, { #[derive(Serialize)] + #[serde(rename = "kebab-case")] struct OsBackupSer<'a> { - tor_key: &'a TorSecretKeyV3, + tor_key: String, root_ca_key: String, root_ca_cert: String, ui: &'a Value, } OsBackupSer { - tor_key: &self.tor_key, + tor_key: base32::encode( + base32::Alphabet::RFC4648 { padding: true }, + &self.tor_key.as_bytes(), + ), root_ca_key: String::from_utf8( self.root_ca_key .private_key_to_pem_pkcs8() @@ -224,7 +242,9 @@ async fn perform_backup( continue; }; let main_status_model = installed_model.clone().status().main(); - let (started, health) = match main_status_model.get(&mut db, true).await?.into_owned() { + + let mut tx = db.begin().await?; // for lock scope + let (started, health) = match main_status_model.get(&mut tx, true).await?.into_owned() { MainStatus::Running { started, health } => (Some(started.clone()), health.clone()), MainStatus::Stopped | MainStatus::Stopping => (None, Default::default()), MainStatus::BackingUp { .. } => { @@ -239,7 +259,6 @@ async fn perform_backup( continue; } }; - let mut tx = db.begin().await?; // for lock scope main_status_model .put( &mut tx, @@ -251,7 +270,6 @@ async fn perform_backup( .await?; tx.save().await?; // drop locks - installed_model.lock(&mut db, LockType::Write).await; let manifest = installed_model .clone() .manifest() @@ -266,6 +284,9 @@ async fn perform_backup( })? .synchronize() .await; + + installed_model.lock(&mut db, LockType::Write).await; + let guard = backup_guard.mount_package_backup(&package_id).await?; let res = manifest .backup diff --git a/appmgr/src/backup/mod.rs b/appmgr/src/backup/mod.rs index 78e295218..076051d01 100644 --- a/appmgr/src/backup/mod.rs +++ b/appmgr/src/backup/mod.rs @@ -22,8 +22,8 @@ use crate::version::{Current, VersionT}; use crate::volume::{backup_dir, Volume, VolumeId, Volumes, BACKUP_DIR}; use crate::{Error, ResultExt}; -mod backup_bulk; -mod restore; +pub mod backup_bulk; +pub mod restore; #[derive(Debug, Deserialize, Serialize)] pub struct BackupReport { @@ -47,7 +47,7 @@ pub fn backup() -> Result<(), Error> { Ok(()) } -#[command(rename = "backup", subcommands(restore::restore_packages))] +#[command(rename = "backup", subcommands(restore::restore_packages_rpc))] pub fn package_backup() -> Result<(), Error> { Ok(()) } diff --git a/appmgr/src/backup/restore.rs b/appmgr/src/backup/restore.rs index ec9024eff..ddd1aa41d 100644 --- a/appmgr/src/backup/restore.rs +++ b/appmgr/src/backup/restore.rs @@ -1,25 +1,37 @@ +use std::collections::BTreeMap; use std::path::{Path, PathBuf}; +use std::sync::atomic::Ordering; use std::sync::Arc; +use std::time::Duration; use clap::ArgMatches; use color_eyre::eyre::eyre; +use futures::future::BoxFuture; +use futures::FutureExt; +use openssl::x509::X509; use patch_db::{DbHandle, PatchDbHandle, Revision}; use rpc_toolkit::command; use tokio::fs::File; +use tokio::task::JoinHandle; +use torut::onion::OnionAddressV3; use tracing::instrument; use crate::auth::check_password_against_db; -use crate::context::RpcContext; +use crate::backup::backup_bulk::OsBackup; +use crate::context::{RpcContext, SetupContext}; use crate::db::model::{PackageDataEntry, StaticFiles}; use crate::db::util::WithRevision; -use crate::disk::util::{BackupMountGuard, PackageBackupMountGuard, TmpMountGuard}; -use crate::install::progress::{InstallProgress, InstallProgressTracker}; -use crate::install::{install_s9pk_or_cleanup, PKG_PUBLIC_DIR}; -use crate::s9pk::manifest::PackageId; +use crate::disk::util::{BackupMountGuard, PackageBackupMountGuard, PartitionInfo, TmpMountGuard}; +use crate::install::progress::InstallProgress; +use crate::install::{download_install_s9pk, PKG_PUBLIC_DIR}; +use crate::net::ssl::SslManager; +use crate::s9pk::manifest::{Manifest, PackageId}; use crate::s9pk::reader::S9pkReader; -use crate::util::{display_none, Version}; -use crate::volume::BACKUP_DIR; -use crate::Error; +use crate::setup::RecoveryStatus; +use crate::util::io::dir_size; +use crate::util::{display_none, IoFormat}; +use crate::volume::{backup_dir, BACKUP_DIR, PKG_VOLUME_DIR}; +use crate::{Error, ResultExt}; fn parse_comma_separated(arg: &str, _: &ArgMatches<'_>) -> Result, Error> { arg.split(",") @@ -29,7 +41,7 @@ fn parse_comma_separated(arg: &str, _: &ArgMatches<'_>) -> Result #[command(rename = "restore", display(display_none))] #[instrument(skip(ctx, old_password, password))] -pub async fn restore_packages( +pub async fn restore_packages_rpc( #[context] ctx: RpcContext, #[arg(parse(parse_comma_separated))] ids: Vec, #[arg] logicalname: PathBuf, @@ -46,18 +58,9 @@ pub async fn restore_packages( if old_password.is_some() { backup_guard.change_password(&password)?; } - let (revision, guards) = assure_restoring(&ctx, &mut db, ids, &backup_guard).await?; - let mut tasks = Vec::with_capacity(guards.len()); - for (id, version, progress, guard) in guards { - let ctx = ctx.clone(); - tasks.push(tokio::spawn(async move { - if let Err(e) = restore_package(&ctx, &id, &version, progress, guard).await { - tracing::error!("Error restoring package {}: {}", id, e); - tracing::debug!("{:?}", e); - } - })); - } + let (revision, backup_guard, tasks, _) = + restore_packages(&ctx, &mut db, backup_guard, ids).await?; tokio::spawn(async { futures::future::join_all(tasks).await; @@ -73,6 +76,195 @@ pub async fn restore_packages( }) } +async fn approximate_progress( + rpc_ctx: &RpcContext, + progress: &mut ProgressInfo, +) -> Result<(), Error> { + for (id, size) in &mut progress.target_volume_size { + let dir = rpc_ctx.datadir.join(PKG_VOLUME_DIR).join(id).join("data"); + if tokio::fs::metadata(&dir).await.is_err() { + *size = 0; + } else { + *size = dir_size(&dir).await?; + } + } + Ok(()) +} + +async fn approximate_progress_loop( + ctx: &SetupContext, + rpc_ctx: &RpcContext, + mut starting_info: ProgressInfo, +) { + loop { + if let Err(e) = approximate_progress(rpc_ctx, &mut starting_info).await { + tracing::error!("Failed to approximate restore progress: {}", e); + tracing::debug!("{:?}", e); + } else { + *ctx.recovery_status.write().await = Some(Ok(starting_info.flatten())); + } + tokio::time::sleep(Duration::from_secs(1)).await; + } +} + +#[derive(Debug, Default)] +struct ProgressInfo { + package_installs: BTreeMap>, + src_volume_size: BTreeMap, + target_volume_size: BTreeMap, +} +impl ProgressInfo { + fn flatten(&self) -> RecoveryStatus { + let mut total_bytes = 0; + let mut bytes_transferred = 0; + + for (_, progress) in &self.package_installs { + total_bytes += ((progress.size.unwrap_or(0) as f64) * 2.2) as u64; + bytes_transferred += progress.downloaded.load(Ordering::SeqCst); + bytes_transferred += ((progress.validated.load(Ordering::SeqCst) as f64) * 0.2) as u64; + bytes_transferred += progress.unpacked.load(Ordering::SeqCst); + } + + for (_, size) in &self.src_volume_size { + total_bytes += *size; + } + + for (_, size) in &self.target_volume_size { + bytes_transferred += *size; + } + + if bytes_transferred > total_bytes { + bytes_transferred = total_bytes; + } + + RecoveryStatus { + total_bytes, + bytes_transferred, + } + } +} + +#[instrument(skip(ctx))] +pub async fn recover_full_embassy( + ctx: SetupContext, + disk_guid: Arc, + embassy_password: String, + recovery_partition: PartitionInfo, + recovery_password: Option, +) -> Result<(OnionAddressV3, X509, BoxFuture<'static, Result<(), Error>>), Error> { + let backup_guard = BackupMountGuard::mount( + TmpMountGuard::mount(&recovery_partition.logicalname, None).await?, + recovery_password.as_deref().unwrap_or_default(), + ) + .await?; + + let os_backup_path = backup_guard.as_ref().join("os-backup.cbor"); + let os_backup: OsBackup = + IoFormat::Cbor.from_slice(&tokio::fs::read(&os_backup_path).await.with_ctx(|_| { + ( + crate::ErrorKind::Filesystem, + os_backup_path.display().to_string(), + ) + })?)?; + + let password = argon2::hash_encoded( + embassy_password.as_bytes(), + &rand::random::<[u8; 16]>()[..], + &argon2::Config::default(), + ) + .with_kind(crate::ErrorKind::PasswordHashGeneration)?; + let key_vec = os_backup.tor_key.as_bytes().to_vec(); + let secret_store = ctx.secret_store().await?; + sqlx::query!( + "REPLACE INTO account (id, password, tor_key) VALUES (?, ?, ?)", + 0, + password, + key_vec, + ) + .execute(&mut secret_store.acquire().await?) + .await?; + + SslManager::import_root_ca( + secret_store.clone(), + os_backup.root_ca_key, + os_backup.root_ca_cert.clone(), + ) + .await?; + secret_store.close().await; + + Ok(( + os_backup.tor_key.public().get_onion_address(), + os_backup.root_ca_cert, + async move { + let rpc_ctx = RpcContext::init(ctx.config_path.as_ref(), disk_guid).await?; + let mut db = rpc_ctx.db.handle(); + + let ids = backup_guard + .metadata + .package_backups + .keys() + .cloned() + .collect(); + let (_, backup_guard, tasks, progress_info) = restore_packages( + &rpc_ctx, + &mut db, + backup_guard, + ids, + ) + .await?; + + tokio::select! { + res = futures::future::join_all(tasks) => res.into_iter().map(|res| res.with_kind(crate::ErrorKind::Unknown).and_then(|a|a)).collect::>()?, + _ = approximate_progress_loop(&ctx, &rpc_ctx, progress_info) => unreachable!(concat!(module_path!(), "::approximate_progress_loop should not terminate")), + } + + backup_guard.unmount().await?; + rpc_ctx.shutdown().await + }.boxed() + )) +} + +async fn restore_packages( + ctx: &RpcContext, + db: &mut PatchDbHandle, + backup_guard: BackupMountGuard, + ids: Vec, +) -> Result< + ( + Option>, + BackupMountGuard, + Vec>>, + ProgressInfo, + ), + Error, +> { + let (revision, guards) = assure_restoring(&ctx, db, ids, &backup_guard).await?; + + let mut progress_info = ProgressInfo::default(); + + let mut tasks = Vec::with_capacity(guards.len()); + for (manifest, guard) in guards { + let id = manifest.id.clone(); + let (progress, task) = restore_package(ctx.clone(), manifest, guard).await?; + progress_info.package_installs.insert(id.clone(), progress); + progress_info + .src_volume_size + .insert(id.clone(), dir_size(backup_dir(&id)).await?); + progress_info.target_volume_size.insert(id.clone(), 0); + tasks.push(tokio::spawn(async move { + if let Err(e) = task.await { + tracing::error!("Error restoring package {}: {}", id, e); + tracing::debug!("{:?}", e); + Err(e) + } else { + Ok(()) + } + })); + } + + Ok((revision, backup_guard, tasks, progress_info)) +} + #[instrument(skip(ctx, db, backup_guard))] async fn assure_restoring( ctx: &RpcContext, @@ -82,12 +274,7 @@ async fn assure_restoring( ) -> Result< ( Option>, - Vec<( - PackageId, - Version, - Arc, - PackageBackupMountGuard, - )>, + Vec<(Manifest, PackageBackupMountGuard)>, ), Error, > { @@ -116,9 +303,6 @@ async fn assure_restoring( let manifest = rdr.manifest().await?; let version = manifest.version.clone(); let progress = InstallProgress::new(Some(tokio::fs::metadata(&s9pk_path).await?.len())); - progress - .download_complete - .store(true, std::sync::atomic::Ordering::SeqCst); let public_dir_path = ctx .datadir @@ -146,41 +330,52 @@ async fn assure_restoring( *model = Some(PackageDataEntry::Restoring { install_progress: progress.clone(), static_files: StaticFiles::local(&id, &version, manifest.assets.icon_type()), - manifest, + manifest: manifest.clone(), }); model.save(&mut tx).await?; - guards.push((id, version, progress, guard)); + guards.push((manifest, guard)); } Ok((tx.commit(None).await?, guards)) } #[instrument(skip(ctx, guard))] -async fn restore_package( - ctx: &RpcContext, - id: &PackageId, - version: &Version, - progress: Arc, +async fn restore_package<'a>( + ctx: RpcContext, + manifest: Manifest, guard: PackageBackupMountGuard, -) -> Result<(), Error> { - let s9pk_path = Path::new(BACKUP_DIR).join(id).join(format!("{}.s9pk", id)); - let progress_reader = - InstallProgressTracker::new(File::open(&s9pk_path).await?, progress.clone()); - let mut s9pk_reader = progress - .track_read_during( - crate::db::DatabaseModel::new() - .package_data() - .idx_model(id) - .and_then(|pde| pde.install_progress()), - &ctx.db, - || S9pkReader::from_reader(progress_reader, true), +) -> Result<(Arc, BoxFuture<'static, Result<(), Error>>), Error> { + let s9pk_path = Path::new(BACKUP_DIR) + .join(&manifest.id) + .join(format!("{}.s9pk", manifest.id)); + let len = tokio::fs::metadata(&s9pk_path) + .await + .with_ctx(|_| { + ( + crate::ErrorKind::Filesystem, + s9pk_path.display().to_string(), + ) + })? + .len(); + let file = File::open(&s9pk_path).await.with_ctx(|_| { + ( + crate::ErrorKind::Filesystem, + s9pk_path.display().to_string(), ) - .await?; + })?; - install_s9pk_or_cleanup(ctx, id, version, &mut s9pk_reader, progress).await?; + let progress = InstallProgress::new(Some(len)); - guard.unmount().await?; + Ok(( + progress.clone(), + async move { + download_install_s9pk(&ctx, &manifest, progress, file).await?; - Ok(()) + guard.unmount().await?; + + Ok(()) + } + .boxed(), + )) } diff --git a/appmgr/src/bin/embassy-init.rs b/appmgr/src/bin/embassy-init.rs index 41bf279e4..c05c2f240 100644 --- a/appmgr/src/bin/embassy-init.rs +++ b/appmgr/src/bin/embassy-init.rs @@ -3,9 +3,7 @@ use std::sync::Arc; use embassy::context::rpc::RpcContextConfig; use embassy::context::{DiagnosticContext, SetupContext}; -use embassy::db::model::ServerStatus; use embassy::disk::main::DEFAULT_PASSWORD; -use embassy::install::PKG_DOCKER_DIR; use embassy::middleware::cors::cors; use embassy::middleware::diagnostic::diagnostic; use embassy::middleware::encrypt::encrypt; @@ -26,8 +24,7 @@ fn status_fn(_: i32) -> StatusCode { } #[instrument] -async fn init(cfg_path: Option<&str>) -> Result<(), Error> { - let cfg = RpcContextConfig::load(cfg_path).await?; +async fn setup_or_init(cfg_path: Option<&str>) -> Result<(), Error> { embassy::disk::util::mount("LABEL=EMBASSY", "/embassy-os").await?; if tokio::fs::metadata("/embassy-os/disk.guid").await.is_err() { #[cfg(feature = "avahi")] @@ -73,102 +70,20 @@ async fn init(cfg_path: Option<&str>) -> Result<(), Error> { }) .await .with_kind(embassy::ErrorKind::Network)?; - drop(ctx); - embassy::disk::main::export( + } else { + let cfg = RpcContextConfig::load(cfg_path).await?; + embassy::disk::main::import( tokio::fs::read_to_string("/embassy-os/disk.guid") // unique identifier for volume group - keeps track of the disk that goes with your embassy .await? .trim(), cfg.datadir(), + DEFAULT_PASSWORD, ) .await?; + tracing::info!("Loaded Disk"); + embassy::init::init(&cfg).await?; } - embassy::disk::main::import( - tokio::fs::read_to_string("/embassy-os/disk.guid") // unique identifier for volume group - keeps track of the disk that goes with your embassy - .await? - .trim(), - cfg.datadir(), - DEFAULT_PASSWORD, - ) - .await?; - tracing::info!("Loaded Disk"); - let secret_store = cfg.secret_store().await?; - let log_dir = cfg.datadir().join("main").join("logs"); - if tokio::fs::metadata(&log_dir).await.is_err() { - tokio::fs::create_dir_all(&log_dir).await?; - } - embassy::disk::util::bind(&log_dir, "/var/log/journal", false).await?; - Command::new("systemctl") - .arg("restart") - .arg("systemd-journald") - .invoke(embassy::ErrorKind::Journald) - .await?; - tracing::info!("Mounted Logs"); - let tmp_dir = cfg.datadir().join("package-data/tmp"); - if tokio::fs::metadata(&tmp_dir).await.is_err() { - tokio::fs::create_dir_all(&tmp_dir).await?; - } - let tmp_docker = cfg.datadir().join("package-data/tmp/docker"); - if tokio::fs::metadata(&tmp_docker).await.is_ok() { - tokio::fs::remove_dir_all(&tmp_docker).await?; - } - Command::new("cp") - .arg("-r") - .arg("/var/lib/docker") - .arg(&tmp_docker) - .invoke(embassy::ErrorKind::Filesystem) - .await?; - Command::new("systemctl") - .arg("stop") - .arg("docker") - .invoke(embassy::ErrorKind::Docker) - .await?; - embassy::disk::util::bind(&tmp_docker, "/var/lib/docker", false).await?; - Command::new("systemctl") - .arg("reset-failed") - .arg("docker") - .invoke(embassy::ErrorKind::Docker) - .await?; - Command::new("systemctl") - .arg("start") - .arg("docker") - .invoke(embassy::ErrorKind::Docker) - .await?; - tracing::info!("Mounted Docker Data"); - - embassy::install::load_images(cfg.datadir().join(PKG_DOCKER_DIR)).await?; - tracing::info!("Loaded Docker Images"); - // Loading system images - embassy::install::load_images("/var/lib/embassy/system-images").await?; - tracing::info!("Loaded System Docker Images"); - - embassy::ssh::sync_keys_from_db(&secret_store, "/root/.ssh/authorized_keys").await?; - tracing::info!("Synced SSH Keys"); - - embassy::hostname::sync_hostname().await?; - tracing::info!("Synced Hostname"); - embassy::net::wifi::synchronize_wpa_supplicant_conf(&cfg.datadir().join("main")).await?; - tracing::info!("Synchronized wpa_supplicant.conf"); - - let db = cfg.db(&secret_store).await?; - let mut handle = db.handle(); - let mut info = embassy::db::DatabaseModel::new() - .server_info() - .get_mut(&mut handle) - .await?; - match info.status { - ServerStatus::Running | ServerStatus::Updated | ServerStatus::BackingUp => { - info.status = ServerStatus::Running; - } - ServerStatus::Updating => { - info.update_progress = None; - info.status = ServerStatus::Running; - } - } - info.save(&mut handle).await?; - - embassy::version::init(&mut handle).await?; - Ok(()) } @@ -196,8 +111,8 @@ async fn inner_main(cfg_path: Option<&str>) -> Result, Error> { run_script_if_exists("/embassy-os/preinit.sh").await; - let res = if let Err(e) = init(cfg_path).await { - (|| async { + let res = if let Err(e) = setup_or_init(cfg_path).await { + async { tracing::error!("{}", e.source); tracing::debug!("{}", e.source); embassy::sound::BEETHOVEN.play().await?; @@ -259,7 +174,7 @@ async fn inner_main(cfg_path: Option<&str>) -> Result, Error> { .await .with_kind(embassy::ErrorKind::Network)?, ) - })() + } .await } else { Ok(None) diff --git a/appmgr/src/bin/embassyd.rs b/appmgr/src/bin/embassyd.rs index dcd54454e..55a8aa70a 100644 --- a/appmgr/src/bin/embassyd.rs +++ b/appmgr/src/bin/embassyd.rs @@ -36,212 +36,217 @@ fn err_to_500(e: Error) -> Response { #[instrument] async fn inner_main(cfg_path: Option<&str>) -> Result, Error> { - let rpc_ctx = RpcContext::init( - cfg_path, - Arc::new( - tokio::fs::read_to_string("/embassy-os/disk.guid") // unique identifier for volume group - keeps track of the disk that goes with your embassy - .await? - .trim() - .to_owned(), - ), - ) - .await?; - let mut shutdown_recv = rpc_ctx.shutdown.subscribe(); - - let sig_handler_ctx = rpc_ctx.clone(); - let sig_handler = tokio::spawn(async move { - use tokio::signal::unix::SignalKind; - futures::future::select_all( - [ - SignalKind::interrupt(), - SignalKind::quit(), - SignalKind::terminate(), - ] - .iter() - .map(|s| { - async move { - signal(*s) - .expect(&format!("register {:?} handler", s)) - .recv() - .await - } - .boxed() - }), + let (rpc_ctx, shutdown) = { + let rpc_ctx = RpcContext::init( + cfg_path, + Arc::new( + tokio::fs::read_to_string("/embassy-os/disk.guid") // unique identifier for volume group - keeps track of the disk that goes with your embassy + .await? + .trim() + .to_owned(), + ), ) - .await; - sig_handler_ctx - .shutdown - .send(None) - .map_err(|_| ()) - .expect("send shutdown signal"); - }); - - tokio::fs::write("/etc/nginx/sites-available/default", { - let info = embassy::db::DatabaseModel::new() - .server_info() - .get(&mut rpc_ctx.db.handle(), true) - .await?; - format!( - include_str!("../nginx/main-ui.conf.template"), - lan_hostname = info.lan_address.host_str().unwrap(), - tor_hostname = info.tor_address.host_str().unwrap() - ) - }) - .await - .with_ctx(|_| { - ( - embassy::ErrorKind::Filesystem, - "/etc/nginx/sites-available/default", - ) - })?; - Command::new("systemctl") - .arg("reload") - .arg("nginx") - .invoke(embassy::ErrorKind::Nginx) .await?; + let mut shutdown_recv = rpc_ctx.shutdown.subscribe(); - let auth = auth(rpc_ctx.clone()); - let ctx = rpc_ctx.clone(); - let server = rpc_server!({ - command: embassy::main_api, - context: ctx, - status: status_fn, - middleware: [ - cors, - auth, - ] - }) - .with_graceful_shutdown({ - let mut shutdown = rpc_ctx.shutdown.subscribe(); - async move { - shutdown.recv().await.expect("context dropped"); - } - }); - - let rev_cache_ctx = rpc_ctx.clone(); - let revision_cache_task = tokio::spawn(async move { - let mut sub = rev_cache_ctx.db.subscribe(); - let mut shutdown = rev_cache_ctx.shutdown.subscribe(); - loop { - let rev = match tokio::select! { - a = sub.recv() => a, - _ = shutdown.recv() => break, - } { - Ok(a) => a, - Err(_) => { - rev_cache_ctx.revision_cache.write().await.truncate(0); - continue; - } - }; // TODO: handle falling behind - let mut cache = rev_cache_ctx.revision_cache.write().await; - cache.push_back(rev); - if cache.len() > rev_cache_ctx.revision_cache_size { - cache.pop_front(); - } - } - }); - - let ws_ctx = rpc_ctx.clone(); - let ws_server = { - let builder = Server::bind(&ws_ctx.bind_ws); - - let make_svc = ::rpc_toolkit::hyper::service::make_service_fn(move |_| { - let ctx = ws_ctx.clone(); - async move { - Ok::<_, ::rpc_toolkit::hyper::Error>(::rpc_toolkit::hyper::service::service_fn( - move |req| { - let ctx = ctx.clone(); - async move { - match req.uri().path() { - "/db" => Ok(subscribe(ctx, req).await.unwrap_or_else(err_to_500)), - _ => Response::builder() - .status(StatusCode::NOT_FOUND) - .body(Body::empty()), - } - } - }, - )) - } + let sig_handler_ctx = rpc_ctx.clone(); + let sig_handler = tokio::spawn(async move { + use tokio::signal::unix::SignalKind; + futures::future::select_all( + [ + SignalKind::interrupt(), + SignalKind::quit(), + SignalKind::terminate(), + ] + .iter() + .map(|s| { + async move { + signal(*s) + .expect(&format!("register {:?} handler", s)) + .recv() + .await + } + .boxed() + }), + ) + .await; + sig_handler_ctx + .shutdown + .send(None) + .map_err(|_| ()) + .expect("send shutdown signal"); }); - builder.serve(make_svc) - } - .with_graceful_shutdown({ - let mut shutdown = rpc_ctx.shutdown.subscribe(); - async move { - shutdown.recv().await.expect("context dropped"); - } - }); - let file_server_ctx = rpc_ctx.clone(); - let file_server = { - static_server::init(file_server_ctx, { + tokio::fs::write("/etc/nginx/sites-available/default", { + let info = embassy::db::DatabaseModel::new() + .server_info() + .get(&mut rpc_ctx.db.handle(), true) + .await?; + format!( + include_str!("../nginx/main-ui.conf.template"), + lan_hostname = info.lan_address.host_str().unwrap(), + tor_hostname = info.tor_address.host_str().unwrap() + ) + }) + .await + .with_ctx(|_| { + ( + embassy::ErrorKind::Filesystem, + "/etc/nginx/sites-available/default", + ) + })?; + Command::new("systemctl") + .arg("reload") + .arg("nginx") + .invoke(embassy::ErrorKind::Nginx) + .await?; + + let auth = auth(rpc_ctx.clone()); + let ctx = rpc_ctx.clone(); + let server = rpc_server!({ + command: embassy::main_api, + context: ctx, + status: status_fn, + middleware: [ + cors, + auth, + ] + }) + .with_graceful_shutdown({ let mut shutdown = rpc_ctx.shutdown.subscribe(); async move { shutdown.recv().await.expect("context dropped"); } - }) + }); + + let rev_cache_ctx = rpc_ctx.clone(); + let revision_cache_task = tokio::spawn(async move { + let mut sub = rev_cache_ctx.db.subscribe(); + let mut shutdown = rev_cache_ctx.shutdown.subscribe(); + loop { + let rev = match tokio::select! { + a = sub.recv() => a, + _ = shutdown.recv() => break, + } { + Ok(a) => a, + Err(_) => { + rev_cache_ctx.revision_cache.write().await.truncate(0); + continue; + } + }; // TODO: handle falling behind + let mut cache = rev_cache_ctx.revision_cache.write().await; + cache.push_back(rev); + if cache.len() > rev_cache_ctx.revision_cache_size { + cache.pop_front(); + } + } + }); + + let ws_ctx = rpc_ctx.clone(); + let ws_server = { + let builder = Server::bind(&ws_ctx.bind_ws); + + let make_svc = ::rpc_toolkit::hyper::service::make_service_fn(move |_| { + let ctx = ws_ctx.clone(); + async move { + Ok::<_, ::rpc_toolkit::hyper::Error>(::rpc_toolkit::hyper::service::service_fn( + move |req| { + let ctx = ctx.clone(); + async move { + match req.uri().path() { + "/db" => { + Ok(subscribe(ctx, req).await.unwrap_or_else(err_to_500)) + } + _ => Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Body::empty()), + } + } + }, + )) + } + }); + builder.serve(make_svc) + } + .with_graceful_shutdown({ + let mut shutdown = rpc_ctx.shutdown.subscribe(); + async move { + shutdown.recv().await.expect("context dropped"); + } + }); + + let file_server_ctx = rpc_ctx.clone(); + let file_server = { + static_server::init(file_server_ctx, { + let mut shutdown = rpc_ctx.shutdown.subscribe(); + async move { + shutdown.recv().await.expect("context dropped"); + } + }) + }; + + let tor_health_ctx = rpc_ctx.clone(); + let tor_client = Client::builder() + .proxy( + Proxy::http(format!( + "socks5h://{}:{}", + rpc_ctx.tor_socks.ip(), + rpc_ctx.tor_socks.port() + )) + .with_kind(crate::ErrorKind::Network)?, + ) + .build() + .with_kind(crate::ErrorKind::Network)?; + let tor_health_daemon = daemon( + move || { + let ctx = tor_health_ctx.clone(); + let client = tor_client.clone(); + async move { tor_health_check(&client, &ctx.net_controller.tor).await } + }, + Duration::from_secs(300), + rpc_ctx.shutdown.subscribe(), + ); + + embassy::sound::MARIO_COIN.play().await?; + + futures::try_join!( + server + .map_err(|e| Error::new(e, ErrorKind::Network)) + .map_ok(|_| tracing::debug!("RPC Server Shutdown")), + revision_cache_task + .map_err(|e| Error::new( + eyre!("{}", e).wrap_err("Revision Cache daemon panicked!"), + ErrorKind::Unknown + )) + .map_ok(|_| tracing::debug!("Revision Cache Shutdown")), + ws_server + .map_err(|e| Error::new(e, ErrorKind::Network)) + .map_ok(|_| tracing::debug!("WebSocket Server Shutdown")), + file_server + .map_err(|e| Error::new(e, ErrorKind::Network)) + .map_ok(|_| tracing::debug!("Static File Server Shutdown")), + tor_health_daemon + .map_err(|e| Error::new( + e.wrap_err("Tor Health Daemon panicked!"), + ErrorKind::Unknown + )) + .map_ok(|_| tracing::debug!("Tor Health Daemon Shutdown")), + )?; + + let mut shutdown = shutdown_recv + .recv() + .await + .with_kind(crate::ErrorKind::Unknown)?; + + sig_handler.abort(); + + if let Some(shutdown) = &mut shutdown { + drop(shutdown.db_handle.take()); + } + + (rpc_ctx, shutdown) }; - - let tor_health_ctx = rpc_ctx.clone(); - let tor_client = Client::builder() - .proxy( - Proxy::http(format!( - "socks5h://{}:{}", - rpc_ctx.tor_socks.ip(), - rpc_ctx.tor_socks.port() - )) - .with_kind(crate::ErrorKind::Network)?, - ) - .build() - .with_kind(crate::ErrorKind::Network)?; - let tor_health_daemon = daemon( - move || { - let ctx = tor_health_ctx.clone(); - let client = tor_client.clone(); - async move { tor_health_check(&client, &ctx.net_controller.tor).await } - }, - Duration::from_secs(300), - rpc_ctx.shutdown.subscribe(), - ); - - embassy::sound::MARIO_COIN.play().await?; - - futures::try_join!( - server - .map_err(|e| Error::new(e, ErrorKind::Network)) - .map_ok(|_| tracing::debug!("RPC Server Shutdown")), - revision_cache_task - .map_err(|e| Error::new( - eyre!("{}", e).wrap_err("Revision Cache daemon panicked!"), - ErrorKind::Unknown - )) - .map_ok(|_| tracing::debug!("Revision Cache Shutdown")), - ws_server - .map_err(|e| Error::new(e, ErrorKind::Network)) - .map_ok(|_| tracing::debug!("WebSocket Server Shutdown")), - file_server - .map_err(|e| Error::new(e, ErrorKind::Network)) - .map_ok(|_| tracing::debug!("Static File Server Shutdown")), - tor_health_daemon - .map_err(|e| Error::new( - e.wrap_err("Tor Health Daemon panicked!"), - ErrorKind::Unknown - )) - .map_ok(|_| tracing::debug!("Tor Health Daemon Shutdown")), - )?; - - let mut shutdown = shutdown_recv - .recv() - .await - .with_kind(crate::ErrorKind::Unknown)?; - - if let Some(shutdown) = &mut shutdown { - drop(shutdown.db_handle.take()); - } - - rpc_ctx.managers.empty().await?; - - sig_handler.abort(); + rpc_ctx.shutdown().await?; Ok(shutdown) } diff --git a/appmgr/src/context/rpc.rs b/appmgr/src/context/rpc.rs index 7c37d0d1a..cf460c9f9 100644 --- a/appmgr/src/context/rpc.rs +++ b/appmgr/src/context/rpc.rs @@ -132,11 +132,14 @@ impl RpcContext { disk_guid: Arc, ) -> Result { let base = RpcContextConfig::load(cfg_path).await?; - let log_epoch = Arc::new(AtomicU64::new(rand::random())); - let logger = EmbassyLogger::init(log_epoch.clone(), base.log_server.clone(), false); + tracing::info!("Loaded Config"); + let logger = EmbassyLogger::init(base.log_server.clone(), false); + tracing::info!("Set Logger"); let (shutdown, _) = tokio::sync::broadcast::channel(1); let secret_store = base.secret_store().await?; + tracing::info!("Opened Sqlite DB"); let db = base.db(&secret_store).await?; + tracing::info!("Opened PatchDB"); let share = crate::db::DatabaseModel::new() .server_info() .share_stats() @@ -144,6 +147,7 @@ impl RpcContext { .await?; logger.set_sharing(*share); let docker = Docker::connect_with_unix_defaults()?; + tracing::info!("Connected to Docker"); let net_controller = NetController::init( ([127, 0, 0, 1], 80).into(), crate::net::tor::os_key(&mut secret_store.acquire().await?).await?, @@ -153,9 +157,11 @@ impl RpcContext { None, ) .await?; + tracing::info!("Initialized Net Controller"); let managers = ManagerMap::default(); let metrics_cache = RwLock::new(None); let notification_manager = NotificationManager::new(secret_store.clone()); + tracing::info!("Initialized Notification Manager"); let seed = Arc::new(RpcContextSeed { bind_rpc: base.bind_rpc.unwrap_or(([127, 0, 0, 1], 5959).into()), bind_ws: base.bind_ws.unwrap_or(([127, 0, 0, 1], 5960).into()), @@ -172,8 +178,8 @@ impl RpcContext { metrics_cache, shutdown, websocket_count: AtomicUsize::new(0), + log_epoch: logger.epoch(), logger, - log_epoch, tor_socks: base.tor_socks.unwrap_or(SocketAddr::V4(SocketAddrV4::new( Ipv4Addr::new(127, 0, 0, 1), 9050, @@ -188,6 +194,7 @@ impl RpcContext { .await }); let res = Self(seed); + tracing::info!("Initialized Package Managers"); res.managers .init( &res, @@ -223,6 +230,23 @@ impl RpcContext { .await? .to_owned()) } + #[instrument(skip(self))] + pub async fn shutdown(self) -> Result<(), Error> { + self.managers.empty().await?; + match Arc::try_unwrap(self.0) { + Ok(seed) => { + let RpcContextSeed { secret_store, .. } = seed; + secret_store.close().await; + } + Err(ctx) => { + tracing::warn!( + "{} RPC Context(s) are still being held somewhere. This is likely a mistake.", + Arc::strong_count(&ctx) - 1 + ); + } + } + Ok(()) + } } impl Context for RpcContext { fn host(&self) -> Host<&str> { diff --git a/appmgr/src/context/setup.rs b/appmgr/src/context/setup.rs index 78ad239a1..57ec3c878 100644 --- a/appmgr/src/context/setup.rs +++ b/appmgr/src/context/setup.rs @@ -55,6 +55,7 @@ impl SetupContextConfig { } pub struct SetupContextSeed { + pub config_path: Option, pub bind_rpc: SocketAddr, pub shutdown: Sender<()>, pub datadir: PathBuf, @@ -68,10 +69,11 @@ pub struct SetupContext(Arc); impl SetupContext { #[instrument(skip(path))] pub async fn init>(path: Option

) -> Result { - let cfg = SetupContextConfig::load(path).await?; + let cfg = SetupContextConfig::load(path.as_ref()).await?; let (shutdown, _) = tokio::sync::broadcast::channel(1); let datadir = cfg.datadir().to_owned(); Ok(Self(Arc::new(SetupContextSeed { + config_path: path.as_ref().map(|p| p.as_ref().to_owned()), bind_rpc: cfg.bind_rpc.unwrap_or(([127, 0, 0, 1], 5959).into()), shutdown, datadir, diff --git a/appmgr/src/disk/main.rs b/appmgr/src/disk/main.rs index 00b862482..c891e4028 100644 --- a/appmgr/src/disk/main.rs +++ b/appmgr/src/disk/main.rs @@ -106,6 +106,7 @@ pub async fn create_fs>( .invoke(crate::ErrorKind::DiskManagement) .await?; Command::new("cryptsetup") + .arg("-q") .arg("luksFormat") .arg(format!("--key-file={}", PASSWORD_PATH)) .arg(format!("--keyfile-size={}", password.len())) @@ -113,6 +114,7 @@ pub async fn create_fs>( .invoke(crate::ErrorKind::DiskManagement) .await?; Command::new("cryptsetup") + .arg("-q") .arg("luksOpen") .arg(format!("--key-file={}", PASSWORD_PATH)) .arg(format!("--keyfile-size={}", password.len())) @@ -183,6 +185,7 @@ pub async fn unmount_fs>( unmount(datadir.as_ref().join(name)).await?; } Command::new("cryptsetup") + .arg("-q") .arg("luksClose") .arg(format!("{}_{}", guid, name)) .invoke(crate::ErrorKind::DiskManagement) @@ -196,6 +199,10 @@ pub async fn unmount_all_fs>(guid: &str, datadir: P) -> Result<() unmount_fs(guid, &datadir, "main", false).await?; unmount_fs(guid, &datadir, "swap", true).await?; unmount_fs(guid, &datadir, "package-data", false).await?; + Command::new("dmsetup") + .arg("remove_all") // TODO: find a higher finesse way to do this for portability reasons + .invoke(crate::ErrorKind::DiskManagement) + .await?; Ok(()) } @@ -251,6 +258,7 @@ pub async fn mount_fs>( .await .with_ctx(|_| (crate::ErrorKind::Filesystem, PASSWORD_PATH))?; Command::new("cryptsetup") + .arg("-q") .arg("luksOpen") .arg(format!("--key-file={}", PASSWORD_PATH)) .arg(format!("--keyfile-size={}", password.len())) diff --git a/appmgr/src/disk/util.rs b/appmgr/src/disk/util.rs index ab923ae35..1939b7013 100644 --- a/appmgr/src/disk/util.rs +++ b/appmgr/src/disk/util.rs @@ -191,6 +191,7 @@ pub async fn get_percentage>(path: P) -> Result { .parse::()?) } +#[instrument] pub async fn pvscan() -> Result>, Error> { let pvscan_out = Command::new("pvscan") .invoke(crate::ErrorKind::DiskManagement) diff --git a/appmgr/src/init.rs b/appmgr/src/init.rs new file mode 100644 index 000000000..51bc4ea99 --- /dev/null +++ b/appmgr/src/init.rs @@ -0,0 +1,88 @@ +use tokio::process::Command; + +use crate::context::rpc::RpcContextConfig; +use crate::db::model::ServerStatus; +use crate::install::PKG_DOCKER_DIR; +use crate::util::Invoke; +use crate::Error; + +pub async fn init(cfg: &RpcContextConfig) -> Result<(), Error> { + let secret_store = cfg.secret_store().await?; + let log_dir = cfg.datadir().join("main").join("logs"); + if tokio::fs::metadata(&log_dir).await.is_err() { + tokio::fs::create_dir_all(&log_dir).await?; + } + crate::disk::util::bind(&log_dir, "/var/log/journal", false).await?; + Command::new("systemctl") + .arg("restart") + .arg("systemd-journald") + .invoke(crate::ErrorKind::Journald) + .await?; + tracing::info!("Mounted Logs"); + let tmp_dir = cfg.datadir().join("package-data/tmp"); + if tokio::fs::metadata(&tmp_dir).await.is_err() { + tokio::fs::create_dir_all(&tmp_dir).await?; + } + let tmp_docker = cfg.datadir().join("package-data/tmp/docker"); + if tokio::fs::metadata(&tmp_docker).await.is_ok() { + tokio::fs::remove_dir_all(&tmp_docker).await?; + } + Command::new("cp") + .arg("-r") + .arg("/var/lib/docker") + .arg(&tmp_docker) + .invoke(crate::ErrorKind::Filesystem) + .await?; + Command::new("systemctl") + .arg("stop") + .arg("docker") + .invoke(crate::ErrorKind::Docker) + .await?; + crate::disk::util::bind(&tmp_docker, "/var/lib/docker", false).await?; + Command::new("systemctl") + .arg("reset-failed") + .arg("docker") + .invoke(crate::ErrorKind::Docker) + .await?; + Command::new("systemctl") + .arg("start") + .arg("docker") + .invoke(crate::ErrorKind::Docker) + .await?; + tracing::info!("Mounted Docker Data"); + + crate::install::load_images(cfg.datadir().join(PKG_DOCKER_DIR)).await?; + tracing::info!("Loaded Docker Images"); + // Loading system images + crate::install::load_images("/var/lib/embassy/system-images").await?; + tracing::info!("Loaded System Docker Images"); + + crate::ssh::sync_keys_from_db(&secret_store, "/root/.ssh/authorized_keys").await?; + tracing::info!("Synced SSH Keys"); + + crate::hostname::sync_hostname().await?; + tracing::info!("Synced Hostname"); + crate::net::wifi::synchronize_wpa_supplicant_conf(&cfg.datadir().join("main")).await?; + tracing::info!("Synchronized wpa_supplicant.conf"); + + let db = cfg.db(&secret_store).await?; + let mut handle = db.handle(); + let mut info = crate::db::DatabaseModel::new() + .server_info() + .get_mut(&mut handle) + .await?; + match info.status { + ServerStatus::Running | ServerStatus::Updated | ServerStatus::BackingUp => { + info.status = ServerStatus::Running; + } + ServerStatus::Updating => { + info.update_progress = None; + info.status = ServerStatus::Running; + } + } + info.save(&mut handle).await?; + + crate::version::init(&mut handle).await?; + + Ok(()) +} diff --git a/appmgr/src/install/mod.rs b/appmgr/src/install/mod.rs index 8538e7989..dcf5577e7 100644 --- a/appmgr/src/install/mod.rs +++ b/appmgr/src/install/mod.rs @@ -11,7 +11,6 @@ use futures::future::BoxFuture; use futures::{FutureExt, StreamExt, TryStreamExt}; use http::StatusCode; use patch_db::{DbHandle, LockType}; -use reqwest::Response; use rpc_toolkit::command; use tokio::fs::{File, OpenOptions}; use tokio::io::{AsyncRead, AsyncSeek, AsyncSeekExt}; @@ -137,7 +136,25 @@ pub async fn install( tokio::spawn(async move { let mut db_handle = ctx.db.handle(); - if let Err(e) = download_install_s9pk(&ctx, &man, s9pk).await { + if let Err(e) = download_install_s9pk( + &ctx, + &man, + InstallProgress::new(s9pk.content_length()), + tokio_util::io::StreamReader::new(s9pk.bytes_stream().map_err(|e| { + std::io::Error::new( + if e.is_connect() { + std::io::ErrorKind::ConnectionRefused + } else if e.is_timeout() { + std::io::ErrorKind::TimedOut + } else { + std::io::ErrorKind::Other + }, + e, + ) + })), + ) + .await + { let err_str = format!("Install of {}@{} Failed: {}", man.id, man.version, e); tracing::error!("{}", err_str); tracing::debug!("{:?}", e); @@ -250,57 +267,42 @@ pub async fn uninstall_impl(ctx: RpcContext, id: PackageId) -> Result, + mut s9pk: impl AsyncRead + Unpin, ) -> Result<(), Error> { let pkg_id = &temp_manifest.id; let version = &temp_manifest.version; - let pkg_cache_dir = ctx + let pkg_archive_dir = ctx .datadir .join(PKG_ARCHIVE_DIR) .join(pkg_id) .join(version.as_str()); - tokio::fs::create_dir_all(&pkg_cache_dir).await?; - let pkg_cache = pkg_cache_dir.join(AsRef::::as_ref(pkg_id).with_extension("s9pk")); + tokio::fs::create_dir_all(&pkg_archive_dir).await?; + let pkg_archive = pkg_archive_dir.join(AsRef::::as_ref(pkg_id).with_extension("s9pk")); let pkg_data_entry = crate::db::DatabaseModel::new() .package_data() .idx_model(pkg_id); - let progress = InstallProgress::new(s9pk.content_length()); let progress_model = pkg_data_entry.and_then(|pde| pde.install_progress()); - File::delete(&pkg_cache).await?; + File::delete(&pkg_archive).await?; let mut dst = OpenOptions::new() .create(true) .write(true) .read(true) - .open(&pkg_cache) + .open(&pkg_archive) .await?; progress .track_download_during(progress_model.clone(), &ctx.db, || async { let mut progress_writer = InstallProgressTracker::new(&mut dst, progress.clone()); - tokio::io::copy( - &mut tokio_util::io::StreamReader::new(s9pk.bytes_stream().map_err(|e| { - std::io::Error::new( - if e.is_connect() { - std::io::ErrorKind::ConnectionRefused - } else if e.is_timeout() { - std::io::ErrorKind::TimedOut - } else { - std::io::ErrorKind::Other - }, - e, - ) - })), - &mut progress_writer, - ) - .await?; + tokio::io::copy(&mut s9pk, &mut progress_writer).await?; progress.download_complete(); Ok(()) }) diff --git a/appmgr/src/lib.rs b/appmgr/src/lib.rs index 9f35f782a..4a5e75ff3 100644 --- a/appmgr/src/lib.rs +++ b/appmgr/src/lib.rs @@ -16,6 +16,7 @@ pub mod disk; pub mod error; pub mod hostname; pub mod id; +pub mod init; pub mod inspect; pub mod install; pub mod logs; @@ -42,7 +43,6 @@ pub use config::Config; pub use error::{Error, ErrorKind, ResultExt}; use rpc_toolkit::command; use rpc_toolkit::yajrc::RpcError; -pub use version::init; #[command(metadata(authenticated = false))] pub fn echo(#[arg] message: String) -> Result { diff --git a/appmgr/src/manager/mod.rs b/appmgr/src/manager/mod.rs index 1039bb38d..29b7f28b3 100644 --- a/appmgr/src/manager/mod.rs +++ b/appmgr/src/manager/mod.rs @@ -110,6 +110,14 @@ impl ManagerMap { |((id, version), man)| async move { man.exit().await?; tracing::debug!("Manager for {}@{} shutdown", id, version); + if let Err(e) = Arc::try_unwrap(man) { + tracing::trace!( + "Manager for {}@{} still has {} other open references", + id, + version, + Arc::strong_count(&e) - 1 + ); + } Ok::<_, Error>(()) }, )) diff --git a/appmgr/src/manager/sync.rs b/appmgr/src/manager/sync.rs index 65800913f..2c58774d0 100644 --- a/appmgr/src/manager/sync.rs +++ b/appmgr/src/manager/sync.rs @@ -69,6 +69,7 @@ pub async fn synchronizer(shared: &ManagerSharedState) { ); tracing::debug!("{:?}", e); } else { + tracing::trace!("{} status synchronized", shared.manifest.id); shared.synchronized.notify_waiters(); } tokio::select! { diff --git a/appmgr/src/middleware/auth.rs b/appmgr/src/middleware/auth.rs index a9bc17e88..720507045 100644 --- a/appmgr/src/middleware/auth.rs +++ b/appmgr/src/middleware/auth.rs @@ -1,6 +1,3 @@ -use crate::context::RpcContext; -use crate::{Error, ResultExt}; - use basic_cookies::Cookie; use color_eyre::eyre::eyre; use digest::Digest; @@ -16,6 +13,9 @@ use rpc_toolkit::yajrc::RpcMethod; use rpc_toolkit::Metadata; use serde::{Deserialize, Serialize}; use sha2::Sha256; + +use crate::context::RpcContext; +use crate::{Error, ResultExt}; pub trait AsLogoutSessionId { fn as_logout_session_id(self) -> String; } diff --git a/appmgr/src/setup.rs b/appmgr/src/setup.rs index b69b220cd..82a33f662 100644 --- a/appmgr/src/setup.rs +++ b/appmgr/src/setup.rs @@ -1,11 +1,12 @@ use std::collections::BTreeMap; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; use std::time::Duration; use color_eyre::eyre::eyre; use futures::future::BoxFuture; -use futures::{FutureExt, TryStreamExt}; +use futures::{FutureExt, TryFutureExt, TryStreamExt}; use openssl::x509::X509; use rpc_toolkit::command; use rpc_toolkit::yajrc::RpcError; @@ -15,19 +16,22 @@ use tokio::io::AsyncWriteExt; use torut::onion::{OnionAddressV3, TorSecretKeyV3}; use tracing::instrument; +use crate::backup::restore::recover_full_embassy; +use crate::context::rpc::RpcContextConfig; use crate::context::SetupContext; use crate::db::model::RecoveredPackageInfo; use crate::disk::main::DEFAULT_PASSWORD; use crate::disk::util::{pvscan, DiskInfo, PartitionInfo, TmpMountGuard}; use crate::id::Id; +use crate::init::init; use crate::install::PKG_PUBLIC_DIR; use crate::net::ssl::SslManager; use crate::s9pk::manifest::PackageId; use crate::sound::BEETHOVEN; -use crate::util::io::from_yaml_async_reader; +use crate::util::io::{dir_size, from_yaml_async_reader}; use crate::util::Version; use crate::volume::{data_dir, VolumeId}; -use crate::{Error, ResultExt}; +use crate::{ensure_code, Error, ResultExt}; #[command(subcommands(status, disk, execute, recovery))] pub fn setup() -> Result<(), Error> { @@ -69,8 +73,8 @@ pub fn recovery() -> Result<(), Error> { #[derive(Debug, Clone, Deserialize, Serialize)] #[serde(rename_all = "kebab-case")] pub struct RecoveryStatus { - bytes_transferred: u64, - total_bytes: u64, + pub bytes_transferred: u64, + pub total_bytes: u64, } #[command(rename = "status", rpc_only, metadata(authenticated = false))] @@ -122,7 +126,7 @@ pub async fn execute( } #[instrument(skip(ctx))] -pub async fn complete_setup(ctx: SetupContext, guid: String) -> Result<(), Error> { +pub async fn complete_setup(ctx: SetupContext, guid: Arc) -> Result<(), Error> { let mut guid_file = File::create("/embassy-os/disk.guid").await?; guid_file.write_all(guid.as_bytes()).await?; guid_file.sync_all().await?; @@ -130,7 +134,7 @@ pub async fn complete_setup(ctx: SetupContext, guid: String) -> Result<(), Error Ok(()) } -#[instrument(skip(ctx))] +#[instrument(skip(ctx, embassy_password, recovery_password))] pub async fn execute_inner( ctx: SetupContext, embassy_logicalname: PathBuf, @@ -144,14 +148,61 @@ pub async fn execute_inner( crate::ErrorKind::InvalidRequest, )); } - let guid = crate::disk::main::create( - &[embassy_logicalname], - &pvscan().await?, - &ctx.datadir, - DEFAULT_PASSWORD, - ) - .await?; - crate::disk::main::import(&guid, &ctx.datadir, DEFAULT_PASSWORD).await?; + let guid = Arc::new( + crate::disk::main::create( + &[embassy_logicalname], + &pvscan().await?, + &ctx.datadir, + DEFAULT_PASSWORD, + ) + .await?, + ); + crate::disk::main::import(&*guid, &ctx.datadir, DEFAULT_PASSWORD).await?; + + let res = if let Some(recovery_partition) = recovery_partition { + if recovery_partition + .embassy_os + .as_ref() + .map(|v| &*v.version < &emver::Version::new(0, 2, 8, 0)) + .unwrap_or(true) + { + return Err(Error::new(eyre!("Unsupported version of EmbassyOS. Please update to at least 0.2.8 before recovering."), crate::ErrorKind::VersionIncompatible)); + } + let (tor_addr, root_ca, recover_fut) = recover( + ctx.clone(), + guid.clone(), + embassy_password, + recovery_partition, + recovery_password, + ) + .await?; + init(&RpcContextConfig::load(ctx.config_path.as_ref()).await?).await?; + tokio::spawn(async move { + if let Err(e) = recover_fut + .and_then(|_| complete_setup(ctx.clone(), guid)) + .await + { + BEETHOVEN.play().await.unwrap_or_default(); // ignore error in playing the song + tracing::error!("Error recovering drive!: {}", e); + tracing::debug!("{:?}", e); + *ctx.recovery_status.write().await = Some(Err(e.into())); + } + }); + (tor_addr, root_ca) + } else { + let res = fresh_setup(&ctx, &embassy_password).await?; + init(&RpcContextConfig::load(ctx.config_path.as_ref()).await?).await?; + complete_setup(ctx, guid).await?; + res + }; + + Ok(res) +} + +async fn fresh_setup( + ctx: &SetupContext, + embassy_password: &str, +) -> Result<(OnionAddressV3, X509), Error> { let password = argon2::hash_encoded( embassy_password.as_bytes(), &rand::random::<[u8; 16]>()[..], @@ -162,7 +213,7 @@ pub async fn execute_inner( let key_vec = tor_key.as_bytes().to_vec(); let sqlite_pool = ctx.secret_store().await?; sqlx::query!( - "INSERT OR REPLACE INTO account (id, password, tor_key) VALUES (?, ?, ?)", + "REPLACE INTO account (id, password, tor_key) VALUES (?, ?, ?)", 0, password, key_vec, @@ -174,76 +225,46 @@ pub async fn execute_inner( .export_root_ca() .await?; sqlite_pool.close().await; - - if let Some(recovery_partition) = recovery_partition { - if recovery_partition - .embassy_os - .as_ref() - .map(|v| &*v.version < &emver::Version::new(0, 2, 8, 0)) - .unwrap_or(true) - { - return Err(Error::new(eyre!("Unsupported version of EmbassyOS. Please update to at least 0.2.8 before recovering."), crate::ErrorKind::VersionIncompatible)); - } - tokio::spawn(async move { - if let Err(e) = recover(ctx.clone(), guid, recovery_partition, recovery_password).await - { - BEETHOVEN.play().await.unwrap_or_default(); // ignore error in playing the song - tracing::error!("Error recovering drive!: {}", e); - tracing::debug!("{:?}", e); - *ctx.recovery_status.write().await = Some(Err(e.into())); - } - }); - } else { - complete_setup(ctx, guid).await?; - } - Ok((tor_key.public().get_onion_address(), root_ca)) } -#[instrument(skip(ctx))] +#[instrument(skip(ctx, embassy_password, recovery_password))] async fn recover( ctx: SetupContext, - guid: String, + guid: Arc, + embassy_password: String, recovery_partition: PartitionInfo, recovery_password: Option, -) -> Result<(), Error> { +) -> Result<(OnionAddressV3, X509, BoxFuture<'static, Result<(), Error>>), Error> { let recovery_version = recovery_partition .embassy_os .as_ref() .map(|i| i.version.clone()) .unwrap_or_default(); - if recovery_version.major() == 0 && recovery_version.minor() == 2 { - recover_v2(&ctx, recovery_partition).await?; + let res = if recovery_version.major() == 0 && recovery_version.minor() == 2 { + let (tor_addr, root_ca) = fresh_setup(&ctx, &embassy_password).await?; + ( + tor_addr, + root_ca, + recover_v2(ctx.clone(), recovery_partition).boxed(), + ) } else if recovery_version.major() == 0 && recovery_version.minor() == 3 { - recover_v3(&ctx, recovery_partition, recovery_password).await?; + recover_full_embassy( + ctx.clone(), + guid.clone(), + embassy_password, + recovery_partition, + recovery_password, + ) + .await? } else { return Err(Error::new( eyre!("Unsupported version of EmbassyOS: {}", recovery_version), crate::ErrorKind::VersionIncompatible, )); - } + }; - complete_setup(ctx, guid).await -} - -fn dir_size<'a, P: AsRef + 'a + Send + Sync>( - path: P, - res: &'a AtomicU64, -) -> BoxFuture<'a, Result<(), std::io::Error>> { - async move { - tokio_stream::wrappers::ReadDirStream::new(tokio::fs::read_dir(path.as_ref()).await?) - .try_for_each(|e| async move { - let m = e.metadata().await?; - if m.is_file() { - res.fetch_add(m.len(), Ordering::Relaxed); - } else if m.is_dir() { - dir_size(e.path(), res).await?; - } - Ok(()) - }) - .await - } - .boxed() + Ok(res) } fn dir_copy<'a, P0: AsRef + 'a + Send + Sync, P1: AsRef + 'a + Send + Sync>( @@ -300,15 +321,7 @@ fn dir_copy<'a, P0: AsRef + 'a + Send + Sync, P1: AsRef + 'a + Send format!("cp -P {} -> {}", src_path.display(), dst_path.display()), ) })?; - // Removed (see https://unix.stackexchange.com/questions/87200/change-permissions-for-a-symbolic-link): - // tokio::fs::set_permissions(&dst_path, m.permissions()) - // .await - // .with_ctx(|_| { - // ( - // crate::ErrorKind::Filesystem, - // format!("chmod {}", dst_path.display()), - // ) - // })?; + // Do not set permissions (see https://unix.stackexchange.com/questions/87200/change-permissions-for-a-symbolic-link) } Ok(()) }) @@ -319,7 +332,7 @@ fn dir_copy<'a, P0: AsRef + 'a + Send + Sync, P1: AsRef + 'a + Send } #[instrument(skip(ctx))] -async fn recover_v2(ctx: &SetupContext, recovery_partition: PartitionInfo) -> Result<(), Error> { +async fn recover_v2(ctx: SetupContext, recovery_partition: PartitionInfo) -> Result<(), Error> { let recovery = TmpMountGuard::mount(&recovery_partition.logicalname, None).await?; let secret_store = ctx.secret_store().await?; @@ -346,19 +359,16 @@ async fn recover_v2(ctx: &SetupContext, recovery_partition: PartitionInfo) -> Re .await?; let volume_path = recovery.as_ref().join("root/volumes"); - let total_bytes = AtomicU64::new(0); + let mut total_bytes = 0; for (pkg_id, _) in &packages { let volume_src_path = volume_path.join(&pkg_id); - dir_size(&volume_src_path, &total_bytes) - .await - .with_ctx(|_| { - ( - crate::ErrorKind::Filesystem, - volume_src_path.display().to_string(), - ) - })?; + total_bytes += dir_size(&volume_src_path).await.with_ctx(|_| { + ( + crate::ErrorKind::Filesystem, + volume_src_path.display().to_string(), + ) + })?; } - let total_bytes = total_bytes.load(Ordering::SeqCst); *ctx.recovery_status.write().await = Some(Ok(RecoveryStatus { bytes_transferred: 0, total_bytes, @@ -392,6 +402,31 @@ async fn recover_v2(ctx: &SetupContext, recovery_partition: PartitionInfo) -> Re } } => (), ); + let tor_src_path = recovery + .as_ref() + .join("var/lib/tor") + .join(format!("app-{}", pkg_id)) + .join("hs_ed25519_secret_key"); + let key_vec = tokio::fs::read(&tor_src_path).await.with_ctx(|_| { + ( + crate::ErrorKind::Filesystem, + tor_src_path.display().to_string(), + ) + })?; + ensure_code!( + key_vec.len() == 96, + crate::ErrorKind::Tor, + "{} not 96 bytes", + tor_src_path.display() + ); + let key_vec = key_vec[32..].to_vec(); + sqlx::query!( + "REPLACE INTO tor (package, interface, key) VALUES (?, 'main', ?)", + *pkg_id, + key_vec, + ) + .execute(&mut secret_store.acquire().await?) + .await?; let icon_leaf = AsRef::::as_ref(&pkg_id) .join(info.version.as_str()) .join("icon.png"); @@ -399,7 +434,6 @@ async fn recover_v2(ctx: &SetupContext, recovery_partition: PartitionInfo) -> Re .as_ref() .join("root/agent/icons") .join(format!("{}.png", pkg_id)); - // TODO: tor address let icon_dst_path = ctx.datadir.join(PKG_PUBLIC_DIR).join(&icon_leaf); if let Some(parent) = icon_dst_path.parent() { tokio::fs::create_dir_all(&parent) @@ -433,15 +467,7 @@ async fn recover_v2(ctx: &SetupContext, recovery_partition: PartitionInfo) -> Re .await?; } + secret_store.close().await; recovery.unmount().await?; Ok(()) } - -#[instrument(skip(ctx))] -async fn recover_v3( - ctx: &SetupContext, - recovery_partition: PartitionInfo, - recovery_password: Option, -) -> Result<(), Error> { - todo!() -} diff --git a/appmgr/src/shutdown.rs b/appmgr/src/shutdown.rs index 8d13ae708..87348e4eb 100644 --- a/appmgr/src/shutdown.rs +++ b/appmgr/src/shutdown.rs @@ -22,42 +22,43 @@ impl Shutdown { pub fn execute(&self) { use std::process::Command; - tokio::runtime::Builder::new_current_thread() + let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build() - .unwrap() - .block_on(async { - use tokio::process::Command; + .unwrap(); + rt.block_on(async { + use tokio::process::Command; - if let Err(e) = Command::new("systemctl") - .arg("stop") - .arg("systemd-journald") - .invoke(crate::ErrorKind::Journald) - .await - { - tracing::error!("Error Stopping Journald: {}", e); + if let Err(e) = Command::new("systemctl") + .arg("stop") + .arg("systemd-journald") + .invoke(crate::ErrorKind::Journald) + .await + { + tracing::error!("Error Stopping Journald: {}", e); + tracing::debug!("{:?}", e); + } + if let Err(e) = Command::new("systemctl") + .arg("stop") + .arg("docker") + .invoke(crate::ErrorKind::Docker) + .await + { + tracing::error!("Error Stopping Docker: {}", e); + tracing::debug!("{:?}", e); + } + if let Some(guid) = &self.disk_guid { + if let Err(e) = export(guid, &self.datadir).await { + tracing::error!("Error Exporting Volume Group: {}", e); tracing::debug!("{:?}", e); } - if let Err(e) = Command::new("systemctl") - .arg("stop") - .arg("docker") - .invoke(crate::ErrorKind::Docker) - .await - { - tracing::error!("Error Stopping Docker: {}", e); - tracing::debug!("{:?}", e); - } - if let Some(guid) = &self.disk_guid { - if let Err(e) = export(guid, &self.datadir).await { - tracing::error!("Error Exporting Volume Group: {}", e); - tracing::debug!("{:?}", e); - } - } - if let Err(e) = MARIO_DEATH.play().await { - tracing::error!("Error Playing Shutdown Song: {}", e); - tracing::debug!("{:?}", e); - } - }); + } + if let Err(e) = MARIO_DEATH.play().await { + tracing::error!("Error Playing Shutdown Song: {}", e); + tracing::debug!("{:?}", e); + } + }); + drop(rt); if self.restart { Command::new("reboot").spawn().unwrap().wait().unwrap(); } else { diff --git a/appmgr/src/sound.rs b/appmgr/src/sound.rs index 4a560ad3c..4b3e73674 100644 --- a/appmgr/src/sound.rs +++ b/appmgr/src/sound.rs @@ -93,6 +93,13 @@ impl SoundInterface { .await .with_ctx(|_| (ErrorKind::SoundError, SWITCH_FILE.to_string_lossy())) } + #[instrument(skip(self))] + pub async fn close(mut self) -> Result<(), Error> { + if let Some(lock) = self.0.take() { + lock.unlock().await?; + } + Ok(()) + } } pub struct Song { @@ -114,6 +121,7 @@ where Some(n) => sound.play_for_time_slice(self.tempo_qpm, n, slice).await?, }; } + sound.close().await?; } Ok(()) } diff --git a/appmgr/src/update/latest_information.rs b/appmgr/src/update/latest_information.rs index 0434ae928..eb0139c2b 100644 --- a/appmgr/src/update/latest_information.rs +++ b/appmgr/src/update/latest_information.rs @@ -1,7 +1,8 @@ +use std::collections::HashMap; + use emver::Version; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr}; -use std::collections::HashMap; #[serde_as] #[derive(Debug, Deserialize, Serialize)] diff --git a/appmgr/src/util/io.rs b/appmgr/src/util/io.rs index f1b599fc1..2bc14c057 100644 --- a/appmgr/src/util/io.rs +++ b/appmgr/src/util/io.rs @@ -1,3 +1,7 @@ +use std::path::Path; + +use futures::future::BoxFuture; +use futures::{FutureExt, TryStreamExt}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf}; use crate::ResultExt; @@ -193,3 +197,24 @@ pub async fn copy_and_shutdown( w.shutdown().await?; Ok(()) } + +pub fn dir_size<'a, P: AsRef + 'a + Send + Sync>( + path: P, +) -> BoxFuture<'a, Result> { + async move { + tokio_stream::wrappers::ReadDirStream::new(tokio::fs::read_dir(path.as_ref()).await?) + .try_fold(0, |acc, e| async move { + let m = e.metadata().await?; + Ok(acc + + if m.is_file() { + m.len() + } else if m.is_dir() { + dir_size(e.path()).await? + } else { + 0 + }) + }) + .await + } + .boxed() +} diff --git a/appmgr/src/util/logger.rs b/appmgr/src/util/logger.rs index 6931c42ae..2f9e5f538 100644 --- a/appmgr/src/util/logger.rs +++ b/appmgr/src/util/logger.rs @@ -1,6 +1,7 @@ use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; +use lazy_static::lazy_static; use reqwest::{Client, Url}; use serde::Serialize; use tracing::Subscriber; @@ -51,6 +52,10 @@ impl Layer for SharingLayer { } } +lazy_static! { + static ref LOGGER: Mutex, Arc)>> = Mutex::new(None); +} + #[derive(Clone)] pub struct EmbassyLogger { log_epoch: Arc, @@ -71,30 +76,39 @@ impl EmbassyLogger { .with(ErrorLayer::default()) } pub fn no_sharing() { - use tracing_subscriber::prelude::*; - - Self::base_subscriber().init(); - color_eyre::install().expect("Color Eyre Init"); + Self::init(None, false); } - pub fn init(log_epoch: Arc, share_dest: Option, share_errors: bool) -> Self { + pub fn init(share_dest: Option, share_errors: bool) -> Self { use tracing_subscriber::prelude::*; - let share_dest = match share_dest { - None => "https://beta-registry-0-3.start9labs.com/error-logs".to_owned(), // TODO - Some(a) => a.to_string(), - }; - let sharing = Arc::new(AtomicBool::new(share_errors)); - let sharing_layer = SharingLayer { - log_epoch: log_epoch.clone(), - share_dest, - sharing: sharing.clone(), - }; + let mut guard = LOGGER.lock().unwrap(); + let (log_epoch, sharing) = if let Some((log_epoch, sharing)) = guard.take() { + sharing.store(share_errors, Ordering::SeqCst); + (log_epoch, sharing) + } else { + let log_epoch = Arc::new(AtomicU64::new(rand::random())); + let sharing = Arc::new(AtomicBool::new(share_errors)); + let share_dest = match share_dest { + None => "https://beta-registry-0-3.start9labs.com/error-logs".to_owned(), // TODO + Some(a) => a.to_string(), + }; + let sharing_layer = SharingLayer { + log_epoch: log_epoch.clone(), + share_dest, + sharing: sharing.clone(), + }; - Self::base_subscriber().with(sharing_layer).init(); - color_eyre::install().expect("Color Eyre Init"); + Self::base_subscriber().with(sharing_layer).init(); + color_eyre::install().expect("Color Eyre Init"); + (log_epoch, sharing) + }; + *guard = Some((log_epoch.clone(), sharing.clone())); EmbassyLogger { log_epoch, sharing } } + pub fn epoch(&self) -> Arc { + self.log_epoch.clone() + } pub fn set_sharing(&self, sharing: bool) { self.sharing.store(sharing, Ordering::SeqCst) }