Feature/full embassy recovery (#775)

* kinda working

* recovery working

* Update appmgr/src/manager/mod.rs

Co-authored-by: Keagan McClelland <keagan.mcclelland@gmail.com>
This commit is contained in:
Aiden McClelland
2021-11-08 14:14:47 -07:00
parent 46cb0c2aa8
commit d4297b16d2
22 changed files with 899 additions and 554 deletions

View File

@@ -30,16 +30,6 @@
"nullable": []
}
},
"165daa7d6a60cb42122373b2c5ac7d39399bcc99992f0002ee7bfef50a8daceb": {
"query": "DELETE FROM certificates WHERE id = 0 OR id = 1;",
"describe": {
"columns": [],
"parameters": {
"Right": 0
},
"nullable": []
}
},
"177c4b9cc7901a3b906e5969b86b1c11e6acbfb8e86e98f197d7333030b17964": {
"query": "DELETE FROM notifications WHERE id = ?",
"describe": {
@@ -50,6 +40,16 @@
"nullable": []
}
},
"1eee1fdc793919c391008854407143d7a11b4668486c11a760b49af49992f9f8": {
"query": "REPLACE INTO tor (package, interface, key) VALUES (?, 'main', ?)",
"describe": {
"columns": [],
"parameters": {
"Right": 2
},
"nullable": []
}
},
"3502e58f2ab48fb4566d21c920c096f81acfa3ff0d02f970626a4dcd67bac71d": {
"query": "SELECT tor_key FROM account",
"describe": {
@@ -296,16 +296,6 @@
]
}
},
"70a100abc8ca04ffc559ca16460d4fd4c65aa34952ade2681491f0b44dc1aaa1": {
"query": "INSERT OR REPLACE INTO account (id, password, tor_key) VALUES (?, ?, ?)",
"describe": {
"columns": [],
"parameters": {
"Right": 3
},
"nullable": []
}
},
"7d548d2472fa3707bd17364b4800e229b9c2b1c0a22e245bf4e635b9b16b8c24": {
"query": "INSERT INTO certificates (priv_key_pem, certificate_pem, lookup_string, created_at, updated_at) VALUES (?, ?, ?, datetime('now'), datetime('now'))",
"describe": {
@@ -364,6 +354,16 @@
]
}
},
"9fcedab1ba34daa2c6ae97c5953c09821b35b55be75b0c66045ab31a2cf4553e": {
"query": "REPLACE INTO account (id, password, tor_key) VALUES (?, ?, ?)",
"describe": {
"columns": [],
"parameters": {
"Right": 3
},
"nullable": []
}
},
"a1cbaac36d8e14c8c3e7276237c4824bff18861f91b0b08aa5791704c492acb7": {
"query": "INSERT INTO certificates (id, priv_key_pem, certificate_pem, lookup_string, created_at, updated_at) VALUES (1, ?, ?, NULL, datetime('now'), datetime('now'))",
"describe": {

View File

@@ -41,15 +41,29 @@ impl<'de> Deserialize<'de> for OsBackup {
D: serde::Deserializer<'de>,
{
#[derive(Deserialize)]
#[serde(rename = "kebab-case")]
struct OsBackupDe {
tor_key: TorSecretKeyV3,
tor_key: String,
root_ca_key: String,
root_ca_cert: String,
ui: Value,
}
let int = OsBackupDe::deserialize(deserializer)?;
let key_vec = base32::decode(base32::Alphabet::RFC4648 { padding: true }, &int.tor_key)
.ok_or(serde::de::Error::invalid_value(
serde::de::Unexpected::Str(&int.tor_key),
&"an RFC4648 encoded string",
))?;
if key_vec.len() != 64 {
return Err(serde::de::Error::invalid_value(
serde::de::Unexpected::Str(&int.tor_key),
&"a 64 byte value encoded as an RFC4648 string",
));
}
let mut key_slice = [0; 64];
key_slice.clone_from_slice(&key_vec);
Ok(OsBackup {
tor_key: int.tor_key,
tor_key: TorSecretKeyV3::from(key_slice),
root_ca_key: PKey::<Private>::private_key_from_pem(int.root_ca_key.as_bytes())
.map_err(serde::de::Error::custom)?,
root_ca_cert: X509::from_pem(int.root_ca_cert.as_bytes())
@@ -64,14 +78,18 @@ impl Serialize for OsBackup {
S: serde::Serializer,
{
#[derive(Serialize)]
#[serde(rename = "kebab-case")]
struct OsBackupSer<'a> {
tor_key: &'a TorSecretKeyV3,
tor_key: String,
root_ca_key: String,
root_ca_cert: String,
ui: &'a Value,
}
OsBackupSer {
tor_key: &self.tor_key,
tor_key: base32::encode(
base32::Alphabet::RFC4648 { padding: true },
&self.tor_key.as_bytes(),
),
root_ca_key: String::from_utf8(
self.root_ca_key
.private_key_to_pem_pkcs8()
@@ -224,7 +242,9 @@ async fn perform_backup<Db: DbHandle>(
continue;
};
let main_status_model = installed_model.clone().status().main();
let (started, health) = match main_status_model.get(&mut db, true).await?.into_owned() {
let mut tx = db.begin().await?; // for lock scope
let (started, health) = match main_status_model.get(&mut tx, true).await?.into_owned() {
MainStatus::Running { started, health } => (Some(started.clone()), health.clone()),
MainStatus::Stopped | MainStatus::Stopping => (None, Default::default()),
MainStatus::BackingUp { .. } => {
@@ -239,7 +259,6 @@ async fn perform_backup<Db: DbHandle>(
continue;
}
};
let mut tx = db.begin().await?; // for lock scope
main_status_model
.put(
&mut tx,
@@ -251,7 +270,6 @@ async fn perform_backup<Db: DbHandle>(
.await?;
tx.save().await?; // drop locks
installed_model.lock(&mut db, LockType::Write).await;
let manifest = installed_model
.clone()
.manifest()
@@ -266,6 +284,9 @@ async fn perform_backup<Db: DbHandle>(
})?
.synchronize()
.await;
installed_model.lock(&mut db, LockType::Write).await;
let guard = backup_guard.mount_package_backup(&package_id).await?;
let res = manifest
.backup

View File

@@ -22,8 +22,8 @@ use crate::version::{Current, VersionT};
use crate::volume::{backup_dir, Volume, VolumeId, Volumes, BACKUP_DIR};
use crate::{Error, ResultExt};
mod backup_bulk;
mod restore;
pub mod backup_bulk;
pub mod restore;
#[derive(Debug, Deserialize, Serialize)]
pub struct BackupReport {
@@ -47,7 +47,7 @@ pub fn backup() -> Result<(), Error> {
Ok(())
}
#[command(rename = "backup", subcommands(restore::restore_packages))]
#[command(rename = "backup", subcommands(restore::restore_packages_rpc))]
pub fn package_backup() -> Result<(), Error> {
Ok(())
}

View File

@@ -1,25 +1,37 @@
use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use clap::ArgMatches;
use color_eyre::eyre::eyre;
use futures::future::BoxFuture;
use futures::FutureExt;
use openssl::x509::X509;
use patch_db::{DbHandle, PatchDbHandle, Revision};
use rpc_toolkit::command;
use tokio::fs::File;
use tokio::task::JoinHandle;
use torut::onion::OnionAddressV3;
use tracing::instrument;
use crate::auth::check_password_against_db;
use crate::context::RpcContext;
use crate::backup::backup_bulk::OsBackup;
use crate::context::{RpcContext, SetupContext};
use crate::db::model::{PackageDataEntry, StaticFiles};
use crate::db::util::WithRevision;
use crate::disk::util::{BackupMountGuard, PackageBackupMountGuard, TmpMountGuard};
use crate::install::progress::{InstallProgress, InstallProgressTracker};
use crate::install::{install_s9pk_or_cleanup, PKG_PUBLIC_DIR};
use crate::s9pk::manifest::PackageId;
use crate::disk::util::{BackupMountGuard, PackageBackupMountGuard, PartitionInfo, TmpMountGuard};
use crate::install::progress::InstallProgress;
use crate::install::{download_install_s9pk, PKG_PUBLIC_DIR};
use crate::net::ssl::SslManager;
use crate::s9pk::manifest::{Manifest, PackageId};
use crate::s9pk::reader::S9pkReader;
use crate::util::{display_none, Version};
use crate::volume::BACKUP_DIR;
use crate::Error;
use crate::setup::RecoveryStatus;
use crate::util::io::dir_size;
use crate::util::{display_none, IoFormat};
use crate::volume::{backup_dir, BACKUP_DIR, PKG_VOLUME_DIR};
use crate::{Error, ResultExt};
fn parse_comma_separated(arg: &str, _: &ArgMatches<'_>) -> Result<Vec<PackageId>, Error> {
arg.split(",")
@@ -29,7 +41,7 @@ fn parse_comma_separated(arg: &str, _: &ArgMatches<'_>) -> Result<Vec<PackageId>
#[command(rename = "restore", display(display_none))]
#[instrument(skip(ctx, old_password, password))]
pub async fn restore_packages(
pub async fn restore_packages_rpc(
#[context] ctx: RpcContext,
#[arg(parse(parse_comma_separated))] ids: Vec<PackageId>,
#[arg] logicalname: PathBuf,
@@ -46,18 +58,9 @@ pub async fn restore_packages(
if old_password.is_some() {
backup_guard.change_password(&password)?;
}
let (revision, guards) = assure_restoring(&ctx, &mut db, ids, &backup_guard).await?;
let mut tasks = Vec::with_capacity(guards.len());
for (id, version, progress, guard) in guards {
let ctx = ctx.clone();
tasks.push(tokio::spawn(async move {
if let Err(e) = restore_package(&ctx, &id, &version, progress, guard).await {
tracing::error!("Error restoring package {}: {}", id, e);
tracing::debug!("{:?}", e);
}
}));
}
let (revision, backup_guard, tasks, _) =
restore_packages(&ctx, &mut db, backup_guard, ids).await?;
tokio::spawn(async {
futures::future::join_all(tasks).await;
@@ -73,6 +76,195 @@ pub async fn restore_packages(
})
}
async fn approximate_progress(
rpc_ctx: &RpcContext,
progress: &mut ProgressInfo,
) -> Result<(), Error> {
for (id, size) in &mut progress.target_volume_size {
let dir = rpc_ctx.datadir.join(PKG_VOLUME_DIR).join(id).join("data");
if tokio::fs::metadata(&dir).await.is_err() {
*size = 0;
} else {
*size = dir_size(&dir).await?;
}
}
Ok(())
}
async fn approximate_progress_loop(
ctx: &SetupContext,
rpc_ctx: &RpcContext,
mut starting_info: ProgressInfo,
) {
loop {
if let Err(e) = approximate_progress(rpc_ctx, &mut starting_info).await {
tracing::error!("Failed to approximate restore progress: {}", e);
tracing::debug!("{:?}", e);
} else {
*ctx.recovery_status.write().await = Some(Ok(starting_info.flatten()));
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
#[derive(Debug, Default)]
struct ProgressInfo {
package_installs: BTreeMap<PackageId, Arc<InstallProgress>>,
src_volume_size: BTreeMap<PackageId, u64>,
target_volume_size: BTreeMap<PackageId, u64>,
}
impl ProgressInfo {
fn flatten(&self) -> RecoveryStatus {
let mut total_bytes = 0;
let mut bytes_transferred = 0;
for (_, progress) in &self.package_installs {
total_bytes += ((progress.size.unwrap_or(0) as f64) * 2.2) as u64;
bytes_transferred += progress.downloaded.load(Ordering::SeqCst);
bytes_transferred += ((progress.validated.load(Ordering::SeqCst) as f64) * 0.2) as u64;
bytes_transferred += progress.unpacked.load(Ordering::SeqCst);
}
for (_, size) in &self.src_volume_size {
total_bytes += *size;
}
for (_, size) in &self.target_volume_size {
bytes_transferred += *size;
}
if bytes_transferred > total_bytes {
bytes_transferred = total_bytes;
}
RecoveryStatus {
total_bytes,
bytes_transferred,
}
}
}
#[instrument(skip(ctx))]
pub async fn recover_full_embassy(
ctx: SetupContext,
disk_guid: Arc<String>,
embassy_password: String,
recovery_partition: PartitionInfo,
recovery_password: Option<String>,
) -> Result<(OnionAddressV3, X509, BoxFuture<'static, Result<(), Error>>), Error> {
let backup_guard = BackupMountGuard::mount(
TmpMountGuard::mount(&recovery_partition.logicalname, None).await?,
recovery_password.as_deref().unwrap_or_default(),
)
.await?;
let os_backup_path = backup_guard.as_ref().join("os-backup.cbor");
let os_backup: OsBackup =
IoFormat::Cbor.from_slice(&tokio::fs::read(&os_backup_path).await.with_ctx(|_| {
(
crate::ErrorKind::Filesystem,
os_backup_path.display().to_string(),
)
})?)?;
let password = argon2::hash_encoded(
embassy_password.as_bytes(),
&rand::random::<[u8; 16]>()[..],
&argon2::Config::default(),
)
.with_kind(crate::ErrorKind::PasswordHashGeneration)?;
let key_vec = os_backup.tor_key.as_bytes().to_vec();
let secret_store = ctx.secret_store().await?;
sqlx::query!(
"REPLACE INTO account (id, password, tor_key) VALUES (?, ?, ?)",
0,
password,
key_vec,
)
.execute(&mut secret_store.acquire().await?)
.await?;
SslManager::import_root_ca(
secret_store.clone(),
os_backup.root_ca_key,
os_backup.root_ca_cert.clone(),
)
.await?;
secret_store.close().await;
Ok((
os_backup.tor_key.public().get_onion_address(),
os_backup.root_ca_cert,
async move {
let rpc_ctx = RpcContext::init(ctx.config_path.as_ref(), disk_guid).await?;
let mut db = rpc_ctx.db.handle();
let ids = backup_guard
.metadata
.package_backups
.keys()
.cloned()
.collect();
let (_, backup_guard, tasks, progress_info) = restore_packages(
&rpc_ctx,
&mut db,
backup_guard,
ids,
)
.await?;
tokio::select! {
res = futures::future::join_all(tasks) => res.into_iter().map(|res| res.with_kind(crate::ErrorKind::Unknown).and_then(|a|a)).collect::<Result<(), Error>>()?,
_ = approximate_progress_loop(&ctx, &rpc_ctx, progress_info) => unreachable!(concat!(module_path!(), "::approximate_progress_loop should not terminate")),
}
backup_guard.unmount().await?;
rpc_ctx.shutdown().await
}.boxed()
))
}
async fn restore_packages(
ctx: &RpcContext,
db: &mut PatchDbHandle,
backup_guard: BackupMountGuard<TmpMountGuard>,
ids: Vec<PackageId>,
) -> Result<
(
Option<Arc<Revision>>,
BackupMountGuard<TmpMountGuard>,
Vec<JoinHandle<Result<(), Error>>>,
ProgressInfo,
),
Error,
> {
let (revision, guards) = assure_restoring(&ctx, db, ids, &backup_guard).await?;
let mut progress_info = ProgressInfo::default();
let mut tasks = Vec::with_capacity(guards.len());
for (manifest, guard) in guards {
let id = manifest.id.clone();
let (progress, task) = restore_package(ctx.clone(), manifest, guard).await?;
progress_info.package_installs.insert(id.clone(), progress);
progress_info
.src_volume_size
.insert(id.clone(), dir_size(backup_dir(&id)).await?);
progress_info.target_volume_size.insert(id.clone(), 0);
tasks.push(tokio::spawn(async move {
if let Err(e) = task.await {
tracing::error!("Error restoring package {}: {}", id, e);
tracing::debug!("{:?}", e);
Err(e)
} else {
Ok(())
}
}));
}
Ok((revision, backup_guard, tasks, progress_info))
}
#[instrument(skip(ctx, db, backup_guard))]
async fn assure_restoring(
ctx: &RpcContext,
@@ -82,12 +274,7 @@ async fn assure_restoring(
) -> Result<
(
Option<Arc<Revision>>,
Vec<(
PackageId,
Version,
Arc<InstallProgress>,
PackageBackupMountGuard,
)>,
Vec<(Manifest, PackageBackupMountGuard)>,
),
Error,
> {
@@ -116,9 +303,6 @@ async fn assure_restoring(
let manifest = rdr.manifest().await?;
let version = manifest.version.clone();
let progress = InstallProgress::new(Some(tokio::fs::metadata(&s9pk_path).await?.len()));
progress
.download_complete
.store(true, std::sync::atomic::Ordering::SeqCst);
let public_dir_path = ctx
.datadir
@@ -146,41 +330,52 @@ async fn assure_restoring(
*model = Some(PackageDataEntry::Restoring {
install_progress: progress.clone(),
static_files: StaticFiles::local(&id, &version, manifest.assets.icon_type()),
manifest,
manifest: manifest.clone(),
});
model.save(&mut tx).await?;
guards.push((id, version, progress, guard));
guards.push((manifest, guard));
}
Ok((tx.commit(None).await?, guards))
}
#[instrument(skip(ctx, guard))]
async fn restore_package(
ctx: &RpcContext,
id: &PackageId,
version: &Version,
progress: Arc<InstallProgress>,
async fn restore_package<'a>(
ctx: RpcContext,
manifest: Manifest,
guard: PackageBackupMountGuard,
) -> Result<(), Error> {
let s9pk_path = Path::new(BACKUP_DIR).join(id).join(format!("{}.s9pk", id));
let progress_reader =
InstallProgressTracker::new(File::open(&s9pk_path).await?, progress.clone());
let mut s9pk_reader = progress
.track_read_during(
crate::db::DatabaseModel::new()
.package_data()
.idx_model(id)
.and_then(|pde| pde.install_progress()),
&ctx.db,
|| S9pkReader::from_reader(progress_reader, true),
) -> Result<(Arc<InstallProgress>, BoxFuture<'static, Result<(), Error>>), Error> {
let s9pk_path = Path::new(BACKUP_DIR)
.join(&manifest.id)
.join(format!("{}.s9pk", manifest.id));
let len = tokio::fs::metadata(&s9pk_path)
.await
.with_ctx(|_| {
(
crate::ErrorKind::Filesystem,
s9pk_path.display().to_string(),
)
})?
.len();
let file = File::open(&s9pk_path).await.with_ctx(|_| {
(
crate::ErrorKind::Filesystem,
s9pk_path.display().to_string(),
)
.await?;
})?;
install_s9pk_or_cleanup(ctx, id, version, &mut s9pk_reader, progress).await?;
let progress = InstallProgress::new(Some(len));
guard.unmount().await?;
Ok((
progress.clone(),
async move {
download_install_s9pk(&ctx, &manifest, progress, file).await?;
Ok(())
guard.unmount().await?;
Ok(())
}
.boxed(),
))
}

View File

@@ -3,9 +3,7 @@ use std::sync::Arc;
use embassy::context::rpc::RpcContextConfig;
use embassy::context::{DiagnosticContext, SetupContext};
use embassy::db::model::ServerStatus;
use embassy::disk::main::DEFAULT_PASSWORD;
use embassy::install::PKG_DOCKER_DIR;
use embassy::middleware::cors::cors;
use embassy::middleware::diagnostic::diagnostic;
use embassy::middleware::encrypt::encrypt;
@@ -26,8 +24,7 @@ fn status_fn(_: i32) -> StatusCode {
}
#[instrument]
async fn init(cfg_path: Option<&str>) -> Result<(), Error> {
let cfg = RpcContextConfig::load(cfg_path).await?;
async fn setup_or_init(cfg_path: Option<&str>) -> Result<(), Error> {
embassy::disk::util::mount("LABEL=EMBASSY", "/embassy-os").await?;
if tokio::fs::metadata("/embassy-os/disk.guid").await.is_err() {
#[cfg(feature = "avahi")]
@@ -73,102 +70,20 @@ async fn init(cfg_path: Option<&str>) -> Result<(), Error> {
})
.await
.with_kind(embassy::ErrorKind::Network)?;
drop(ctx);
embassy::disk::main::export(
} else {
let cfg = RpcContextConfig::load(cfg_path).await?;
embassy::disk::main::import(
tokio::fs::read_to_string("/embassy-os/disk.guid") // unique identifier for volume group - keeps track of the disk that goes with your embassy
.await?
.trim(),
cfg.datadir(),
DEFAULT_PASSWORD,
)
.await?;
tracing::info!("Loaded Disk");
embassy::init::init(&cfg).await?;
}
embassy::disk::main::import(
tokio::fs::read_to_string("/embassy-os/disk.guid") // unique identifier for volume group - keeps track of the disk that goes with your embassy
.await?
.trim(),
cfg.datadir(),
DEFAULT_PASSWORD,
)
.await?;
tracing::info!("Loaded Disk");
let secret_store = cfg.secret_store().await?;
let log_dir = cfg.datadir().join("main").join("logs");
if tokio::fs::metadata(&log_dir).await.is_err() {
tokio::fs::create_dir_all(&log_dir).await?;
}
embassy::disk::util::bind(&log_dir, "/var/log/journal", false).await?;
Command::new("systemctl")
.arg("restart")
.arg("systemd-journald")
.invoke(embassy::ErrorKind::Journald)
.await?;
tracing::info!("Mounted Logs");
let tmp_dir = cfg.datadir().join("package-data/tmp");
if tokio::fs::metadata(&tmp_dir).await.is_err() {
tokio::fs::create_dir_all(&tmp_dir).await?;
}
let tmp_docker = cfg.datadir().join("package-data/tmp/docker");
if tokio::fs::metadata(&tmp_docker).await.is_ok() {
tokio::fs::remove_dir_all(&tmp_docker).await?;
}
Command::new("cp")
.arg("-r")
.arg("/var/lib/docker")
.arg(&tmp_docker)
.invoke(embassy::ErrorKind::Filesystem)
.await?;
Command::new("systemctl")
.arg("stop")
.arg("docker")
.invoke(embassy::ErrorKind::Docker)
.await?;
embassy::disk::util::bind(&tmp_docker, "/var/lib/docker", false).await?;
Command::new("systemctl")
.arg("reset-failed")
.arg("docker")
.invoke(embassy::ErrorKind::Docker)
.await?;
Command::new("systemctl")
.arg("start")
.arg("docker")
.invoke(embassy::ErrorKind::Docker)
.await?;
tracing::info!("Mounted Docker Data");
embassy::install::load_images(cfg.datadir().join(PKG_DOCKER_DIR)).await?;
tracing::info!("Loaded Docker Images");
// Loading system images
embassy::install::load_images("/var/lib/embassy/system-images").await?;
tracing::info!("Loaded System Docker Images");
embassy::ssh::sync_keys_from_db(&secret_store, "/root/.ssh/authorized_keys").await?;
tracing::info!("Synced SSH Keys");
embassy::hostname::sync_hostname().await?;
tracing::info!("Synced Hostname");
embassy::net::wifi::synchronize_wpa_supplicant_conf(&cfg.datadir().join("main")).await?;
tracing::info!("Synchronized wpa_supplicant.conf");
let db = cfg.db(&secret_store).await?;
let mut handle = db.handle();
let mut info = embassy::db::DatabaseModel::new()
.server_info()
.get_mut(&mut handle)
.await?;
match info.status {
ServerStatus::Running | ServerStatus::Updated | ServerStatus::BackingUp => {
info.status = ServerStatus::Running;
}
ServerStatus::Updating => {
info.update_progress = None;
info.status = ServerStatus::Running;
}
}
info.save(&mut handle).await?;
embassy::version::init(&mut handle).await?;
Ok(())
}
@@ -196,8 +111,8 @@ async fn inner_main(cfg_path: Option<&str>) -> Result<Option<Shutdown>, Error> {
run_script_if_exists("/embassy-os/preinit.sh").await;
let res = if let Err(e) = init(cfg_path).await {
(|| async {
let res = if let Err(e) = setup_or_init(cfg_path).await {
async {
tracing::error!("{}", e.source);
tracing::debug!("{}", e.source);
embassy::sound::BEETHOVEN.play().await?;
@@ -259,7 +174,7 @@ async fn inner_main(cfg_path: Option<&str>) -> Result<Option<Shutdown>, Error> {
.await
.with_kind(embassy::ErrorKind::Network)?,
)
})()
}
.await
} else {
Ok(None)

View File

@@ -36,212 +36,217 @@ fn err_to_500(e: Error) -> Response<Body> {
#[instrument]
async fn inner_main(cfg_path: Option<&str>) -> Result<Option<Shutdown>, Error> {
let rpc_ctx = RpcContext::init(
cfg_path,
Arc::new(
tokio::fs::read_to_string("/embassy-os/disk.guid") // unique identifier for volume group - keeps track of the disk that goes with your embassy
.await?
.trim()
.to_owned(),
),
)
.await?;
let mut shutdown_recv = rpc_ctx.shutdown.subscribe();
let sig_handler_ctx = rpc_ctx.clone();
let sig_handler = tokio::spawn(async move {
use tokio::signal::unix::SignalKind;
futures::future::select_all(
[
SignalKind::interrupt(),
SignalKind::quit(),
SignalKind::terminate(),
]
.iter()
.map(|s| {
async move {
signal(*s)
.expect(&format!("register {:?} handler", s))
.recv()
.await
}
.boxed()
}),
let (rpc_ctx, shutdown) = {
let rpc_ctx = RpcContext::init(
cfg_path,
Arc::new(
tokio::fs::read_to_string("/embassy-os/disk.guid") // unique identifier for volume group - keeps track of the disk that goes with your embassy
.await?
.trim()
.to_owned(),
),
)
.await;
sig_handler_ctx
.shutdown
.send(None)
.map_err(|_| ())
.expect("send shutdown signal");
});
tokio::fs::write("/etc/nginx/sites-available/default", {
let info = embassy::db::DatabaseModel::new()
.server_info()
.get(&mut rpc_ctx.db.handle(), true)
.await?;
format!(
include_str!("../nginx/main-ui.conf.template"),
lan_hostname = info.lan_address.host_str().unwrap(),
tor_hostname = info.tor_address.host_str().unwrap()
)
})
.await
.with_ctx(|_| {
(
embassy::ErrorKind::Filesystem,
"/etc/nginx/sites-available/default",
)
})?;
Command::new("systemctl")
.arg("reload")
.arg("nginx")
.invoke(embassy::ErrorKind::Nginx)
.await?;
let mut shutdown_recv = rpc_ctx.shutdown.subscribe();
let auth = auth(rpc_ctx.clone());
let ctx = rpc_ctx.clone();
let server = rpc_server!({
command: embassy::main_api,
context: ctx,
status: status_fn,
middleware: [
cors,
auth,
]
})
.with_graceful_shutdown({
let mut shutdown = rpc_ctx.shutdown.subscribe();
async move {
shutdown.recv().await.expect("context dropped");
}
});
let rev_cache_ctx = rpc_ctx.clone();
let revision_cache_task = tokio::spawn(async move {
let mut sub = rev_cache_ctx.db.subscribe();
let mut shutdown = rev_cache_ctx.shutdown.subscribe();
loop {
let rev = match tokio::select! {
a = sub.recv() => a,
_ = shutdown.recv() => break,
} {
Ok(a) => a,
Err(_) => {
rev_cache_ctx.revision_cache.write().await.truncate(0);
continue;
}
}; // TODO: handle falling behind
let mut cache = rev_cache_ctx.revision_cache.write().await;
cache.push_back(rev);
if cache.len() > rev_cache_ctx.revision_cache_size {
cache.pop_front();
}
}
});
let ws_ctx = rpc_ctx.clone();
let ws_server = {
let builder = Server::bind(&ws_ctx.bind_ws);
let make_svc = ::rpc_toolkit::hyper::service::make_service_fn(move |_| {
let ctx = ws_ctx.clone();
async move {
Ok::<_, ::rpc_toolkit::hyper::Error>(::rpc_toolkit::hyper::service::service_fn(
move |req| {
let ctx = ctx.clone();
async move {
match req.uri().path() {
"/db" => Ok(subscribe(ctx, req).await.unwrap_or_else(err_to_500)),
_ => Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::empty()),
}
}
},
))
}
let sig_handler_ctx = rpc_ctx.clone();
let sig_handler = tokio::spawn(async move {
use tokio::signal::unix::SignalKind;
futures::future::select_all(
[
SignalKind::interrupt(),
SignalKind::quit(),
SignalKind::terminate(),
]
.iter()
.map(|s| {
async move {
signal(*s)
.expect(&format!("register {:?} handler", s))
.recv()
.await
}
.boxed()
}),
)
.await;
sig_handler_ctx
.shutdown
.send(None)
.map_err(|_| ())
.expect("send shutdown signal");
});
builder.serve(make_svc)
}
.with_graceful_shutdown({
let mut shutdown = rpc_ctx.shutdown.subscribe();
async move {
shutdown.recv().await.expect("context dropped");
}
});
let file_server_ctx = rpc_ctx.clone();
let file_server = {
static_server::init(file_server_ctx, {
tokio::fs::write("/etc/nginx/sites-available/default", {
let info = embassy::db::DatabaseModel::new()
.server_info()
.get(&mut rpc_ctx.db.handle(), true)
.await?;
format!(
include_str!("../nginx/main-ui.conf.template"),
lan_hostname = info.lan_address.host_str().unwrap(),
tor_hostname = info.tor_address.host_str().unwrap()
)
})
.await
.with_ctx(|_| {
(
embassy::ErrorKind::Filesystem,
"/etc/nginx/sites-available/default",
)
})?;
Command::new("systemctl")
.arg("reload")
.arg("nginx")
.invoke(embassy::ErrorKind::Nginx)
.await?;
let auth = auth(rpc_ctx.clone());
let ctx = rpc_ctx.clone();
let server = rpc_server!({
command: embassy::main_api,
context: ctx,
status: status_fn,
middleware: [
cors,
auth,
]
})
.with_graceful_shutdown({
let mut shutdown = rpc_ctx.shutdown.subscribe();
async move {
shutdown.recv().await.expect("context dropped");
}
})
});
let rev_cache_ctx = rpc_ctx.clone();
let revision_cache_task = tokio::spawn(async move {
let mut sub = rev_cache_ctx.db.subscribe();
let mut shutdown = rev_cache_ctx.shutdown.subscribe();
loop {
let rev = match tokio::select! {
a = sub.recv() => a,
_ = shutdown.recv() => break,
} {
Ok(a) => a,
Err(_) => {
rev_cache_ctx.revision_cache.write().await.truncate(0);
continue;
}
}; // TODO: handle falling behind
let mut cache = rev_cache_ctx.revision_cache.write().await;
cache.push_back(rev);
if cache.len() > rev_cache_ctx.revision_cache_size {
cache.pop_front();
}
}
});
let ws_ctx = rpc_ctx.clone();
let ws_server = {
let builder = Server::bind(&ws_ctx.bind_ws);
let make_svc = ::rpc_toolkit::hyper::service::make_service_fn(move |_| {
let ctx = ws_ctx.clone();
async move {
Ok::<_, ::rpc_toolkit::hyper::Error>(::rpc_toolkit::hyper::service::service_fn(
move |req| {
let ctx = ctx.clone();
async move {
match req.uri().path() {
"/db" => {
Ok(subscribe(ctx, req).await.unwrap_or_else(err_to_500))
}
_ => Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::empty()),
}
}
},
))
}
});
builder.serve(make_svc)
}
.with_graceful_shutdown({
let mut shutdown = rpc_ctx.shutdown.subscribe();
async move {
shutdown.recv().await.expect("context dropped");
}
});
let file_server_ctx = rpc_ctx.clone();
let file_server = {
static_server::init(file_server_ctx, {
let mut shutdown = rpc_ctx.shutdown.subscribe();
async move {
shutdown.recv().await.expect("context dropped");
}
})
};
let tor_health_ctx = rpc_ctx.clone();
let tor_client = Client::builder()
.proxy(
Proxy::http(format!(
"socks5h://{}:{}",
rpc_ctx.tor_socks.ip(),
rpc_ctx.tor_socks.port()
))
.with_kind(crate::ErrorKind::Network)?,
)
.build()
.with_kind(crate::ErrorKind::Network)?;
let tor_health_daemon = daemon(
move || {
let ctx = tor_health_ctx.clone();
let client = tor_client.clone();
async move { tor_health_check(&client, &ctx.net_controller.tor).await }
},
Duration::from_secs(300),
rpc_ctx.shutdown.subscribe(),
);
embassy::sound::MARIO_COIN.play().await?;
futures::try_join!(
server
.map_err(|e| Error::new(e, ErrorKind::Network))
.map_ok(|_| tracing::debug!("RPC Server Shutdown")),
revision_cache_task
.map_err(|e| Error::new(
eyre!("{}", e).wrap_err("Revision Cache daemon panicked!"),
ErrorKind::Unknown
))
.map_ok(|_| tracing::debug!("Revision Cache Shutdown")),
ws_server
.map_err(|e| Error::new(e, ErrorKind::Network))
.map_ok(|_| tracing::debug!("WebSocket Server Shutdown")),
file_server
.map_err(|e| Error::new(e, ErrorKind::Network))
.map_ok(|_| tracing::debug!("Static File Server Shutdown")),
tor_health_daemon
.map_err(|e| Error::new(
e.wrap_err("Tor Health Daemon panicked!"),
ErrorKind::Unknown
))
.map_ok(|_| tracing::debug!("Tor Health Daemon Shutdown")),
)?;
let mut shutdown = shutdown_recv
.recv()
.await
.with_kind(crate::ErrorKind::Unknown)?;
sig_handler.abort();
if let Some(shutdown) = &mut shutdown {
drop(shutdown.db_handle.take());
}
(rpc_ctx, shutdown)
};
let tor_health_ctx = rpc_ctx.clone();
let tor_client = Client::builder()
.proxy(
Proxy::http(format!(
"socks5h://{}:{}",
rpc_ctx.tor_socks.ip(),
rpc_ctx.tor_socks.port()
))
.with_kind(crate::ErrorKind::Network)?,
)
.build()
.with_kind(crate::ErrorKind::Network)?;
let tor_health_daemon = daemon(
move || {
let ctx = tor_health_ctx.clone();
let client = tor_client.clone();
async move { tor_health_check(&client, &ctx.net_controller.tor).await }
},
Duration::from_secs(300),
rpc_ctx.shutdown.subscribe(),
);
embassy::sound::MARIO_COIN.play().await?;
futures::try_join!(
server
.map_err(|e| Error::new(e, ErrorKind::Network))
.map_ok(|_| tracing::debug!("RPC Server Shutdown")),
revision_cache_task
.map_err(|e| Error::new(
eyre!("{}", e).wrap_err("Revision Cache daemon panicked!"),
ErrorKind::Unknown
))
.map_ok(|_| tracing::debug!("Revision Cache Shutdown")),
ws_server
.map_err(|e| Error::new(e, ErrorKind::Network))
.map_ok(|_| tracing::debug!("WebSocket Server Shutdown")),
file_server
.map_err(|e| Error::new(e, ErrorKind::Network))
.map_ok(|_| tracing::debug!("Static File Server Shutdown")),
tor_health_daemon
.map_err(|e| Error::new(
e.wrap_err("Tor Health Daemon panicked!"),
ErrorKind::Unknown
))
.map_ok(|_| tracing::debug!("Tor Health Daemon Shutdown")),
)?;
let mut shutdown = shutdown_recv
.recv()
.await
.with_kind(crate::ErrorKind::Unknown)?;
if let Some(shutdown) = &mut shutdown {
drop(shutdown.db_handle.take());
}
rpc_ctx.managers.empty().await?;
sig_handler.abort();
rpc_ctx.shutdown().await?;
Ok(shutdown)
}

View File

@@ -132,11 +132,14 @@ impl RpcContext {
disk_guid: Arc<String>,
) -> Result<Self, Error> {
let base = RpcContextConfig::load(cfg_path).await?;
let log_epoch = Arc::new(AtomicU64::new(rand::random()));
let logger = EmbassyLogger::init(log_epoch.clone(), base.log_server.clone(), false);
tracing::info!("Loaded Config");
let logger = EmbassyLogger::init(base.log_server.clone(), false);
tracing::info!("Set Logger");
let (shutdown, _) = tokio::sync::broadcast::channel(1);
let secret_store = base.secret_store().await?;
tracing::info!("Opened Sqlite DB");
let db = base.db(&secret_store).await?;
tracing::info!("Opened PatchDB");
let share = crate::db::DatabaseModel::new()
.server_info()
.share_stats()
@@ -144,6 +147,7 @@ impl RpcContext {
.await?;
logger.set_sharing(*share);
let docker = Docker::connect_with_unix_defaults()?;
tracing::info!("Connected to Docker");
let net_controller = NetController::init(
([127, 0, 0, 1], 80).into(),
crate::net::tor::os_key(&mut secret_store.acquire().await?).await?,
@@ -153,9 +157,11 @@ impl RpcContext {
None,
)
.await?;
tracing::info!("Initialized Net Controller");
let managers = ManagerMap::default();
let metrics_cache = RwLock::new(None);
let notification_manager = NotificationManager::new(secret_store.clone());
tracing::info!("Initialized Notification Manager");
let seed = Arc::new(RpcContextSeed {
bind_rpc: base.bind_rpc.unwrap_or(([127, 0, 0, 1], 5959).into()),
bind_ws: base.bind_ws.unwrap_or(([127, 0, 0, 1], 5960).into()),
@@ -172,8 +178,8 @@ impl RpcContext {
metrics_cache,
shutdown,
websocket_count: AtomicUsize::new(0),
log_epoch: logger.epoch(),
logger,
log_epoch,
tor_socks: base.tor_socks.unwrap_or(SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(127, 0, 0, 1),
9050,
@@ -188,6 +194,7 @@ impl RpcContext {
.await
});
let res = Self(seed);
tracing::info!("Initialized Package Managers");
res.managers
.init(
&res,
@@ -223,6 +230,23 @@ impl RpcContext {
.await?
.to_owned())
}
#[instrument(skip(self))]
pub async fn shutdown(self) -> Result<(), Error> {
self.managers.empty().await?;
match Arc::try_unwrap(self.0) {
Ok(seed) => {
let RpcContextSeed { secret_store, .. } = seed;
secret_store.close().await;
}
Err(ctx) => {
tracing::warn!(
"{} RPC Context(s) are still being held somewhere. This is likely a mistake.",
Arc::strong_count(&ctx) - 1
);
}
}
Ok(())
}
}
impl Context for RpcContext {
fn host(&self) -> Host<&str> {

View File

@@ -55,6 +55,7 @@ impl SetupContextConfig {
}
pub struct SetupContextSeed {
pub config_path: Option<PathBuf>,
pub bind_rpc: SocketAddr,
pub shutdown: Sender<()>,
pub datadir: PathBuf,
@@ -68,10 +69,11 @@ pub struct SetupContext(Arc<SetupContextSeed>);
impl SetupContext {
#[instrument(skip(path))]
pub async fn init<P: AsRef<Path>>(path: Option<P>) -> Result<Self, Error> {
let cfg = SetupContextConfig::load(path).await?;
let cfg = SetupContextConfig::load(path.as_ref()).await?;
let (shutdown, _) = tokio::sync::broadcast::channel(1);
let datadir = cfg.datadir().to_owned();
Ok(Self(Arc::new(SetupContextSeed {
config_path: path.as_ref().map(|p| p.as_ref().to_owned()),
bind_rpc: cfg.bind_rpc.unwrap_or(([127, 0, 0, 1], 5959).into()),
shutdown,
datadir,

View File

@@ -106,6 +106,7 @@ pub async fn create_fs<P: AsRef<Path>>(
.invoke(crate::ErrorKind::DiskManagement)
.await?;
Command::new("cryptsetup")
.arg("-q")
.arg("luksFormat")
.arg(format!("--key-file={}", PASSWORD_PATH))
.arg(format!("--keyfile-size={}", password.len()))
@@ -113,6 +114,7 @@ pub async fn create_fs<P: AsRef<Path>>(
.invoke(crate::ErrorKind::DiskManagement)
.await?;
Command::new("cryptsetup")
.arg("-q")
.arg("luksOpen")
.arg(format!("--key-file={}", PASSWORD_PATH))
.arg(format!("--keyfile-size={}", password.len()))
@@ -183,6 +185,7 @@ pub async fn unmount_fs<P: AsRef<Path>>(
unmount(datadir.as_ref().join(name)).await?;
}
Command::new("cryptsetup")
.arg("-q")
.arg("luksClose")
.arg(format!("{}_{}", guid, name))
.invoke(crate::ErrorKind::DiskManagement)
@@ -196,6 +199,10 @@ pub async fn unmount_all_fs<P: AsRef<Path>>(guid: &str, datadir: P) -> Result<()
unmount_fs(guid, &datadir, "main", false).await?;
unmount_fs(guid, &datadir, "swap", true).await?;
unmount_fs(guid, &datadir, "package-data", false).await?;
Command::new("dmsetup")
.arg("remove_all") // TODO: find a higher finesse way to do this for portability reasons
.invoke(crate::ErrorKind::DiskManagement)
.await?;
Ok(())
}
@@ -251,6 +258,7 @@ pub async fn mount_fs<P: AsRef<Path>>(
.await
.with_ctx(|_| (crate::ErrorKind::Filesystem, PASSWORD_PATH))?;
Command::new("cryptsetup")
.arg("-q")
.arg("luksOpen")
.arg(format!("--key-file={}", PASSWORD_PATH))
.arg(format!("--keyfile-size={}", password.len()))

View File

@@ -191,6 +191,7 @@ pub async fn get_percentage<P: AsRef<Path>>(path: P) -> Result<u64, Error> {
.parse::<u64>()?)
}
#[instrument]
pub async fn pvscan() -> Result<BTreeMap<PathBuf, Option<String>>, Error> {
let pvscan_out = Command::new("pvscan")
.invoke(crate::ErrorKind::DiskManagement)

88
appmgr/src/init.rs Normal file
View File

@@ -0,0 +1,88 @@
use tokio::process::Command;
use crate::context::rpc::RpcContextConfig;
use crate::db::model::ServerStatus;
use crate::install::PKG_DOCKER_DIR;
use crate::util::Invoke;
use crate::Error;
/// One-time boot initialization for the daemon.
///
/// Runs the full startup sequence against the configured data directory:
/// persists journald logs to disk, relocates Docker's state onto the data
/// drive, loads package and system Docker images, syncs SSH keys, hostname,
/// and wpa_supplicant config, normalizes the persisted server status, and
/// runs version migrations.
///
/// NOTE(review): steps are order-dependent (e.g. docker must be stopped
/// before its data dir is bind-mounted over) — do not reorder casually.
pub async fn init(cfg: &RpcContextConfig) -> Result<(), Error> {
    let secret_store = cfg.secret_store().await?;
    // Persist journald logs on the data drive: bind-mount the on-disk log
    // directory over /var/log/journal, then restart journald so it reopens
    // its files on the new mount.
    let log_dir = cfg.datadir().join("main").join("logs");
    if tokio::fs::metadata(&log_dir).await.is_err() {
        tokio::fs::create_dir_all(&log_dir).await?;
    }
    crate::disk::util::bind(&log_dir, "/var/log/journal", false).await?;
    Command::new("systemctl")
        .arg("restart")
        .arg("systemd-journald")
        .invoke(crate::ErrorKind::Journald)
        .await?;
    tracing::info!("Mounted Logs");
    // Move Docker's state onto the data drive: copy the current
    // /var/lib/docker into a fresh tmp dir on the data partition, stop
    // docker, bind-mount the copy over /var/lib/docker, and restart docker.
    let tmp_dir = cfg.datadir().join("package-data/tmp");
    if tokio::fs::metadata(&tmp_dir).await.is_err() {
        tokio::fs::create_dir_all(&tmp_dir).await?;
    }
    let tmp_docker = cfg.datadir().join("package-data/tmp/docker");
    if tokio::fs::metadata(&tmp_docker).await.is_ok() {
        // Stale copy from a previous (possibly interrupted) boot — discard it.
        tokio::fs::remove_dir_all(&tmp_docker).await?;
    }
    Command::new("cp")
        .arg("-r")
        .arg("/var/lib/docker")
        .arg(&tmp_docker)
        .invoke(crate::ErrorKind::Filesystem)
        .await?;
    Command::new("systemctl")
        .arg("stop")
        .arg("docker")
        .invoke(crate::ErrorKind::Docker)
        .await?;
    crate::disk::util::bind(&tmp_docker, "/var/lib/docker", false).await?;
    // Clear any failed-unit state so the restart below isn't rate-limited.
    Command::new("systemctl")
        .arg("reset-failed")
        .arg("docker")
        .invoke(crate::ErrorKind::Docker)
        .await?;
    Command::new("systemctl")
        .arg("start")
        .arg("docker")
        .invoke(crate::ErrorKind::Docker)
        .await?;
    tracing::info!("Mounted Docker Data");
    // Load package images from the data drive, then the bundled system images.
    crate::install::load_images(cfg.datadir().join(PKG_DOCKER_DIR)).await?;
    tracing::info!("Loaded Docker Images");
    crate::install::load_images("/var/lib/embassy/system-images").await?;
    tracing::info!("Loaded System Docker Images");
    // Sync host-level state from the database / data drive.
    crate::ssh::sync_keys_from_db(&secret_store, "/root/.ssh/authorized_keys").await?;
    tracing::info!("Synced SSH Keys");
    crate::hostname::sync_hostname().await?;
    tracing::info!("Synced Hostname");
    crate::net::wifi::synchronize_wpa_supplicant_conf(&cfg.datadir().join("main")).await?;
    tracing::info!("Synchronized wpa_supplicant.conf");
    // Normalize the persisted server status: whatever transient state the
    // previous run was in, we are now simply Running.
    let db = cfg.db(&secret_store).await?;
    let mut handle = db.handle();
    let mut info = crate::db::DatabaseModel::new()
        .server_info()
        .get_mut(&mut handle)
        .await?;
    match info.status {
        ServerStatus::Running | ServerStatus::Updated | ServerStatus::BackingUp => {
            info.status = ServerStatus::Running;
        }
        ServerStatus::Updating => {
            // An interrupted update also clears its progress indicator.
            info.update_progress = None;
            info.status = ServerStatus::Running;
        }
    }
    info.save(&mut handle).await?;
    // Apply any pending version migrations.
    crate::version::init(&mut handle).await?;
    Ok(())
}

View File

@@ -11,7 +11,6 @@ use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt, TryStreamExt};
use http::StatusCode;
use patch_db::{DbHandle, LockType};
use reqwest::Response;
use rpc_toolkit::command;
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncRead, AsyncSeek, AsyncSeekExt};
@@ -137,7 +136,25 @@ pub async fn install(
tokio::spawn(async move {
let mut db_handle = ctx.db.handle();
if let Err(e) = download_install_s9pk(&ctx, &man, s9pk).await {
if let Err(e) = download_install_s9pk(
&ctx,
&man,
InstallProgress::new(s9pk.content_length()),
tokio_util::io::StreamReader::new(s9pk.bytes_stream().map_err(|e| {
std::io::Error::new(
if e.is_connect() {
std::io::ErrorKind::ConnectionRefused
} else if e.is_timeout() {
std::io::ErrorKind::TimedOut
} else {
std::io::ErrorKind::Other
},
e,
)
})),
)
.await
{
let err_str = format!("Install of {}@{} Failed: {}", man.id, man.version, e);
tracing::error!("{}", err_str);
tracing::debug!("{:?}", e);
@@ -250,57 +267,42 @@ pub async fn uninstall_impl(ctx: RpcContext, id: PackageId) -> Result<WithRevisi
})
}
#[instrument(skip(ctx, temp_manifest))]
#[instrument(skip(ctx, temp_manifest, s9pk))]
pub async fn download_install_s9pk(
ctx: &RpcContext,
temp_manifest: &Manifest,
s9pk: Response,
progress: Arc<InstallProgress>,
mut s9pk: impl AsyncRead + Unpin,
) -> Result<(), Error> {
let pkg_id = &temp_manifest.id;
let version = &temp_manifest.version;
let pkg_cache_dir = ctx
let pkg_archive_dir = ctx
.datadir
.join(PKG_ARCHIVE_DIR)
.join(pkg_id)
.join(version.as_str());
tokio::fs::create_dir_all(&pkg_cache_dir).await?;
let pkg_cache = pkg_cache_dir.join(AsRef::<Path>::as_ref(pkg_id).with_extension("s9pk"));
tokio::fs::create_dir_all(&pkg_archive_dir).await?;
let pkg_archive = pkg_archive_dir.join(AsRef::<Path>::as_ref(pkg_id).with_extension("s9pk"));
let pkg_data_entry = crate::db::DatabaseModel::new()
.package_data()
.idx_model(pkg_id);
let progress = InstallProgress::new(s9pk.content_length());
let progress_model = pkg_data_entry.and_then(|pde| pde.install_progress());
File::delete(&pkg_cache).await?;
File::delete(&pkg_archive).await?;
let mut dst = OpenOptions::new()
.create(true)
.write(true)
.read(true)
.open(&pkg_cache)
.open(&pkg_archive)
.await?;
progress
.track_download_during(progress_model.clone(), &ctx.db, || async {
let mut progress_writer = InstallProgressTracker::new(&mut dst, progress.clone());
tokio::io::copy(
&mut tokio_util::io::StreamReader::new(s9pk.bytes_stream().map_err(|e| {
std::io::Error::new(
if e.is_connect() {
std::io::ErrorKind::ConnectionRefused
} else if e.is_timeout() {
std::io::ErrorKind::TimedOut
} else {
std::io::ErrorKind::Other
},
e,
)
})),
&mut progress_writer,
)
.await?;
tokio::io::copy(&mut s9pk, &mut progress_writer).await?;
progress.download_complete();
Ok(())
})

View File

@@ -16,6 +16,7 @@ pub mod disk;
pub mod error;
pub mod hostname;
pub mod id;
pub mod init;
pub mod inspect;
pub mod install;
pub mod logs;
@@ -42,7 +43,6 @@ pub use config::Config;
pub use error::{Error, ErrorKind, ResultExt};
use rpc_toolkit::command;
use rpc_toolkit::yajrc::RpcError;
pub use version::init;
#[command(metadata(authenticated = false))]
pub fn echo(#[arg] message: String) -> Result<String, RpcError> {

View File

@@ -110,6 +110,14 @@ impl ManagerMap {
|((id, version), man)| async move {
man.exit().await?;
tracing::debug!("Manager for {}@{} shutdown", id, version);
if let Err(e) = Arc::try_unwrap(man) {
tracing::trace!(
"Manager for {}@{} still has {} other open references",
id,
version,
Arc::strong_count(&e) - 1
);
}
Ok::<_, Error>(())
},
))

View File

@@ -69,6 +69,7 @@ pub async fn synchronizer(shared: &ManagerSharedState) {
);
tracing::debug!("{:?}", e);
} else {
tracing::trace!("{} status synchronized", shared.manifest.id);
shared.synchronized.notify_waiters();
}
tokio::select! {

View File

@@ -1,6 +1,3 @@
use crate::context::RpcContext;
use crate::{Error, ResultExt};
use basic_cookies::Cookie;
use color_eyre::eyre::eyre;
use digest::Digest;
@@ -16,6 +13,9 @@ use rpc_toolkit::yajrc::RpcMethod;
use rpc_toolkit::Metadata;
use serde::{Deserialize, Serialize};
use sha2::Sha256;
use crate::context::RpcContext;
use crate::{Error, ResultExt};
pub trait AsLogoutSessionId {
fn as_logout_session_id(self) -> String;
}

View File

@@ -1,11 +1,12 @@
use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use color_eyre::eyre::eyre;
use futures::future::BoxFuture;
use futures::{FutureExt, TryStreamExt};
use futures::{FutureExt, TryFutureExt, TryStreamExt};
use openssl::x509::X509;
use rpc_toolkit::command;
use rpc_toolkit::yajrc::RpcError;
@@ -15,19 +16,22 @@ use tokio::io::AsyncWriteExt;
use torut::onion::{OnionAddressV3, TorSecretKeyV3};
use tracing::instrument;
use crate::backup::restore::recover_full_embassy;
use crate::context::rpc::RpcContextConfig;
use crate::context::SetupContext;
use crate::db::model::RecoveredPackageInfo;
use crate::disk::main::DEFAULT_PASSWORD;
use crate::disk::util::{pvscan, DiskInfo, PartitionInfo, TmpMountGuard};
use crate::id::Id;
use crate::init::init;
use crate::install::PKG_PUBLIC_DIR;
use crate::net::ssl::SslManager;
use crate::s9pk::manifest::PackageId;
use crate::sound::BEETHOVEN;
use crate::util::io::from_yaml_async_reader;
use crate::util::io::{dir_size, from_yaml_async_reader};
use crate::util::Version;
use crate::volume::{data_dir, VolumeId};
use crate::{Error, ResultExt};
use crate::{ensure_code, Error, ResultExt};
#[command(subcommands(status, disk, execute, recovery))]
pub fn setup() -> Result<(), Error> {
@@ -69,8 +73,8 @@ pub fn recovery() -> Result<(), Error> {
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "kebab-case")]
pub struct RecoveryStatus {
bytes_transferred: u64,
total_bytes: u64,
pub bytes_transferred: u64,
pub total_bytes: u64,
}
#[command(rename = "status", rpc_only, metadata(authenticated = false))]
@@ -122,7 +126,7 @@ pub async fn execute(
}
#[instrument(skip(ctx))]
pub async fn complete_setup(ctx: SetupContext, guid: String) -> Result<(), Error> {
pub async fn complete_setup(ctx: SetupContext, guid: Arc<String>) -> Result<(), Error> {
let mut guid_file = File::create("/embassy-os/disk.guid").await?;
guid_file.write_all(guid.as_bytes()).await?;
guid_file.sync_all().await?;
@@ -130,7 +134,7 @@ pub async fn complete_setup(ctx: SetupContext, guid: String) -> Result<(), Error
Ok(())
}
#[instrument(skip(ctx))]
#[instrument(skip(ctx, embassy_password, recovery_password))]
pub async fn execute_inner(
ctx: SetupContext,
embassy_logicalname: PathBuf,
@@ -144,14 +148,61 @@ pub async fn execute_inner(
crate::ErrorKind::InvalidRequest,
));
}
let guid = crate::disk::main::create(
&[embassy_logicalname],
&pvscan().await?,
&ctx.datadir,
DEFAULT_PASSWORD,
)
.await?;
crate::disk::main::import(&guid, &ctx.datadir, DEFAULT_PASSWORD).await?;
let guid = Arc::new(
crate::disk::main::create(
&[embassy_logicalname],
&pvscan().await?,
&ctx.datadir,
DEFAULT_PASSWORD,
)
.await?,
);
crate::disk::main::import(&*guid, &ctx.datadir, DEFAULT_PASSWORD).await?;
let res = if let Some(recovery_partition) = recovery_partition {
if recovery_partition
.embassy_os
.as_ref()
.map(|v| &*v.version < &emver::Version::new(0, 2, 8, 0))
.unwrap_or(true)
{
return Err(Error::new(eyre!("Unsupported version of EmbassyOS. Please update to at least 0.2.8 before recovering."), crate::ErrorKind::VersionIncompatible));
}
let (tor_addr, root_ca, recover_fut) = recover(
ctx.clone(),
guid.clone(),
embassy_password,
recovery_partition,
recovery_password,
)
.await?;
init(&RpcContextConfig::load(ctx.config_path.as_ref()).await?).await?;
tokio::spawn(async move {
if let Err(e) = recover_fut
.and_then(|_| complete_setup(ctx.clone(), guid))
.await
{
BEETHOVEN.play().await.unwrap_or_default(); // ignore error in playing the song
tracing::error!("Error recovering drive!: {}", e);
tracing::debug!("{:?}", e);
*ctx.recovery_status.write().await = Some(Err(e.into()));
}
});
(tor_addr, root_ca)
} else {
let res = fresh_setup(&ctx, &embassy_password).await?;
init(&RpcContextConfig::load(ctx.config_path.as_ref()).await?).await?;
complete_setup(ctx, guid).await?;
res
};
Ok(res)
}
async fn fresh_setup(
ctx: &SetupContext,
embassy_password: &str,
) -> Result<(OnionAddressV3, X509), Error> {
let password = argon2::hash_encoded(
embassy_password.as_bytes(),
&rand::random::<[u8; 16]>()[..],
@@ -162,7 +213,7 @@ pub async fn execute_inner(
let key_vec = tor_key.as_bytes().to_vec();
let sqlite_pool = ctx.secret_store().await?;
sqlx::query!(
"INSERT OR REPLACE INTO account (id, password, tor_key) VALUES (?, ?, ?)",
"REPLACE INTO account (id, password, tor_key) VALUES (?, ?, ?)",
0,
password,
key_vec,
@@ -174,76 +225,46 @@ pub async fn execute_inner(
.export_root_ca()
.await?;
sqlite_pool.close().await;
if let Some(recovery_partition) = recovery_partition {
if recovery_partition
.embassy_os
.as_ref()
.map(|v| &*v.version < &emver::Version::new(0, 2, 8, 0))
.unwrap_or(true)
{
return Err(Error::new(eyre!("Unsupported version of EmbassyOS. Please update to at least 0.2.8 before recovering."), crate::ErrorKind::VersionIncompatible));
}
tokio::spawn(async move {
if let Err(e) = recover(ctx.clone(), guid, recovery_partition, recovery_password).await
{
BEETHOVEN.play().await.unwrap_or_default(); // ignore error in playing the song
tracing::error!("Error recovering drive!: {}", e);
tracing::debug!("{:?}", e);
*ctx.recovery_status.write().await = Some(Err(e.into()));
}
});
} else {
complete_setup(ctx, guid).await?;
}
Ok((tor_key.public().get_onion_address(), root_ca))
}
#[instrument(skip(ctx))]
#[instrument(skip(ctx, embassy_password, recovery_password))]
async fn recover(
ctx: SetupContext,
guid: String,
guid: Arc<String>,
embassy_password: String,
recovery_partition: PartitionInfo,
recovery_password: Option<String>,
) -> Result<(), Error> {
) -> Result<(OnionAddressV3, X509, BoxFuture<'static, Result<(), Error>>), Error> {
let recovery_version = recovery_partition
.embassy_os
.as_ref()
.map(|i| i.version.clone())
.unwrap_or_default();
if recovery_version.major() == 0 && recovery_version.minor() == 2 {
recover_v2(&ctx, recovery_partition).await?;
let res = if recovery_version.major() == 0 && recovery_version.minor() == 2 {
let (tor_addr, root_ca) = fresh_setup(&ctx, &embassy_password).await?;
(
tor_addr,
root_ca,
recover_v2(ctx.clone(), recovery_partition).boxed(),
)
} else if recovery_version.major() == 0 && recovery_version.minor() == 3 {
recover_v3(&ctx, recovery_partition, recovery_password).await?;
recover_full_embassy(
ctx.clone(),
guid.clone(),
embassy_password,
recovery_partition,
recovery_password,
)
.await?
} else {
return Err(Error::new(
eyre!("Unsupported version of EmbassyOS: {}", recovery_version),
crate::ErrorKind::VersionIncompatible,
));
}
};
complete_setup(ctx, guid).await
}
fn dir_size<'a, P: AsRef<Path> + 'a + Send + Sync>(
path: P,
res: &'a AtomicU64,
) -> BoxFuture<'a, Result<(), std::io::Error>> {
async move {
tokio_stream::wrappers::ReadDirStream::new(tokio::fs::read_dir(path.as_ref()).await?)
.try_for_each(|e| async move {
let m = e.metadata().await?;
if m.is_file() {
res.fetch_add(m.len(), Ordering::Relaxed);
} else if m.is_dir() {
dir_size(e.path(), res).await?;
}
Ok(())
})
.await
}
.boxed()
Ok(res)
}
fn dir_copy<'a, P0: AsRef<Path> + 'a + Send + Sync, P1: AsRef<Path> + 'a + Send + Sync>(
@@ -300,15 +321,7 @@ fn dir_copy<'a, P0: AsRef<Path> + 'a + Send + Sync, P1: AsRef<Path> + 'a + Send
format!("cp -P {} -> {}", src_path.display(), dst_path.display()),
)
})?;
// Removed (see https://unix.stackexchange.com/questions/87200/change-permissions-for-a-symbolic-link):
// tokio::fs::set_permissions(&dst_path, m.permissions())
// .await
// .with_ctx(|_| {
// (
// crate::ErrorKind::Filesystem,
// format!("chmod {}", dst_path.display()),
// )
// })?;
// Do not set permissions (see https://unix.stackexchange.com/questions/87200/change-permissions-for-a-symbolic-link)
}
Ok(())
})
@@ -319,7 +332,7 @@ fn dir_copy<'a, P0: AsRef<Path> + 'a + Send + Sync, P1: AsRef<Path> + 'a + Send
}
#[instrument(skip(ctx))]
async fn recover_v2(ctx: &SetupContext, recovery_partition: PartitionInfo) -> Result<(), Error> {
async fn recover_v2(ctx: SetupContext, recovery_partition: PartitionInfo) -> Result<(), Error> {
let recovery = TmpMountGuard::mount(&recovery_partition.logicalname, None).await?;
let secret_store = ctx.secret_store().await?;
@@ -346,19 +359,16 @@ async fn recover_v2(ctx: &SetupContext, recovery_partition: PartitionInfo) -> Re
.await?;
let volume_path = recovery.as_ref().join("root/volumes");
let total_bytes = AtomicU64::new(0);
let mut total_bytes = 0;
for (pkg_id, _) in &packages {
let volume_src_path = volume_path.join(&pkg_id);
dir_size(&volume_src_path, &total_bytes)
.await
.with_ctx(|_| {
(
crate::ErrorKind::Filesystem,
volume_src_path.display().to_string(),
)
})?;
total_bytes += dir_size(&volume_src_path).await.with_ctx(|_| {
(
crate::ErrorKind::Filesystem,
volume_src_path.display().to_string(),
)
})?;
}
let total_bytes = total_bytes.load(Ordering::SeqCst);
*ctx.recovery_status.write().await = Some(Ok(RecoveryStatus {
bytes_transferred: 0,
total_bytes,
@@ -392,6 +402,31 @@ async fn recover_v2(ctx: &SetupContext, recovery_partition: PartitionInfo) -> Re
}
} => (),
);
let tor_src_path = recovery
.as_ref()
.join("var/lib/tor")
.join(format!("app-{}", pkg_id))
.join("hs_ed25519_secret_key");
let key_vec = tokio::fs::read(&tor_src_path).await.with_ctx(|_| {
(
crate::ErrorKind::Filesystem,
tor_src_path.display().to_string(),
)
})?;
ensure_code!(
key_vec.len() == 96,
crate::ErrorKind::Tor,
"{} not 96 bytes",
tor_src_path.display()
);
let key_vec = key_vec[32..].to_vec();
sqlx::query!(
"REPLACE INTO tor (package, interface, key) VALUES (?, 'main', ?)",
*pkg_id,
key_vec,
)
.execute(&mut secret_store.acquire().await?)
.await?;
let icon_leaf = AsRef::<Path>::as_ref(&pkg_id)
.join(info.version.as_str())
.join("icon.png");
@@ -399,7 +434,6 @@ async fn recover_v2(ctx: &SetupContext, recovery_partition: PartitionInfo) -> Re
.as_ref()
.join("root/agent/icons")
.join(format!("{}.png", pkg_id));
// TODO: tor address
let icon_dst_path = ctx.datadir.join(PKG_PUBLIC_DIR).join(&icon_leaf);
if let Some(parent) = icon_dst_path.parent() {
tokio::fs::create_dir_all(&parent)
@@ -433,15 +467,7 @@ async fn recover_v2(ctx: &SetupContext, recovery_partition: PartitionInfo) -> Re
.await?;
}
secret_store.close().await;
recovery.unmount().await?;
Ok(())
}
#[instrument(skip(ctx))]
async fn recover_v3(
ctx: &SetupContext,
recovery_partition: PartitionInfo,
recovery_password: Option<String>,
) -> Result<(), Error> {
todo!()
}

View File

@@ -22,42 +22,43 @@ impl Shutdown {
pub fn execute(&self) {
use std::process::Command;
tokio::runtime::Builder::new_current_thread()
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
use tokio::process::Command;
.unwrap();
rt.block_on(async {
use tokio::process::Command;
if let Err(e) = Command::new("systemctl")
.arg("stop")
.arg("systemd-journald")
.invoke(crate::ErrorKind::Journald)
.await
{
tracing::error!("Error Stopping Journald: {}", e);
if let Err(e) = Command::new("systemctl")
.arg("stop")
.arg("systemd-journald")
.invoke(crate::ErrorKind::Journald)
.await
{
tracing::error!("Error Stopping Journald: {}", e);
tracing::debug!("{:?}", e);
}
if let Err(e) = Command::new("systemctl")
.arg("stop")
.arg("docker")
.invoke(crate::ErrorKind::Docker)
.await
{
tracing::error!("Error Stopping Docker: {}", e);
tracing::debug!("{:?}", e);
}
if let Some(guid) = &self.disk_guid {
if let Err(e) = export(guid, &self.datadir).await {
tracing::error!("Error Exporting Volume Group: {}", e);
tracing::debug!("{:?}", e);
}
if let Err(e) = Command::new("systemctl")
.arg("stop")
.arg("docker")
.invoke(crate::ErrorKind::Docker)
.await
{
tracing::error!("Error Stopping Docker: {}", e);
tracing::debug!("{:?}", e);
}
if let Some(guid) = &self.disk_guid {
if let Err(e) = export(guid, &self.datadir).await {
tracing::error!("Error Exporting Volume Group: {}", e);
tracing::debug!("{:?}", e);
}
}
if let Err(e) = MARIO_DEATH.play().await {
tracing::error!("Error Playing Shutdown Song: {}", e);
tracing::debug!("{:?}", e);
}
});
}
if let Err(e) = MARIO_DEATH.play().await {
tracing::error!("Error Playing Shutdown Song: {}", e);
tracing::debug!("{:?}", e);
}
});
drop(rt);
if self.restart {
Command::new("reboot").spawn().unwrap().wait().unwrap();
} else {

View File

@@ -93,6 +93,13 @@ impl SoundInterface {
.await
.with_ctx(|_| (ErrorKind::SoundError, SWITCH_FILE.to_string_lossy()))
}
/// Consumes the interface, releasing the inner lock if one is still held.
/// A `None` inner value (already released) is a no-op.
///
/// NOTE(review): the lock appears to guard the sound device file — confirm
/// against `SoundInterface`'s constructor.
#[instrument(skip(self))]
pub async fn close(mut self) -> Result<(), Error> {
    if let Some(lock) = self.0.take() {
        lock.unlock().await?;
    }
    Ok(())
}
}
pub struct Song<Notes> {
@@ -114,6 +121,7 @@ where
Some(n) => sound.play_for_time_slice(self.tempo_qpm, n, slice).await?,
};
}
sound.close().await?;
}
Ok(())
}

View File

@@ -1,7 +1,8 @@
use std::collections::HashMap;
use emver::Version;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use std::collections::HashMap;
#[serde_as]
#[derive(Debug, Deserialize, Serialize)]

View File

@@ -1,3 +1,7 @@
use std::path::Path;
use futures::future::BoxFuture;
use futures::{FutureExt, TryStreamExt};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
use crate::ResultExt;
@@ -193,3 +197,24 @@ pub async fn copy_and_shutdown<R: AsyncRead + Unpin, W: AsyncWrite + Unpin>(
w.shutdown().await?;
Ok(())
}
/// Recursively totals the size in bytes of all regular files under `path`.
///
/// Directories are descended into; entries that are neither regular files
/// nor directories (symlinks, devices, ...) contribute zero. Boxed because
/// the future is recursive.
pub fn dir_size<'a, P: AsRef<Path> + 'a + Send + Sync>(
    path: P,
) -> BoxFuture<'a, Result<u64, std::io::Error>> {
    async move {
        let mut total: u64 = 0;
        let mut entries = tokio::fs::read_dir(path.as_ref()).await?;
        while let Some(entry) = entries.next_entry().await? {
            let meta = entry.metadata().await?;
            if meta.is_file() {
                total += meta.len();
            } else if meta.is_dir() {
                total += dir_size(entry.path()).await?;
            }
        }
        Ok(total)
    }
    .boxed()
}

View File

@@ -1,6 +1,7 @@
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use lazy_static::lazy_static;
use reqwest::{Client, Url};
use serde::Serialize;
use tracing::Subscriber;
@@ -51,6 +52,10 @@ impl<S: Subscriber> Layer<S> for SharingLayer {
}
}
lazy_static! {
static ref LOGGER: Mutex<Option<(Arc<AtomicU64>, Arc<AtomicBool>)>> = Mutex::new(None);
}
#[derive(Clone)]
pub struct EmbassyLogger {
log_epoch: Arc<AtomicU64>,
@@ -71,30 +76,39 @@ impl EmbassyLogger {
.with(ErrorLayer::default())
}
pub fn no_sharing() {
use tracing_subscriber::prelude::*;
Self::base_subscriber().init();
color_eyre::install().expect("Color Eyre Init");
Self::init(None, false);
}
pub fn init(log_epoch: Arc<AtomicU64>, share_dest: Option<Url>, share_errors: bool) -> Self {
pub fn init(share_dest: Option<Url>, share_errors: bool) -> Self {
use tracing_subscriber::prelude::*;
let share_dest = match share_dest {
None => "https://beta-registry-0-3.start9labs.com/error-logs".to_owned(), // TODO
Some(a) => a.to_string(),
};
let sharing = Arc::new(AtomicBool::new(share_errors));
let sharing_layer = SharingLayer {
log_epoch: log_epoch.clone(),
share_dest,
sharing: sharing.clone(),
};
let mut guard = LOGGER.lock().unwrap();
let (log_epoch, sharing) = if let Some((log_epoch, sharing)) = guard.take() {
sharing.store(share_errors, Ordering::SeqCst);
(log_epoch, sharing)
} else {
let log_epoch = Arc::new(AtomicU64::new(rand::random()));
let sharing = Arc::new(AtomicBool::new(share_errors));
let share_dest = match share_dest {
None => "https://beta-registry-0-3.start9labs.com/error-logs".to_owned(), // TODO
Some(a) => a.to_string(),
};
let sharing_layer = SharingLayer {
log_epoch: log_epoch.clone(),
share_dest,
sharing: sharing.clone(),
};
Self::base_subscriber().with(sharing_layer).init();
color_eyre::install().expect("Color Eyre Init");
Self::base_subscriber().with(sharing_layer).init();
color_eyre::install().expect("Color Eyre Init");
(log_epoch, sharing)
};
*guard = Some((log_epoch.clone(), sharing.clone()));
EmbassyLogger { log_epoch, sharing }
}
/// Returns a shared handle to the logger's epoch counter.
pub fn epoch(&self) -> Arc<AtomicU64> {
    Arc::clone(&self.log_epoch)
}
/// Enables or disables error-log sharing at runtime by toggling the atomic
/// flag shared with the sharing layer.
pub fn set_sharing(&self, sharing: bool) {
    self.sharing.store(sharing, Ordering::SeqCst)
}