instrument all the things

This commit is contained in:
Aiden McClelland
2021-10-11 19:01:37 -06:00
committed by Aiden McClelland
parent 11bd1e0609
commit 69382f788d
44 changed files with 284 additions and 30 deletions

View File

@@ -33,7 +33,7 @@ ubuntu.img:
unxz ubuntu.img.xz
product_key.txt:
$(which echo) -n "X" > product_key.txt
$(shell which echo) -n "X" > product_key.txt
cat /dev/random | base32 | head -c11 | tr '[:upper:]' '[:lower:]' >> product_key.txt
echo >> product_key.txt

View File

@@ -7,7 +7,7 @@ Wants=avahi-daemon.service nginx.service tor.service
[Service]
Type=oneshot
Environment=RUST_LOG=embassy_init=info,embassy=info
ExecStart=/usr/local/bin/embassy-init -vvv
ExecStart=/usr/local/bin/embassy-init
RemainAfterExit=true
[Install]

View File

@@ -6,6 +6,7 @@ use std::path::PathBuf;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tracing::instrument;
use crate::context::RpcContext;
use crate::id::{Id, ImageId};
@@ -35,6 +36,7 @@ pub struct DockerAction {
pub shm_size_mb: Option<usize>, // TODO: use postfix sizing? like 1k vs 1m vs 1g
}
impl DockerAction {
#[instrument(skip(ctx, input))]
pub async fn execute<I: Serialize, O: for<'de> Deserialize<'de>>(
&self,
ctx: &RpcContext,
@@ -115,6 +117,7 @@ impl DockerAction {
})
}
#[instrument(skip(ctx, input))]
pub async fn sandboxed<I: Serialize, O: for<'de> Deserialize<'de>>(
&self,
ctx: &RpcContext,

View File

@@ -8,6 +8,7 @@ use indexmap::IndexSet;
use patch_db::HasModel;
use rpc_toolkit::command;
use serde::{Deserialize, Serialize};
use tracing::instrument;
use self::docker::DockerAction;
use crate::config::{Config, ConfigSpec};
@@ -106,6 +107,7 @@ pub struct Action {
pub input_spec: ConfigSpec,
}
impl Action {
#[instrument(skip(ctx))]
pub async fn execute(
&self,
ctx: &RpcContext,
@@ -142,6 +144,7 @@ pub enum ActionImplementation {
Docker(DockerAction),
}
impl ActionImplementation {
#[instrument(skip(ctx, input))]
pub async fn execute<I: Serialize, O: for<'de> Deserialize<'de>>(
&self,
ctx: &RpcContext,
@@ -160,6 +163,7 @@ impl ActionImplementation {
}
}
}
#[instrument(skip(ctx, input))]
pub async fn sandboxed<I: Serialize, O: for<'de> Deserialize<'de>>(
&self,
ctx: &RpcContext,
@@ -194,6 +198,7 @@ fn display_action_result(action_result: ActionResult, matches: &ArgMatches<'_>)
}
#[command(about = "Executes an action", display(display_action_result))]
#[instrument(skip(ctx))]
pub async fn action(
#[context] ctx: RpcContext,
#[arg(rename = "id")] pkg_id: PackageId,

View File

@@ -12,6 +12,7 @@ use rpc_toolkit::command_helpers::prelude::{RequestParts, ResponseParts};
use rpc_toolkit::yajrc::RpcError;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tracing::instrument;
use crate::context::{CliContext, RpcContext};
use crate::middleware::auth::{get_id, hash_token};
@@ -42,6 +43,7 @@ fn gen_pwd() {
)
}
#[instrument]
async fn cli_login(
ctx: CliContext,
password: Option<String>,
@@ -70,6 +72,7 @@ async fn cli_login(
display(display_none),
metadata(authenticated = false)
)]
#[instrument(skip(ctx, password))]
pub async fn login(
#[context] ctx: RpcContext,
#[request] req: &RequestParts,
@@ -124,6 +127,7 @@ pub async fn login(
}
#[command(display(display_none), metadata(authenticated = false))]
#[instrument(skip(ctx))]
pub async fn logout(
#[context] ctx: RpcContext,
#[request] req: &RequestParts,
@@ -199,6 +203,7 @@ fn display_sessions(arg: SessionList, matches: &ArgMatches<'_>) {
}
#[command(display(display_sessions))]
#[instrument(skip(ctx))]
pub async fn list(
#[context] ctx: RpcContext,
#[request] req: &RequestParts,
@@ -231,10 +236,11 @@ pub async fn list(
}
fn parse_comma_separated(arg: &str, _: &ArgMatches<'_>) -> Result<Vec<String>, RpcError> {
Ok(arg.split(",").map(|s| s.to_owned()).collect())
Ok(arg.split(",").map(|s| s.trim().to_owned()).collect())
}
#[command(display(display_none))]
#[instrument(skip(ctx))]
pub async fn kill(
#[context] ctx: RpcContext,
#[arg(parse(parse_comma_separated))] ids: Vec<String>,

View File

@@ -2,6 +2,7 @@ use color_eyre::eyre::eyre;
use patch_db::HasModel;
use regex::NoExpand;
use serde::{Deserialize, Serialize};
use tracing::instrument;
use crate::action::{ActionImplementation, NoOutput};
use crate::context::RpcContext;
@@ -16,6 +17,7 @@ pub struct BackupActions {
pub restore: ActionImplementation,
}
impl BackupActions {
#[instrument(skip(ctx))]
pub async fn create(
&self,
ctx: &RpcContext,

View File

@@ -16,11 +16,13 @@ use embassy::{Error, ResultExt};
use http::StatusCode;
use rpc_toolkit::rpc_server;
use tokio::process::Command;
use tracing::instrument;
fn status_fn(_: i32) -> StatusCode {
StatusCode::OK
}
#[instrument]
async fn init(cfg_path: Option<&str>) -> Result<(), Error> {
// return Err(eyre!("Test failure").with_kind(embassy::ErrorKind::Unknown));
let cfg = RpcContextConfig::load(cfg_path).await?;
@@ -127,7 +129,7 @@ async fn init(cfg_path: Option<&str>) -> Result<(), Error> {
tracing::info!("Loaded Docker Images");
embassy::ssh::sync_keys_from_db(&secret_store, "/root/.ssh/authorized_keys").await?;
tracing::info!("Synced SSH Keys");
// todo!("sync wifi");
embassy::hostname::sync_hostname().await?;
tracing::info!("Synced Hostname");
@@ -187,6 +189,7 @@ async fn run_script_if_exists<P: AsRef<Path>>(path: P) {
}
}
#[instrument]
async fn inner_main(cfg_path: Option<&str>) -> Result<(), Error> {
embassy::sound::BEP.play().await?;

View File

@@ -4,6 +4,7 @@ use color_eyre::eyre::eyre;
use nix::sys::signal::Signal;
use patch_db::HasModel;
use serde::{Deserialize, Serialize};
use tracing::instrument;
use super::{Config, ConfigSpec};
use crate::action::ActionImplementation;
@@ -28,6 +29,7 @@ pub struct ConfigActions {
pub set: ActionImplementation,
}
impl ConfigActions {
#[instrument(skip(ctx))]
pub async fn get(
&self,
ctx: &RpcContext,
@@ -51,6 +53,7 @@ impl ConfigActions {
})
}
#[instrument(skip(ctx))]
pub async fn set(
&self,
ctx: &RpcContext,

View File

@@ -11,6 +11,7 @@ use rand::SeedableRng;
use regex::Regex;
use rpc_toolkit::command;
use serde_json::Value;
use tracing::instrument;
use crate::action::docker::DockerAction;
use crate::context::RpcContext;
@@ -145,6 +146,7 @@ pub fn config(#[arg] id: PackageId) -> Result<PackageId, Error> {
}
#[command(display(display_serializable))]
#[instrument(skip(ctx))]
pub async fn get(
#[context] ctx: RpcContext,
#[parent_data] id: PackageId,
@@ -182,6 +184,7 @@ pub async fn get(
subcommands(self(set_impl(async, context(RpcContext))), set_dry),
display(display_none)
)]
#[instrument]
pub fn set(
#[parent_data] id: PackageId,
#[allow(unused_variables)]
@@ -195,6 +198,7 @@ pub fn set(
}
#[command(rename = "dry", display(display_serializable))]
#[instrument(skip(ctx))]
pub async fn set_dry(
#[context] ctx: RpcContext,
#[parent_data] (id, config, timeout, _): (
@@ -233,6 +237,7 @@ pub async fn set_dry(
Ok(BreakageRes(breakages))
}
#[instrument(skip(ctx))]
pub async fn set_impl(
ctx: RpcContext,
(id, config, timeout, expire_id): (PackageId, Option<Config>, Option<Duration>, Option<String>),
@@ -269,6 +274,7 @@ pub async fn set_impl(
})
}
#[instrument(skip(ctx, db))]
pub fn configure<'a, Db: DbHandle>(
ctx: &'a RpcContext,
db: &'a mut Db,

View File

@@ -13,6 +13,7 @@ use rpc_toolkit::reqwest::{Client, Url};
use rpc_toolkit::url::Host;
use rpc_toolkit::Context;
use serde::Deserialize;
use tracing::instrument;
use crate::ResultExt;
@@ -56,6 +57,7 @@ const DEFAULT_PORT: u16 = 5959;
pub struct CliContext(Arc<CliContextSeed>);
impl CliContext {
/// BLOCKING
#[instrument(skip(matches))]
pub fn init(matches: &ArgMatches) -> Result<Self, crate::Error> {
let cfg_path = Path::new(matches.value_of("config").unwrap_or(crate::CONFIG_PATH));
let base = if cfg_path.exists() {

View File

@@ -8,6 +8,7 @@ use rpc_toolkit::Context;
use serde::Deserialize;
use tokio::fs::File;
use tokio::sync::broadcast::Sender;
use tracing::instrument;
use url::Host;
use crate::shutdown::Shutdown;
@@ -22,6 +23,7 @@ pub struct DiagnosticContextConfig {
pub zfs_pool_name: Option<String>,
}
impl DiagnosticContextConfig {
#[instrument(skip(path))]
pub async fn load<P: AsRef<Path>>(path: Option<P>) -> Result<Self, Error> {
let cfg_path = path
.as_ref()
@@ -54,6 +56,7 @@ pub struct DiagnosticContextSeed {
#[derive(Clone)]
pub struct DiagnosticContext(Arc<DiagnosticContextSeed>);
impl DiagnosticContext {
#[instrument(skip(path))]
pub async fn init<P: AsRef<Path>>(path: Option<P>, error: Error) -> Result<Self, Error> {
let cfg = DiagnosticContextConfig::load(path).await?;

View File

@@ -133,6 +133,7 @@ pub struct RpcContextSeed {
#[derive(Clone)]
pub struct RpcContext(Arc<RpcContextSeed>);
impl RpcContext {
#[instrument(skip(cfg_path))]
pub async fn init<P: AsRef<Path>>(cfg_path: Option<P>) -> Result<Self, Error> {
let base = RpcContextConfig::load(cfg_path).await?;
let log_epoch = Arc::new(AtomicU64::new(rand::random()));
@@ -157,7 +158,7 @@ impl RpcContext {
.await?;
let managers = ManagerMap::default();
let metrics_cache = RwLock::new(None);
let notification_manager = NotificationManager::new(secret_store.clone(), db.clone(), 3600);
let notification_manager = NotificationManager::new(secret_store.clone(), 3600);
let seed = Arc::new(RpcContextSeed {
bind_rpc: base.bind_rpc.unwrap_or(([127, 0, 0, 1], 5959).into()),
bind_ws: base.bind_ws.unwrap_or(([127, 0, 0, 1], 5960).into()),
@@ -199,12 +200,13 @@ impl RpcContext {
// TODO: handle apps in bad / transient state
Ok(res)
}
#[instrument(skip(self))]
pub async fn package_registry_url(&self) -> Result<Url, Error> {
Ok(
if let Some(market) = crate::db::DatabaseModel::new()
.server_info()
.package_marketplace()
.get(&mut self.db.handle(), true)
.get(&mut self.db.handle(), false)
.await?
.to_owned()
{
@@ -214,11 +216,12 @@ impl RpcContext {
},
)
}
#[instrument(skip(self))]
pub async fn eos_registry_url(&self) -> Result<Url, Error> {
Ok(crate::db::DatabaseModel::new()
.server_info()
.eos_marketplace()
.get(&mut self.db.handle(), true)
.get(&mut self.db.handle(), false)
.await?
.to_owned())
}

View File

@@ -7,6 +7,7 @@ use clap::ArgMatches;
use color_eyre::eyre::eyre;
use rpc_toolkit::Context;
use serde::Deserialize;
use tracing::instrument;
use crate::{Error, ResultExt};
@@ -25,6 +26,7 @@ pub struct SdkContextSeed {
pub struct SdkContext(Arc<SdkContextSeed>);
impl SdkContext {
/// BLOCKING
#[instrument(skip(matches))]
pub fn init(matches: &ArgMatches) -> Result<Self, crate::Error> {
let cfg_path = Path::new(matches.value_of("config").unwrap_or(crate::CONFIG_PATH));
let base = if cfg_path.exists() {
@@ -46,6 +48,7 @@ impl SdkContext {
})))
}
/// BLOCKING
#[instrument]
pub fn developer_key(&self) -> Result<ed25519_dalek::Keypair, Error> {
if !self.developer_key_path.exists() {
return Err(Error::new(eyre!("Developer Key does not exist! Please run `embassy-sdk init` before running this command."), crate::ErrorKind::Uninitialized));

View File

@@ -15,6 +15,7 @@ use sqlx::SqlitePool;
use tokio::fs::File;
use tokio::sync::broadcast::Sender;
use tokio::sync::RwLock;
use tracing::instrument;
use url::Host;
use crate::db::model::Database;
@@ -33,6 +34,7 @@ pub struct SetupContextConfig {
pub datadir: Option<PathBuf>,
}
impl SetupContextConfig {
#[instrument(skip(path))]
pub async fn load<P: AsRef<Path>>(path: Option<P>) -> Result<Self, Error> {
let cfg_path = path
.as_ref()
@@ -74,6 +76,7 @@ pub struct SetupContextSeed {
#[derive(Clone)]
pub struct SetupContext(Arc<SetupContextSeed>);
impl SetupContext {
#[instrument(skip(path))]
pub async fn init<P: AsRef<Path>>(path: Option<P>) -> Result<Self, Error> {
let cfg = SetupContextConfig::load(path).await?;
let (shutdown, _) = tokio::sync::broadcast::channel(1);
@@ -89,6 +92,7 @@ impl SetupContext {
recovery_status: RwLock::new(None),
})))
}
#[instrument(skip(self))]
pub async fn db(&self, secret_store: &SqlitePool) -> Result<PatchDb, Error> {
let db_path = self.datadir.join("main").join("embassy.db");
let db = PatchDb::open(&db_path)
@@ -108,6 +112,7 @@ impl SetupContext {
}
Ok(db)
}
#[instrument(skip(self))]
pub async fn secret_store(&self) -> Result<SqlitePool, Error> {
let secret_store = SqlitePool::connect_with(
SqliteConnectOptions::new()
@@ -122,6 +127,7 @@ impl SetupContext {
.with_kind(crate::ErrorKind::Database)?;
Ok(secret_store)
}
#[instrument(skip(self))]
pub async fn product_key(&self) -> Result<Arc<String>, Error> {
Ok(
if let Some(k) = {

View File

@@ -4,6 +4,7 @@ use chrono::Utc;
use color_eyre::eyre::eyre;
use patch_db::DbHandle;
use rpc_toolkit::command;
use tracing::instrument;
use crate::context::RpcContext;
use crate::db::util::WithRevision;
@@ -17,6 +18,7 @@ use crate::util::{display_none, display_serializable};
use crate::{Error, ResultExt};
#[command(display(display_none))]
#[instrument(skip(ctx))]
pub async fn start(
#[context] ctx: RpcContext,
#[arg] id: PackageId,
@@ -66,6 +68,7 @@ pub async fn start(
})
}
#[instrument(skip(db))]
async fn stop_common<Db: DbHandle>(
db: &mut Db,
id: &PackageId,
@@ -101,6 +104,7 @@ pub fn stop(#[arg] id: PackageId) -> Result<PackageId, Error> {
}
#[command(rename = "dry", display(display_serializable))]
#[instrument(skip(ctx))]
pub async fn stop_dry(
#[context] ctx: RpcContext,
#[parent_data] id: PackageId,
@@ -114,6 +118,7 @@ pub async fn stop_dry(
Ok(BreakageRes(breakages))
}
#[instrument(skip(ctx))]
pub async fn stop_impl(ctx: RpcContext, id: PackageId) -> Result<WithRevision<()>, Error> {
let mut db = ctx.db.handle();
let mut tx = db.begin().await?;

View File

@@ -19,6 +19,7 @@ use serde_json::Value;
use tokio::task::JoinError;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::WebSocketStream;
use tracing::instrument;
pub use self::model::DatabaseModel;
use self::util::WithRevision;
@@ -83,6 +84,7 @@ async fn ws_handler<
))
.await
.with_kind(crate::ErrorKind::Network)?;
return Ok(());
}
break;
}
@@ -218,6 +220,7 @@ pub fn put() -> Result<(), RpcError> {
}
#[command(display(display_serializable))]
#[instrument(skip(ctx))]
pub async fn ui(
#[context] ctx: RpcContext,
#[arg] pointer: JsonPointer,
@@ -225,8 +228,11 @@ pub async fn ui(
#[allow(unused_variables)]
#[arg(long = "format")]
format: Option<IoFormat>,
) -> Result<WithRevision<()>, RpcError> {
let ptr = "/ui".parse::<JsonPointer>()? + &pointer;
) -> Result<WithRevision<()>, Error> {
let ptr = "/ui"
.parse::<JsonPointer>()
.with_kind(crate::ErrorKind::Database)?
+ &pointer;
Ok(WithRevision {
response: (),
revision: ctx.db.put(&ptr, &value, None).await?,

View File

@@ -4,12 +4,14 @@ use std::path::Path;
use ed25519_dalek::Keypair;
use rpc_toolkit::command;
use tracing::instrument;
use crate::context::SdkContext;
use crate::util::display_none;
use crate::{Error, ResultExt};
#[command(cli_only, blocking, display(display_none))]
#[instrument(skip(ctx))]
pub fn init(#[context] ctx: SdkContext) -> Result<(), Error> {
if !ctx.developer_key_path.exists() {
let parent = ctx.developer_key_path.parent().unwrap_or(Path::new("/"));

View File

@@ -1,6 +1,7 @@
use std::path::Path;
use tokio::process::Command;
use tracing::instrument;
use crate::util::Invoke;
use crate::{Error, ResultExt};
@@ -10,6 +11,7 @@ pub const DEFAULT_PASSWORD: &'static str = "password";
// TODO: use IncorrectDisk / DiskNotAvailable / DiskCorrupted
#[instrument(skip(disks))]
pub async fn create<I: IntoIterator<Item = P>, P: AsRef<Path>>(
pool_name: &str,
disks: I,
@@ -21,6 +23,7 @@ pub async fn create<I: IntoIterator<Item = P>, P: AsRef<Path>>(
Ok(guid)
}
#[instrument(skip(datadir))]
pub async fn load<P: AsRef<Path>>(
guid: &str,
pool_name: &str,
@@ -32,6 +35,7 @@ pub async fn load<P: AsRef<Path>>(
Ok(())
}
#[instrument(skip(disks))]
pub async fn create_pool<I: IntoIterator<Item = P>, P: AsRef<Path>>(
pool_name: &str,
disks: I,
@@ -56,6 +60,7 @@ pub async fn create_pool<I: IntoIterator<Item = P>, P: AsRef<Path>>(
.to_owned())
}
#[instrument]
pub async fn create_fs(pool_name: &str, password: &str) -> Result<(), Error> {
tokio::fs::write(PASSWORD_PATH, password)
.await
@@ -108,6 +113,7 @@ pub async fn create_fs(pool_name: &str, password: &str) -> Result<(), Error> {
Ok(())
}
#[instrument]
pub async fn create_swap(pool_name: &str) -> Result<(), Error> {
let pagesize = String::from_utf8(
Command::new("getconf")
@@ -138,6 +144,7 @@ pub async fn create_swap(pool_name: &str) -> Result<(), Error> {
Ok(())
}
#[instrument]
pub async fn use_swap(pool_name: &str) -> Result<(), Error> {
Command::new("swapon")
.arg(Path::new("/dev/zvol").join(pool_name).join("swap"))
@@ -146,6 +153,7 @@ pub async fn use_swap(pool_name: &str) -> Result<(), Error> {
Ok(())
}
#[instrument]
pub async fn export(pool_name: &str) -> Result<(), Error> {
Command::new("zpool")
.arg("export")
@@ -155,6 +163,7 @@ pub async fn export(pool_name: &str) -> Result<(), Error> {
Ok(())
}
#[instrument]
pub async fn import(guid: &str) -> Result<(), Error> {
Command::new("zpool")
.arg("import")
@@ -165,6 +174,7 @@ pub async fn import(guid: &str) -> Result<(), Error> {
Ok(())
}
#[instrument(skip(datadir))]
pub async fn mount<P: AsRef<Path>>(
pool_name: &str,
datadir: P,

View File

@@ -9,6 +9,7 @@ use serde::{Deserialize, Serialize};
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::process::Command;
use tracing::instrument;
use crate::util::io::from_yaml_async_reader;
use crate::util::{GeneralGuard, Invoke, Version};
@@ -49,6 +50,7 @@ lazy_static::lazy_static! {
static ref PARTITION_REGEX: Regex = Regex::new("-part[0-9]+$").unwrap();
}
#[instrument(skip(path))]
pub async fn get_vendor<P: AsRef<Path>>(path: P) -> Result<Option<String>, Error> {
let vendor = tokio::fs::read_to_string(
Path::new(SYS_BLOCK_PATH)
@@ -69,6 +71,7 @@ pub async fn get_vendor<P: AsRef<Path>>(path: P) -> Result<Option<String>, Error
})
}
#[instrument(skip(path))]
pub async fn get_model<P: AsRef<Path>>(path: P) -> Result<Option<String>, Error> {
let model = tokio::fs::read_to_string(
Path::new(SYS_BLOCK_PATH)
@@ -85,6 +88,7 @@ pub async fn get_model<P: AsRef<Path>>(path: P) -> Result<Option<String>, Error>
Ok(if model.is_empty() { None } else { Some(model) })
}
#[instrument(skip(path))]
pub async fn get_capacity<P: AsRef<Path>>(path: P) -> Result<usize, Error> {
Ok(String::from_utf8(
Command::new("blockdev")
@@ -97,6 +101,7 @@ pub async fn get_capacity<P: AsRef<Path>>(path: P) -> Result<usize, Error> {
.parse()?)
}
#[instrument(skip(path))]
pub async fn get_label<P: AsRef<Path>>(path: P) -> Result<Option<String>, Error> {
let label = String::from_utf8(
Command::new("lsblk")
@@ -111,6 +116,7 @@ pub async fn get_label<P: AsRef<Path>>(path: P) -> Result<Option<String>, Error>
Ok(if label.is_empty() { None } else { Some(label) })
}
#[instrument(skip(path))]
pub async fn get_used<P: AsRef<Path>>(path: P) -> Result<usize, Error> {
Ok(String::from_utf8(
Command::new("df")
@@ -127,6 +133,7 @@ pub async fn get_used<P: AsRef<Path>>(path: P) -> Result<usize, Error> {
.parse()?)
}
#[instrument]
pub async fn list() -> Result<Vec<DiskInfo>, Error> {
if tokio::fs::metadata(TMP_MOUNTPOINT).await.is_err() {
tokio::fs::create_dir_all(TMP_MOUNTPOINT)
@@ -261,6 +268,7 @@ pub async fn list() -> Result<Vec<DiskInfo>, Error> {
Ok(res)
}
#[instrument(skip(logicalname, mount_point))]
pub async fn mount<P0: AsRef<Path>, P1: AsRef<Path>>(
logicalname: P0,
mount_point: P1,
@@ -291,6 +299,7 @@ pub async fn mount<P0: AsRef<Path>, P1: AsRef<Path>>(
Ok(())
}
#[instrument(skip(src, dst, password))]
pub async fn mount_encfs<P0: AsRef<Path>, P1: AsRef<Path>>(
src: P0,
dst: P1,
@@ -320,6 +329,7 @@ pub async fn mount_encfs<P0: AsRef<Path>, P1: AsRef<Path>>(
}
}
#[instrument(skip(src, dst))]
pub async fn bind<P0: AsRef<Path>, P1: AsRef<Path>>(
src: P0,
dst: P1,
@@ -363,6 +373,7 @@ pub async fn bind<P0: AsRef<Path>, P1: AsRef<Path>>(
Ok(())
}
#[instrument(skip(mount_point))]
pub async fn unmount<P: AsRef<Path>>(mount_point: P) -> Result<(), Error> {
tracing::info!("Unmounting {}.", mount_point.as_ref().display());
let umount_output = tokio::process::Command::new("umount")

View File

@@ -1,11 +1,13 @@
use digest::Digest;
use tokio::process::Command;
use tracing::instrument;
use crate::util::Invoke;
use crate::{Error, ErrorKind, ResultExt};
pub const PRODUCT_KEY_PATH: &'static str = "/embassy-os/product_key.txt";
#[instrument]
pub async fn get_hostname() -> Result<String, Error> {
let out = Command::new("hostname")
.invoke(ErrorKind::ParseSysInfo)
@@ -14,6 +16,7 @@ pub async fn get_hostname() -> Result<String, Error> {
Ok(out_string.trim().to_owned())
}
#[instrument]
pub async fn set_hostname(hostname: &str) -> Result<(), Error> {
let _out = Command::new("hostnamectl")
.arg("set-hostname")
@@ -23,6 +26,7 @@ pub async fn set_hostname(hostname: &str) -> Result<(), Error> {
Ok(())
}
#[instrument]
pub async fn get_product_key() -> Result<String, Error> {
let out = tokio::fs::read_to_string(PRODUCT_KEY_PATH)
.await
@@ -30,6 +34,7 @@ pub async fn get_product_key() -> Result<String, Error> {
Ok(out.trim().to_owned())
}
#[instrument]
pub async fn get_id() -> Result<String, Error> {
let key = get_product_key().await?;
let mut hasher = sha2::Sha256::new();
@@ -39,6 +44,7 @@ pub async fn get_id() -> Result<String, Error> {
}
// cat /embassy-os/product_key.txt | shasum -a 256 | head -c 8 | awk '{print "start9-"$1}' | xargs hostnamectl set-hostname && systemctl restart avahi-daemon
#[instrument]
pub async fn sync_hostname() -> Result<(), Error> {
set_hostname(&format!("start9-{}", get_id().await?)).await?;
Command::new("systemctl")

View File

@@ -2,6 +2,7 @@ use std::collections::{BTreeMap, HashMap};
use bollard::image::ListImagesOptions;
use patch_db::{DbHandle, PatchDbHandle};
use tracing::instrument;
use super::PKG_DOCKER_DIR;
use crate::context::RpcContext;
@@ -10,6 +11,7 @@ use crate::s9pk::manifest::PackageId;
use crate::util::Version;
use crate::Error;
#[instrument(skip(ctx, db, deps))]
pub async fn update_dependents<'a, Db: DbHandle, I: IntoIterator<Item = &'a PackageId>>(
ctx: &RpcContext,
db: &mut Db,
@@ -67,6 +69,7 @@ pub async fn update_dependents<'a, Db: DbHandle, I: IntoIterator<Item = &'a Pack
Ok(())
}
#[instrument(skip(ctx))]
pub async fn cleanup(ctx: &RpcContext, id: &PackageId, version: &Version) -> Result<(), Error> {
ctx.managers.remove(&(id.clone(), version.clone())).await;
// docker images start9/$APP_ID/*:$VERSION -q | xargs docker rmi
@@ -103,6 +106,7 @@ pub async fn cleanup(ctx: &RpcContext, id: &PackageId, version: &Version) -> Res
Ok(())
}
#[instrument(skip(ctx, db))]
pub async fn cleanup_failed<Db: DbHandle>(
ctx: &RpcContext,
db: &mut Db,
@@ -166,6 +170,7 @@ pub async fn cleanup_failed<Db: DbHandle>(
Ok(())
}
#[instrument(skip(db, current_dependencies))]
pub async fn remove_current_dependents<'a, Db: DbHandle, I: IntoIterator<Item = &'a PackageId>>(
db: &mut Db,
id: &PackageId,
@@ -193,6 +198,7 @@ pub async fn remove_current_dependents<'a, Db: DbHandle, I: IntoIterator<Item =
Ok(())
}
#[instrument(skip(ctx, db))]
pub async fn uninstall(
ctx: &RpcContext,
db: &mut PatchDbHandle,

View File

@@ -16,6 +16,7 @@ use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncRead, AsyncSeek, AsyncSeekExt};
use tokio::process::Command;
use tokio_stream::wrappers::ReadDirStream;
use tracing::instrument;
use self::cleanup::cleanup_failed;
use crate::context::RpcContext;
@@ -47,6 +48,7 @@ pub const PKG_DOCKER_DIR: &'static str = "package-data/docker";
pub const PKG_WASM_DIR: &'static str = "package-data/wasm";
#[command(display(display_none))]
#[instrument(skip(ctx))]
pub async fn install(
#[context] ctx: RpcContext,
#[arg] id: String,
@@ -132,6 +134,7 @@ pub async fn uninstall(#[arg] id: PackageId) -> Result<PackageId, Error> {
}
#[command(rename = "dry", display(display_serializable))]
#[instrument(skip(ctx))]
pub async fn uninstall_dry(
#[context] ctx: RpcContext,
#[parent_data] id: PackageId,
@@ -145,6 +148,7 @@ pub async fn uninstall_dry(
Ok(BreakageRes(breakages))
}
#[instrument(skip(ctx))]
pub async fn uninstall_impl(ctx: RpcContext, id: PackageId) -> Result<WithRevision<()>, Error> {
let mut handle = ctx.db.handle();
let mut tx = handle.begin().await?;
@@ -178,6 +182,7 @@ pub async fn uninstall_impl(ctx: RpcContext, id: PackageId) -> Result<WithRevisi
tokio::spawn(async move {
if let Err(e) = cleanup::uninstall(&ctx, &mut ctx.db.handle(), &installed).await {
tracing::error!("Uninstall of {} Failed: {}", id, e);
tracing::debug!("{:?}", e);
}
});
@@ -187,6 +192,7 @@ pub async fn uninstall_impl(ctx: RpcContext, id: PackageId) -> Result<WithRevisi
})
}
#[instrument(skip(ctx))]
pub async fn download_install_s9pk(
ctx: &RpcContext,
temp_manifest: &Manifest,
@@ -286,6 +292,7 @@ pub async fn download_install_s9pk(
}
}
#[instrument(skip(ctx, rdr))]
pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin>(
ctx: &RpcContext,
pkg_id: &PackageId,
@@ -717,6 +724,7 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin>(
Ok(())
}
#[instrument(skip(ctx, tx))]
async fn handle_recovered_package(
recovered: Option<crate::db::model::RecoveredPackageInfo>,
manifest: Manifest,
@@ -755,6 +763,7 @@ async fn handle_recovered_package(
})
}
#[instrument(skip(datadir))]
pub async fn load_images<P: AsRef<Path>>(datadir: P) -> Result<(), Error> {
let docker_dir = datadir.as_ref().join(PKG_DOCKER_DIR);
if tokio::fs::metadata(&docker_dir).await.is_ok() {

View File

@@ -10,6 +10,7 @@ use serde::{Deserialize, Serialize};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;
use tokio_stream::wrappers::LinesStream;
use tracing::instrument;
use crate::action::docker::DockerAction;
use crate::error::ResultExt;
@@ -59,6 +60,7 @@ impl JournalctlEntry {
}
}
#[derive(Debug)]
pub enum LogSource {
Service(&'static str),
Container(PackageId),
@@ -86,6 +88,7 @@ pub async fn logs(
.await?)
}
#[instrument]
pub async fn fetch_logs(
id: LogSource,
limit: Option<usize>,

View File

@@ -12,6 +12,7 @@ use tokio::sync::watch::error::RecvError;
use tokio::sync::watch::{channel, Receiver, Sender};
use tokio::sync::RwLock;
use torut::onion::TorSecretKeyV3;
use tracing::instrument;
use crate::action::docker::DockerAction;
use crate::context::RpcContext;
@@ -24,6 +25,7 @@ use crate::Error;
#[derive(Default)]
pub struct ManagerMap(RwLock<BTreeMap<(PackageId, Version), Arc<Manager>>>);
impl ManagerMap {
#[instrument(skip(self, ctx, db, secrets))]
pub async fn init<Db: DbHandle, Ex>(
&self,
ctx: &RpcContext,
@@ -62,6 +64,7 @@ impl ManagerMap {
Ok(())
}
#[instrument(skip(self, ctx))]
pub async fn add(
&self,
ctx: RpcContext,
@@ -82,6 +85,7 @@ impl ManagerMap {
Ok(())
}
#[instrument(skip(self))]
pub async fn remove(&self, id: &(PackageId, Version)) {
if let Some(man) = self.0.write().await.remove(id) {
if let Err(e) = man.exit().await {
@@ -90,6 +94,7 @@ impl ManagerMap {
}
}
#[instrument(skip(self))]
pub async fn empty(&self) -> Result<(), Error> {
let res = futures::future::join_all(
std::mem::take(&mut *self.0.write().await)
@@ -104,6 +109,7 @@ impl ManagerMap {
})
}
#[instrument(skip(self))]
pub async fn get(&self, id: &(PackageId, Version)) -> Option<Arc<Manager>> {
self.0.read().await.get(id).cloned()
}
@@ -136,6 +142,7 @@ pub enum OnStop {
Exit,
}
#[instrument(skip(state))]
async fn run_main(state: &Arc<ManagerSharedState>) -> Result<Result<(), (i32, String)>, Error> {
let rt_state = state.clone();
let mut runtime = tokio::spawn(async move {
@@ -236,6 +243,7 @@ async fn run_main(state: &Arc<ManagerSharedState>) -> Result<Result<(), (i32, St
}
impl Manager {
#[instrument(skip(ctx))]
async fn create(
ctx: RpcContext,
manifest: Manifest,
@@ -302,6 +310,7 @@ impl Manager {
Ok(Err(e)) => {
let res = thread_shared.ctx.notification_manager
.notify(
&mut thread_shared.ctx.db.handle(),
Some(thread_shared.manifest.id.clone()),
NotificationLevel::Warning,
String::from("Service Crashed"),
@@ -339,6 +348,7 @@ impl Manager {
}
}
#[instrument(skip(self))]
pub async fn stop(&self) -> Result<(), Error> {
self.shared.on_stop.send(OnStop::Sleep).map_err(|_| {
Error::new(
@@ -371,6 +381,7 @@ impl Manager {
Ok(())
}
#[instrument(skip(self))]
pub async fn start(&self) -> Result<(), Error> {
self.shared.on_stop.send(OnStop::Restart).map_err(|_| {
Error::new(
@@ -385,6 +396,7 @@ impl Manager {
Ok(())
}
#[instrument(skip(self))]
pub async fn pause(&self) -> Result<(), Error> {
self.shared
.ctx
@@ -397,6 +409,7 @@ impl Manager {
Ok(())
}
#[instrument(skip(self))]
pub async fn resume(&self) -> Result<(), Error> {
self.shared
.ctx
@@ -410,6 +423,7 @@ impl Manager {
Ok(())
}
#[instrument(skip(self))]
async fn exit(&self) -> Result<(), Error> {
let _ = self.shared.on_stop.send(OnStop::Exit);
match self

View File

@@ -3,6 +3,7 @@ use emver::VersionRange;
use indexmap::IndexMap;
use patch_db::HasModel;
use serde::{Deserialize, Serialize};
use tracing::instrument;
use crate::action::ActionImplementation;
use crate::context::RpcContext;
@@ -18,6 +19,7 @@ pub struct Migrations {
pub to: IndexMap<VersionRange, ActionImplementation>,
}
impl Migrations {
#[instrument(skip(ctx))]
pub async fn from(
&self,
ctx: &RpcContext,
@@ -53,6 +55,7 @@ impl Migrations {
},
)
}
#[instrument(skip(ctx))]
pub async fn to(
&self,
ctx: &RpcContext,

View File

@@ -8,6 +8,7 @@ use itertools::Either;
use serde::{Deserialize, Deserializer, Serialize};
use sqlx::{Executor, Sqlite};
use torut::onion::TorSecretKeyV3;
use tracing::instrument;
use crate::db::model::{InterfaceAddressMap, InterfaceAddresses};
use crate::id::Id;
@@ -19,6 +20,7 @@ use crate::Error;
#[serde(rename_all = "kebab-case")]
pub struct Interfaces(pub BTreeMap<InterfaceId, Interface>); // TODO
impl Interfaces {
#[instrument(skip(secrets))]
pub async fn install<Ex>(
&self,
secrets: &mut Ex,
@@ -68,6 +70,7 @@ impl Interfaces {
Ok(interface_addresses)
}
#[instrument(skip(secrets))]
pub async fn tor_keys<Ex>(
&self,
secrets: &mut Ex,

View File

@@ -4,6 +4,7 @@ use std::path::PathBuf;
use rpc_toolkit::command;
use sqlx::SqlitePool;
use torut::onion::{OnionAddressV3, TorSecretKeyV3};
use tracing::instrument;
use self::interface::{Interface, InterfaceId};
#[cfg(feature = "avahi")]
@@ -35,6 +36,7 @@ pub struct NetController {
pub nginx: NginxController,
}
impl NetController {
#[instrument(skip(db))]
pub async fn init(
embassyd_addr: SocketAddr,
embassyd_tor_key: TorSecretKeyV3,
@@ -49,6 +51,7 @@ impl NetController {
})
}
#[instrument(skip(self, interfaces))]
pub async fn add<
'a,
I: IntoIterator<Item = (InterfaceId, &'a Interface, TorSecretKeyV3)> + Clone,
@@ -105,6 +108,7 @@ impl NetController {
Ok(())
}
#[instrument(skip(self, interfaces))]
pub async fn remove<I: IntoIterator<Item = InterfaceId> + Clone>(
&self,
pkg_id: &PackageId,

View File

@@ -7,6 +7,7 @@ use futures::FutureExt;
use indexmap::IndexSet;
use sqlx::SqlitePool;
use tokio::sync::Mutex;
use tracing::instrument;
use super::interface::{InterfaceId, LanPortConfig};
use super::ssl::SslManager;
@@ -55,6 +56,7 @@ pub struct NginxControllerInner {
ssl_manager: SslManager,
}
impl NginxControllerInner {
#[instrument(skip(db))]
async fn init(nginx_root: &Path, db: SqlitePool) -> Result<Self, Error> {
let inner = NginxControllerInner {
interfaces: BTreeMap::new(),
@@ -77,6 +79,7 @@ impl NginxControllerInner {
)?;
Ok(inner)
}
#[instrument(skip(self, interfaces))]
async fn add<I: IntoIterator<Item = (InterfaceId, InterfaceMetadata)>>(
&mut self,
nginx_root: &Path,
@@ -182,6 +185,8 @@ impl NginxControllerInner {
self.hup().await?;
Ok(())
}
#[instrument(skip(self))]
async fn remove(&mut self, nginx_root: &Path, package: &PackageId) -> Result<(), Error> {
let removed = self.interfaces.remove(package);
if let Some(net_info) = removed {
@@ -207,6 +212,8 @@ impl NginxControllerInner {
self.hup().await?;
Ok(())
}
#[instrument(skip(self))]
async fn hup(&self) -> Result<(), Error> {
let _ = tokio::process::Command::new("systemctl")
.arg("reload")

View File

@@ -11,6 +11,7 @@ use openssl::x509::{X509Builder, X509Extension, X509NameBuilder, X509};
use openssl::*;
use sqlx::SqlitePool;
use tokio::sync::Mutex;
use tracing::instrument;
use crate::{Error, ErrorKind};
@@ -30,12 +31,14 @@ impl SslStore {
fn new(db: SqlitePool) -> Result<Self, Error> {
Ok(SslStore { secret_store: db })
}
// Persists the root CA keypair into the secret store.
// Row id 0 is reserved for the root certificate; `lookup_string` is NULL
// because the root is fetched by its fixed id rather than by hostname.
#[instrument(skip(self))]
async fn save_root_certificate(&self, key: &PKey<Private>, cert: &X509) -> Result<(), Error> {
    // PEM-encode both halves before inserting. PEM output is ASCII, so the
    // UTF-8 conversion should not fail in practice; `?` keeps handling uniform.
    let key_str = String::from_utf8(key.private_key_to_pem_pkcs8()?)?;
    let cert_str = String::from_utf8(cert.to_pem()?)?;
    // Result row count is intentionally discarded — only success matters here.
    let _n = sqlx::query!("INSERT INTO certificates (id, priv_key_pem, certificate_pem, lookup_string, created_at, updated_at) VALUES (0, ?, ?, NULL, datetime('now'), datetime('now'))", key_str, cert_str).execute(&self.secret_store).await?;
    Ok(())
}
#[instrument(skip(self))]
async fn load_root_certificate(&self) -> Result<Option<(PKey<Private>, X509)>, Error> {
let m_row =
sqlx::query!("SELECT priv_key_pem, certificate_pem FROM certificates WHERE id = 0;")
@@ -50,6 +53,7 @@ impl SslStore {
}
}
}
#[instrument(skip(self))]
async fn save_intermediate_certificate(
&self,
key: &PKey<Private>,
@@ -74,6 +78,7 @@ impl SslStore {
}
}
}
#[instrument(skip(self))]
async fn save_certificate(
&self,
key: &PKey<Private>,
@@ -104,6 +109,7 @@ impl SslStore {
}
}
}
#[instrument(skip(self))]
async fn update_certificate(
&self,
key: &PKey<Private>,
@@ -133,6 +139,7 @@ lazy_static::lazy_static! {
}
impl SslManager {
#[instrument(skip(db))]
pub async fn init(db: SqlitePool) -> Result<Self, Error> {
let store = SslStore::new(db)?;
let (root_key, root_cert) = match store.load_root_certificate().await? {
@@ -163,6 +170,7 @@ impl SslManager {
})
}
#[instrument(skip(self))]
pub async fn certificate_for(
&self,
dns_base: &str,
@@ -193,17 +201,20 @@ impl SslManager {
}
}
/// Produces a random 64-bit certificate serial number.
///
/// The most-significant bit may be zero (`MsbOption::MAYBE_ZERO`) and the
/// value is not forced odd, matching the original behavior exactly.
#[instrument]
fn rand_serial() -> Result<Asn1Integer, Error> {
    let mut serial = BigNum::new()?;
    serial.rand(64, MsbOption::MAYBE_ZERO, false)?;
    Ok(Asn1Integer::from_bn(&serial)?)
}
/// Generates a fresh EC private key on the shared `EC_GROUP` curve and
/// wraps it as a generic `PKey`.
#[instrument]
fn generate_key() -> Result<PKey<Private>, Error> {
    let ec_key = EcKey::generate(EC_GROUP.as_ref())?;
    Ok(PKey::from_ec_key(ec_key)?)
}
#[instrument]
fn make_root_cert(root_key: &PKey<Private>) -> Result<X509, Error> {
let mut builder = X509Builder::new()?;
builder.set_version(CERTIFICATE_VERSION)?;
@@ -254,6 +265,7 @@ fn make_root_cert(root_key: &PKey<Private>) -> Result<X509, Error> {
let cert = builder.build();
Ok(cert)
}
#[instrument]
fn make_int_cert(
signer: (&PKey<Private>, &X509),
applicant: &PKey<Private>,
@@ -315,6 +327,7 @@ fn make_int_cert(
Ok(cert)
}
#[instrument]
fn make_leaf_cert(
signer: (&PKey<Private>, &X509),
applicant: (&PKey<Private>, &str),

View File

@@ -14,6 +14,7 @@ use tokio::net::TcpStream;
use tokio::sync::Mutex;
use torut::control::{AsyncEvent, AuthenticatedConn, ConnError};
use torut::onion::{OnionAddressV3, TorSecretKeyV3};
use tracing::instrument;
use super::interface::{InterfaceId, TorConfig};
use crate::context::RpcContext;
@@ -56,6 +57,7 @@ pub async fn list_services(
ctx.net_controller.tor.list_services().await
}
#[instrument(skip(secrets))]
pub async fn os_key<Ex>(secrets: &mut Ex) -> Result<TorSecretKeyV3, Error>
where
for<'a> &'a mut Ex: Executor<'a, Database = Sqlite>,
@@ -139,6 +141,7 @@ pub struct TorControllerInner {
services: BTreeMap<(PackageId, InterfaceId), (TorSecretKeyV3, TorConfig, Ipv4Addr)>,
}
impl TorControllerInner {
#[instrument(skip(self, interfaces))]
async fn add<'a, I: IntoIterator<Item = (InterfaceId, TorConfig, TorSecretKeyV3)>>(
&mut self,
pkg_id: &PackageId,
@@ -180,6 +183,7 @@ impl TorControllerInner {
Ok(())
}
#[instrument(skip(self, interfaces))]
async fn remove<I: IntoIterator<Item = InterfaceId>>(
&mut self,
pkg_id: &PackageId,
@@ -203,6 +207,7 @@ impl TorControllerInner {
Ok(())
}
#[instrument]
async fn init(
embassyd_addr: SocketAddr,
embassyd_tor_key: TorSecretKeyV3,
@@ -232,6 +237,7 @@ impl TorControllerInner {
Ok(controller)
}
#[instrument(skip(self))]
async fn add_embassyd_onion(&mut self) -> Result<(), Error> {
tracing::info!(
"Registering Main Tor Service: {}",
@@ -256,6 +262,7 @@ impl TorControllerInner {
Ok(())
}
#[instrument(skip(self))]
async fn replace(&mut self) -> Result<bool, Error> {
let connection = self.connection.take();
let uptime = if let Some(mut c) = connection {
@@ -332,6 +339,7 @@ impl TorControllerInner {
self.embassyd_tor_key.public().get_onion_address()
}
#[instrument(skip(self))]
async fn list_services(&mut self) -> Result<Vec<OnionAddressV3>, Error> {
self.connection
.as_mut()

View File

@@ -6,6 +6,7 @@ use clap::ArgMatches;
use isocountry::CountryCode;
use rpc_toolkit::command;
use tokio::process::Command;
use tracing::instrument;
use crate::context::RpcContext;
use crate::util::{display_none, display_serializable, Invoke, IoFormat};
@@ -17,6 +18,7 @@ pub async fn wifi() -> Result<(), Error> {
}
#[command(display(display_none))]
#[instrument(skip(ctx))]
pub async fn add(
#[context] ctx: RpcContext,
#[arg] ssid: String,
@@ -74,6 +76,7 @@ pub async fn add(
}
#[command(display(display_none))]
#[instrument(skip(ctx))]
pub async fn connect(#[context] ctx: RpcContext, #[arg] ssid: String) -> Result<(), Error> {
if !ssid.is_ascii() {
return Err(Error::new(
@@ -112,6 +115,7 @@ pub async fn connect(#[context] ctx: RpcContext, #[arg] ssid: String) -> Result<
}
#[command(display(display_none))]
#[instrument(skip(ctx))]
pub async fn delete(#[context] ctx: RpcContext, #[arg] ssid: String) -> Result<(), Error> {
if !ssid.is_ascii() {
return Err(Error::new(
@@ -195,6 +199,7 @@ fn display_wifi_info(info: WiFiInfo, matches: &ArgMatches<'_>) {
}
#[command(display(display_wifi_info))]
#[instrument(skip(ctx))]
pub async fn get(
#[context] ctx: RpcContext,
#[allow(unused_variables)]
@@ -239,6 +244,7 @@ pub async fn get(
}
#[command(display(display_none))]
#[instrument(skip(ctx))]
pub async fn set_country(
#[context] ctx: RpcContext,
#[arg(parse(country_code_parse))] country: CountryCode,
@@ -247,6 +253,7 @@ pub async fn set_country(
wpa_supplicant.set_country_low(country.alpha2()).await
}
#[derive(Debug)]
pub struct WpaCli {
datadir: PathBuf,
interface: String,
@@ -372,6 +379,7 @@ impl WpaCli {
.await?;
Ok(())
}
#[instrument]
pub async fn list_networks_low(&self) -> Result<BTreeMap<String, NetworkId>, Error> {
let r = Command::new("wpa_cli")
.arg("-i")
@@ -411,6 +419,7 @@ impl WpaCli {
.await?;
Ok(())
}
#[instrument]
pub async fn signal_poll_low(&self) -> Result<Option<isize>, Error> {
let r = Command::new("wpa_cli")
.arg("-i")
@@ -447,6 +456,7 @@ impl WpaCli {
/// Looks up the wpa_supplicant network id registered for `ssid`, if any.
pub async fn check_network(&self, ssid: &str) -> Result<Option<NetworkId>, Error> {
    let mut known = self.list_networks_low().await?;
    Ok(known.remove(ssid))
}
#[instrument]
pub async fn select_network(&self, ssid: &str) -> Result<bool, Error> {
let m_id = self.check_network(ssid).await?;
match m_id {
@@ -485,6 +495,7 @@ impl WpaCli {
}
}
}
#[instrument]
pub async fn get_current_network(&self) -> Result<Option<String>, Error> {
let r = Command::new("iwgetid")
.arg(&self.interface)
@@ -500,6 +511,7 @@ impl WpaCli {
Ok(Some(network.to_owned()))
}
}
#[instrument]
pub async fn remove_network(&self, ssid: &str) -> Result<bool, Error> {
match self.check_network(ssid).await? {
None => Ok(false),
@@ -511,6 +523,7 @@ impl WpaCli {
}
}
}
#[instrument]
pub async fn add_network(&self, ssid: &str, psk: &str, priority: isize) -> Result<(), Error> {
use NetworkAttr::*;
let nid = match self.check_network(ssid).await? {
@@ -533,6 +546,7 @@ impl WpaCli {
}
}
#[instrument]
pub async fn interface_connected(interface: &str) -> Result<bool, Error> {
let out = Command::new("ifconfig")
.arg(interface)
@@ -552,6 +566,7 @@ pub fn country_code_parse(code: &str, _matches: &ArgMatches<'_>) -> Result<Count
)))
}
#[instrument(skip(main_datadir))]
pub async fn synchronize_wpa_supplicant_conf<P: AsRef<Path>>(main_datadir: P) -> Result<(), Error> {
let persistent = main_datadir.as_ref().join("wpa_supplicant.conf");
tracing::debug!("persistent: {:?}", persistent);

View File

@@ -4,10 +4,11 @@ use std::str::FromStr;
use chrono::{DateTime, Utc};
use color_eyre::eyre::eyre;
use patch_db::PatchDb;
use patch_db::{DbHandle, PatchDb};
use rpc_toolkit::command;
use sqlx::SqlitePool;
use tokio::sync::Mutex;
use tracing::instrument;
use crate::context::RpcContext;
use crate::db::util::WithRevision;
@@ -21,6 +22,7 @@ pub async fn notification() -> Result<(), Error> {
}
#[command(display(display_serializable))]
#[instrument(skip(ctx))]
pub async fn list(
#[context] ctx: RpcContext,
#[arg] before: Option<u32>,
@@ -142,11 +144,18 @@ pub async fn create(
#[arg] message: String,
) -> Result<(), Error> {
ctx.notification_manager
.notify(package, level, title, message, NotificationSubtype::General)
.notify(
&mut ctx.db.handle(),
package,
level,
title,
message,
NotificationSubtype::General,
)
.await
}
#[derive(Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum NotificationLevel {
Success,
@@ -190,7 +199,7 @@ impl fmt::Display for InvalidNotificationLevel {
write!(f, "Invalid Notification Level: {}", self.0)
}
}
#[derive(serde::Serialize, serde::Deserialize)]
#[derive(Debug, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct Notification {
id: u32,
@@ -203,6 +212,7 @@ pub struct Notification {
data: serde_json::Value,
}
#[derive(Debug)]
pub enum NotificationSubtype {
General,
BackupReport {
@@ -254,21 +264,21 @@ impl NotificationSubtype {
pub struct NotificationManager {
sqlite: SqlitePool,
patchdb: PatchDb,
cache: Mutex<HashMap<(Option<PackageId>, NotificationLevel, String), i64>>,
debounce_interval: u32,
}
impl NotificationManager {
pub fn new(sqlite: SqlitePool, patchdb: PatchDb, debounce_interval: u32) -> Self {
pub fn new(sqlite: SqlitePool, debounce_interval: u32) -> Self {
NotificationManager {
sqlite,
patchdb,
cache: Mutex::new(HashMap::new()),
debounce_interval,
}
}
pub async fn notify(
#[instrument(skip(self, db))]
pub async fn notify<Db: DbHandle>(
&self,
db: &mut Db,
package_id: Option<PackageId>,
level: NotificationLevel,
title: String,
@@ -278,11 +288,10 @@ impl NotificationManager {
if !self.should_notify(&package_id, &level, &title).await {
return Ok(());
}
let mut handle = self.patchdb.handle();
let mut count = crate::db::DatabaseModel::new()
.server_info()
.unread_notification_count()
.get_mut(&mut handle)
.get_mut(db)
.await?;
let sql_package_id = package_id.map::<String, _>(|p| p.into());
let sql_code = subtype.code();
@@ -298,7 +307,7 @@ impl NotificationManager {
sql_data
).execute(&self.sqlite).await?;
*count += 1;
count.save(&mut handle).await?;
count.save(db).await?;
Ok(())
}
async fn gc(&self) {

View File

@@ -2,6 +2,7 @@ use clap::ArgMatches;
use color_eyre::eyre::eyre;
use rpc_toolkit::command;
use serde_json::Value;
use tracing::instrument;
use crate::context::RpcContext;
use crate::s9pk::manifest::{Manifest, PackageId};
@@ -16,6 +17,7 @@ pub async fn properties(#[context] ctx: RpcContext, #[arg] id: PackageId) -> Res
Ok(fetch_properties(ctx, id).await?)
}
#[instrument(skip(ctx))]
pub async fn fetch_properties(ctx: RpcContext, id: PackageId) -> Result<Value, Error> {
let mut db = ctx.db.handle();
let manifest: Manifest = crate::db::DatabaseModel::new()

View File

@@ -2,6 +2,7 @@ use std::io::{Read, Seek, SeekFrom, Write};
use digest::Digest;
use sha2::Sha512;
use tracing::instrument;
use typed_builder::TypedBuilder;
use super::header::{FileSection, Header};
@@ -39,6 +40,7 @@ impl<
> S9pkPacker<'a, W, RLicense, RInstructions, RIcon, RDockerImages, RAssets>
{
/// BLOCKING
#[instrument(skip(self))]
pub fn pack(mut self, key: &ed25519_dalek::Keypair) -> Result<(), Error> {
let header_pos = self.writer.stream_position()?;
if header_pos != 0 {

View File

@@ -2,6 +2,7 @@ use std::path::PathBuf;
use color_eyre::eyre::eyre;
use rpc_toolkit::command;
use tracing::instrument;
use crate::context::SdkContext;
use crate::s9pk::builder::S9pkPacker;
@@ -19,6 +20,7 @@ pub mod reader;
pub const SIG_CONTEXT: &'static [u8] = b"s9pk";
#[command(cli_only, display(display_none), blocking)]
#[instrument(skip(ctx))]
pub fn pack(#[context] ctx: SdkContext, #[arg] path: Option<PathBuf>) -> Result<(), Error> {
use std::fs::File;
use std::io::Read;

View File

@@ -7,6 +7,7 @@ use digest::Output;
use sha2::{Digest, Sha512};
use tokio::fs::File;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, ReadBuf, Take};
use tracing::instrument;
use super::header::{FileSection, Header, TableOfContents};
use super::manifest::Manifest;
@@ -66,10 +67,12 @@ impl<R: AsyncRead + AsyncSeek + Unpin> S9pkReader<InstallProgressTracker<R>> {
}
}
impl<R: AsyncRead + AsyncSeek + Unpin> S9pkReader<R> {
// Rewinds the underlying reader to the start of the package.
// NOTE(review): despite its name, this only seeks — no hash or signature
// verification is visible here; presumably the actual validation happens
// elsewhere (e.g. during header deserialization). Confirm before relying
// on this method to reject tampered packages.
#[instrument(skip(self))]
pub async fn validate(&mut self) -> Result<(), Error> {
    self.rdr.seek(SeekFrom::Start(0)).await?;
    Ok(())
}
#[instrument(skip(rdr))]
pub async fn from_reader(mut rdr: R) -> Result<Self, Error> {
let header = Header::deserialize(&mut rdr).await?;

View File

@@ -13,6 +13,7 @@ use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use tokio::process::Command;
use torut::onion::TorSecretKeyV3;
use tracing::instrument;
use crate::context::SetupContext;
use crate::db::model::RecoveredPackageInfo;
@@ -101,11 +102,13 @@ pub async fn execute(
}
Err(e) => {
tracing::error!("Error Setting Up Embassy: {}", e);
tracing::debug!("{:?}", e);
Err(e)
}
}
}
#[instrument(skip(ctx))]
pub async fn complete_setup(ctx: SetupContext, guid: String) -> Result<(), Error> {
let mut guid_file = File::create("/embassy-os/disk.guid").await?;
guid_file.write_all(guid.as_bytes()).await?;
@@ -114,6 +117,7 @@ pub async fn complete_setup(ctx: SetupContext, guid: String) -> Result<(), Error
Ok(())
}
#[instrument(skip(ctx))]
pub async fn execute_inner(
ctx: SetupContext,
embassy_logicalname: PathBuf,
@@ -189,6 +193,7 @@ pub async fn execute_inner(
Ok(tor_key.public().get_onion_address().to_string())
}
#[instrument(skip(ctx))]
async fn recover(
ctx: SetupContext,
guid: String,
@@ -220,7 +225,7 @@ fn dir_size<'a, P: AsRef<Path> + 'a + Send + Sync>(
) -> BoxFuture<'a, Result<(), std::io::Error>> {
async move {
tokio_stream::wrappers::ReadDirStream::new(tokio::fs::read_dir(path.as_ref()).await?)
.try_for_each_concurrent(Some(8), |e| async move {
.try_for_each(|e| async move {
let m = e.metadata().await?;
if m.is_file() {
res.fetch_add(m.len(), Ordering::Relaxed);
@@ -243,7 +248,7 @@ fn dir_copy<'a, P0: AsRef<Path> + 'a + Send + Sync, P1: AsRef<Path> + 'a + Send
let dst_path = dst.as_ref();
tokio_stream::wrappers::ReadDirStream::new(tokio::fs::read_dir(src.as_ref()).await?)
.map_err(|e| Error::new(e, crate::ErrorKind::Filesystem))
.try_for_each_concurrent(Some(8), |e| async move {
.try_for_each(|e| async move {
let m = e.metadata().await?;
let src_path = e.path();
let dst_path = dst_path.join(e.file_name());
@@ -305,6 +310,7 @@ fn dir_copy<'a, P0: AsRef<Path> + 'a + Send + Sync, P1: AsRef<Path> + 'a + Send
.boxed()
}
#[instrument(skip(ctx))]
async fn recover_v2(ctx: &SetupContext, recovery_drive: DiskInfo) -> Result<(), Error> {
let tmp_mountpoint = Path::new("/mnt/recovery");
mount(
@@ -430,6 +436,7 @@ async fn recover_v2(ctx: &SetupContext, recovery_drive: DiskInfo) -> Result<(),
.with_kind(crate::ErrorKind::Unknown)?
}
#[instrument(skip(ctx))]
async fn recover_v3(
ctx: &SetupContext,
recovery_drive: DiskInfo,

View File

@@ -5,6 +5,7 @@ use std::time::Duration;
use divrem::DivRem;
use proptest_derive::Arbitrary;
use tokio::sync::{Mutex, MutexGuard};
use tracing::instrument;
use crate::{Error, ErrorKind, ResultExt};
@@ -24,6 +25,7 @@ pub const SOUND_LOCK_FILE: &'static str = "/etc/embassy/sound.lock";
struct SoundInterface(Option<MutexGuard<'static, Option<fd_lock_rs::FdLock<tokio::fs::File>>>>);
impl SoundInterface {
#[instrument]
pub async fn lease() -> Result<Self, Error> {
let mut guard = SOUND_MUTEX.lock().await;
let sound_file = tokio::fs::File::create(SOUND_LOCK_FILE)
@@ -54,6 +56,7 @@ impl SoundInterface {
.with_ctx(|_| (ErrorKind::SoundError, EXPORT_FILE.to_string_lossy()))?;
Ok(SoundInterface(Some(guard)))
}
#[instrument(skip(self))]
pub async fn play(&mut self, note: &Note) -> Result<(), Error> {
let curr_period = tokio::fs::read_to_string(&*PERIOD_FILE)
.await
@@ -78,6 +81,7 @@ impl SoundInterface {
.with_ctx(|_| (ErrorKind::SoundError, SWITCH_FILE.to_string_lossy()))?;
Ok(())
}
#[instrument(skip(self))]
pub async fn play_for_time_slice(
&mut self,
tempo_qpm: u16,
@@ -97,6 +101,7 @@ impl SoundInterface {
Err(e)
})
}
#[instrument(skip(self))]
pub async fn stop(&mut self) -> Result<(), Error> {
tokio::fs::write(&*SWITCH_FILE, "0")
.await
@@ -112,6 +117,7 @@ impl<'a, T: 'a> Song<T>
where
&'a T: IntoIterator<Item = &'a (Option<Note>, TimeSlice)>,
{
#[instrument(skip(self))]
pub async fn play(&'a self) -> Result<(), Error> {
#[cfg(feature = "sound")]
{
@@ -213,7 +219,7 @@ impl Semitone {
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Arbitrary)]
pub struct Interval(isize);
#[derive(Clone, Copy)]
#[derive(Debug, Clone, Copy)]
pub enum TimeSlice {
Sixteenth,
Eighth,

View File

@@ -5,6 +5,7 @@ use clap::ArgMatches;
use color_eyre::eyre::eyre;
use rpc_toolkit::command;
use sqlx::{Pool, Sqlite};
use tracing::instrument;
use crate::context::RpcContext;
use crate::util::{display_none, display_serializable, IoFormat};
@@ -12,7 +13,7 @@ use crate::{Error, ErrorKind};
static SSH_AUTHORIZED_KEYS_FILE: &str = "/root/.ssh/authorized_keys";
#[derive(serde::Deserialize, serde::Serialize)]
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct PubKey(
#[serde(serialize_with = "crate::util::serialize_display")]
#[serde(deserialize_with = "crate::util::deserialize_from_str")]
@@ -54,6 +55,7 @@ pub fn ssh() -> Result<(), Error> {
}
#[command(display(display_none))]
#[instrument(skip(ctx))]
pub async fn add(#[context] ctx: RpcContext, #[arg] key: PubKey) -> Result<SshKeyResponse, Error> {
let pool = &ctx.secret_store;
// check fingerprint for duplicates
@@ -88,6 +90,7 @@ pub async fn add(#[context] ctx: RpcContext, #[arg] key: PubKey) -> Result<SshKe
}
}
#[command(display(display_none))]
#[instrument(skip(ctx))]
pub async fn delete(#[context] ctx: RpcContext, #[arg] fingerprint: String) -> Result<(), Error> {
let pool = &ctx.secret_store;
// check if fingerprint is in DB
@@ -137,6 +140,7 @@ fn display_all_ssh_keys(all: Vec<SshKeyResponse>, matches: &ArgMatches<'_>) {
}
#[command(display(display_all_ssh_keys))]
#[instrument(skip(ctx))]
pub async fn list(
#[context] ctx: RpcContext,
#[allow(unused_variables)]
@@ -166,6 +170,7 @@ pub async fn list(
.collect())
}
#[instrument(skip(pool, dest))]
pub async fn sync_keys_from_db<P: AsRef<Path>>(pool: &Pool<Sqlite>, dest: P) -> Result<(), Error> {
let dest = dest.as_ref();
let keys = sqlx::query!("SELECT openssh_pubkey FROM ssh_keys")

View File

@@ -3,6 +3,7 @@ use std::path::Path;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Deserializer, Serialize};
use tracing::instrument;
use crate::action::{ActionImplementation, NoOutput};
use crate::context::RpcContext;
@@ -73,6 +74,7 @@ pub struct HealthCheck {
pub critical: bool,
}
impl HealthCheck {
#[instrument(skip(ctx))]
pub async fn check(
&self,
ctx: &RpcContext,

View File

@@ -6,6 +6,7 @@ use color_eyre::eyre::eyre;
use futures::{FutureExt, StreamExt};
use patch_db::{DbHandle, HasModel, Map, ModelData};
use serde::{Deserialize, Serialize};
use tracing::instrument;
use self::health_check::HealthCheckId;
use crate::context::RpcContext;
@@ -20,6 +21,7 @@ use crate::Error;
pub mod health_check;
// Assume docker for now
#[instrument(skip(ctx))]
pub async fn synchronize_all(ctx: &RpcContext) -> Result<(), Error> {
let pkg_ids = crate::db::DatabaseModel::new()
.package_data()
@@ -27,6 +29,7 @@ pub async fn synchronize_all(ctx: &RpcContext) -> Result<(), Error> {
.await?;
futures::stream::iter(pkg_ids)
.for_each_concurrent(None, |id| async move {
#[instrument(skip(ctx))]
async fn status(ctx: &RpcContext, id: PackageId) -> Result<(), Error> {
let mut db = ctx.db.handle();
// TODO: DRAGONS!!
@@ -75,6 +78,7 @@ pub async fn synchronize_all(ctx: &RpcContext) -> Result<(), Error> {
}
if let Err(e) = status(ctx, id.clone()).await {
tracing::error!("Error syncronizing status of {}: {}", id, e);
tracing::debug!("{:?}", e);
}
})
.await;
@@ -82,6 +86,7 @@ pub async fn synchronize_all(ctx: &RpcContext) -> Result<(), Error> {
Ok(())
}
#[instrument(skip(ctx))]
pub async fn check_all(ctx: &RpcContext) -> Result<(), Error> {
let mut db = ctx.db.handle();
// TODO: DRAGONS!!
@@ -133,6 +138,7 @@ pub async fn check_all(ctx: &RpcContext) -> Result<(), Error> {
}
}
drop(db);
#[instrument(skip(ctx, db))]
async fn main_status<Db: DbHandle>(
ctx: RpcContext,
status_model: StatusModel,
@@ -141,7 +147,7 @@ pub async fn check_all(ctx: &RpcContext) -> Result<(), Error> {
) -> Result<MainStatus, Error> {
let mut status = status_model.get_mut(&mut db).await?;
status.main.check(&ctx, &*manifest).await?;
status.main.check(&ctx, &mut db, &*manifest).await?;
let res = status.main.clone();
@@ -176,6 +182,7 @@ pub async fn check_all(ctx: &RpcContext) -> Result<(), Error> {
statuses.insert(id, status);
}
let statuses = Arc::new(statuses);
#[instrument(skip(db))]
async fn dependency_status<Db: DbHandle>(
id: &PackageId,
statuses: Arc<BTreeMap<PackageId, MainStatus>>,
@@ -273,6 +280,7 @@ pub enum MainStatus {
},
}
impl MainStatus {
#[instrument(skip(manager))]
pub async fn synchronize(&mut self, manager: &Manager) -> Result<(), Error> {
match manager.status() {
ManagerStatus::Stopped => match self {
@@ -308,7 +316,13 @@ impl MainStatus {
}
Ok(())
}
pub async fn check(&mut self, ctx: &RpcContext, manifest: &Manifest) -> Result<(), Error> {
#[instrument(skip(ctx, db))]
pub async fn check<Db: DbHandle>(
&mut self,
ctx: &RpcContext,
db: &mut Db,
manifest: &Manifest,
) -> Result<(), Error> {
match self {
MainStatus::Running { started, health } => {
*health = manifest
@@ -333,6 +347,7 @@ impl MainStatus {
.unwrap_or_default() =>
{
ctx.notification_manager.notify(
db,
Some(manifest.id.clone()),
NotificationLevel::Error,
String::from("Critical Health Check Failed"),

View File

@@ -5,6 +5,7 @@ use rpc_toolkit::command;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use tokio::sync::broadcast::Receiver;
use tokio::sync::RwLock;
use tracing::instrument;
use crate::context::RpcContext;
use crate::logs::{display_logs, fetch_logs, LogResponse, LogSource};
@@ -231,6 +232,7 @@ pub async fn launch_metrics_task<F: FnMut() -> Receiver<Option<Shutdown>>>(
}
Err(e) => {
tracing::error!("Could not get initial temperature: {}", e);
tracing::debug!("{:?}", e);
}
}
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
@@ -248,10 +250,12 @@ pub async fn launch_metrics_task<F: FnMut() -> Receiver<Option<Shutdown>>>(
}
Err(e) => {
tracing::error!("Could not get initial cpu info: {}", e);
tracing::debug!("{:?}", e);
}
},
Err(e) => {
tracing::error!("Could not get initial proc stat: {}", e);
tracing::debug!("{:?}", e);
}
}
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
@@ -266,6 +270,7 @@ pub async fn launch_metrics_task<F: FnMut() -> Receiver<Option<Shutdown>>>(
}
Err(e) => {
tracing::error!("Could not get initial mem info: {}", e);
tracing::debug!("{:?}", e);
}
}
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
@@ -280,6 +285,7 @@ pub async fn launch_metrics_task<F: FnMut() -> Receiver<Option<Shutdown>>>(
}
Err(e) => {
tracing::error!("Could not get initial disk info: {}", e);
tracing::debug!("{:?}", e);
}
}
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
@@ -332,6 +338,7 @@ async fn launch_temp_task(
}
Err(e) => {
tracing::error!("Could not get new temperature: {}", e);
tracing::debug!("{:?}", e);
}
}
tokio::select! {
@@ -355,6 +362,7 @@ async fn launch_cpu_task(
}
Err(e) => {
tracing::error!("Could not get new CPU Metrics: {}", e);
tracing::debug!("{:?}", e);
}
}
tokio::select! {
@@ -377,6 +385,7 @@ async fn launch_mem_task(
}
Err(e) => {
tracing::error!("Could not get new Memory Metrics: {}", e);
tracing::debug!("{:?}", e);
}
}
tokio::select! {
@@ -398,6 +407,7 @@ async fn launch_disk_task(
}
Err(e) => {
tracing::error!("Could not get new Disk Metrics: {}", e);
tracing::debug!("{:?}", e);
}
}
tokio::select! {
@@ -407,6 +417,7 @@ async fn launch_disk_task(
}
}
#[instrument]
async fn get_temp() -> Result<Celsius, Error> {
let milli = tokio::fs::read_to_string("/sys/class/thermal/thermal_zone0/temp")
.await?
@@ -444,6 +455,7 @@ impl ProcStat {
}
}
#[instrument]
async fn get_proc_stat() -> Result<ProcStat, Error> {
use tokio::io::AsyncBufReadExt;
let mut cpu_line = String::new();
@@ -485,6 +497,7 @@ async fn get_proc_stat() -> Result<ProcStat, Error> {
}
}
#[instrument]
async fn get_cpu_info(last: &mut ProcStat) -> Result<MetricsCpu, Error> {
let new = get_proc_stat().await?;
let total_old = last.total();
@@ -511,6 +524,7 @@ pub struct MemInfo {
swap_total: Option<u64>,
swap_free: Option<u64>,
}
#[instrument]
async fn get_mem_info() -> Result<MetricsMemory, Error> {
let contents = tokio::fs::read_to_string("/proc/meminfo").await?;
let mut mem_info = MemInfo {
@@ -584,6 +598,7 @@ async fn get_mem_info() -> Result<MetricsMemory, Error> {
})
}
#[instrument]
async fn get_disk_info() -> Result<MetricsDisk, Error> {
use crate::util::Invoke;
let mut size_cmd = tokio::process::Command::new("zpool");

View File

@@ -1,5 +1,6 @@
use std::future::Future;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
@@ -14,12 +15,12 @@ use regex::Regex;
use reqwest::Url;
use rpc_toolkit::command;
use sha2::Sha256;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::io::AsyncWriteExt;
use tokio::pin;
use tokio::process::Command;
use tokio::time::Instant;
use tokio_stream::StreamExt;
use tracing::instrument;
use crate::context::RpcContext;
use crate::db::model::{ServerStatus, UpdateProgress};
@@ -35,6 +36,7 @@ lazy_static! {
/// An user/ daemon would call this to update the system to the latest version and do the updates available,
/// and this will return something if there is an update, and in that case there will need to be a restart.
#[command(rename = "update", display(display_properties))]
#[instrument(skip(ctx))]
pub async fn update_system(#[context] ctx: RpcContext) -> Result<UpdateSystem, Error> {
if UPDATED.load(Ordering::SeqCst) {
return Ok(UpdateSystem::NoUpdates);
@@ -79,7 +81,7 @@ struct Boot;
/// we need to know the labels for those types. These labels
/// are the labels that are shipping with the embassy, blue/ green
/// are where the os sits and will do a swap during update.
trait FileType: Copy + Send + Sync + 'static {
trait FileType: std::fmt::Debug + Copy + Send + Sync + 'static {
fn mount_folder(&self) -> PathBuf {
Path::new("/media").join(self.label())
}
@@ -110,6 +112,7 @@ impl FileType for Boot {
}
/// Proven data that this is mounted, should be consumed in an unmount
#[derive(Debug)]
struct MountedResource<X: FileType> {
value: X,
mounted: bool,
@@ -121,6 +124,7 @@ impl<X: FileType> MountedResource<X> {
mounted: true,
}
}
#[instrument]
async fn unmount(value: X) -> Result<(), Error> {
let folder = value.mount_folder();
Command::new("umount")
@@ -132,6 +136,7 @@ impl<X: FileType> MountedResource<X> {
.with_ctx(|_| (crate::ErrorKind::Filesystem, folder.display().to_string()))?;
Ok(())
}
#[instrument]
async fn unmount_label(&mut self) -> Result<(), Error> {
Self::unmount(self.value).await?;
self.mounted = false;
@@ -148,7 +153,7 @@ impl<X: FileType> Drop for MountedResource<X> {
}
/// This will be where we are going to be putting the new update
#[derive(Clone, Copy)]
#[derive(Debug, Clone, Copy)]
struct NewLabel(WritableDrives);
/// This is our current label where the os is running
@@ -158,6 +163,7 @@ lazy_static! {
static ref PARSE_COLOR: Regex = Regex::new("#LABEL=(\\w+) /media/root-ro/").unwrap();
}
#[instrument(skip(ctx))]
async fn maybe_do_update(ctx: RpcContext) -> Result<Option<Arc<Revision>>, Error> {
let mut db = ctx.db.handle();
let latest_version = reqwest::get(format!("{}/eos/latest", ctx.eos_registry_url().await?))
@@ -235,9 +241,9 @@ async fn maybe_do_update(ctx: RpcContext) -> Result<Option<Arc<Revision>>, Error
Err(e) => {
info.status = ServerStatus::Running;
info.save(&mut db).await.expect("could not save status");
drop(db);
ctx.notification_manager
.notify(
&mut db,
None,
NotificationLevel::Error,
"EmbassyOS Update Failed".to_owned(),
@@ -252,6 +258,7 @@ async fn maybe_do_update(ctx: RpcContext) -> Result<Option<Arc<Revision>>, Error
Ok(rev)
}
#[instrument(skip(download))]
async fn do_update(
download: impl Future<Output = Result<(), Error>>,
new_label: NewLabel,
@@ -265,6 +272,7 @@ async fn do_update(
Ok(())
}
#[instrument]
async fn query_mounted_label() -> Result<(NewLabel, CurrentLabel), Error> {
let output = tokio::fs::read_to_string("/etc/fstab")
.await
@@ -294,6 +302,7 @@ async fn query_mounted_label() -> Result<(NewLabel, CurrentLabel), Error> {
}
}
#[derive(Debug)]
struct EosUrl {
base: Url,
version: Version,
@@ -304,6 +313,7 @@ impl std::fmt::Display for EosUrl {
}
}
#[instrument(skip(db))]
async fn download_file<'a, Db: DbHandle + 'a>(
mut db: Db,
eos_url: &EosUrl,
@@ -333,6 +343,7 @@ async fn download_file<'a, Db: DbHandle + 'a>(
}))
}
#[instrument(skip(db, stream_download))]
async fn write_stream_to_label<Db: DbHandle>(
db: &mut Db,
size: Option<u64>,
@@ -370,6 +381,7 @@ async fn write_stream_to_label<Db: DbHandle>(
Ok(hasher.finalize().to_vec())
}
#[instrument]
async fn check_download(hash_from_header: &str, file_digest: Vec<u8>) -> Result<(), Error> {
// if hex::decode(hash_from_header).with_kind(ErrorKind::Network)? != file_digest {
// return Err(Error::new(
@@ -379,6 +391,8 @@ async fn check_download(hash_from_header: &str, file_digest: Vec<u8>) -> Result<
// }
Ok(())
}
#[instrument]
async fn swap_boot_label(
new_label: NewLabel,
mounted_boot: &MountedResource<Boot>,
@@ -409,6 +423,7 @@ async fn swap_boot_label(
Ok(())
}
#[instrument]
async fn mount_label<F>(file_type: F) -> Result<MountedResource<F>, Error>
where
F: FileType,

View File

@@ -35,6 +35,7 @@ sudo umount /tmp/eos-mnt
sudo mount ${OUTPUT_DEVICE}p3 /tmp/eos-mnt
sudo sed -i 's/LABEL=writable/LABEL=green/g' /tmp/eos-mnt/etc/fstab
sudo sed -i 's/LABEL=system-boot \(\S\+\) \(\S\+\) defaults/LABEL=system-boot \1 \2 defaults,ro/g' /tmp/eos-mnt/etc/fstab
# Enter the appmgr directory, copy over the built EmbassyOS binaries and systemd services, edit the nginx config, then create the .ssh directory
cd appmgr/