diff --git a/appmgr/src/bin/embassy-init.rs b/appmgr/src/bin/embassy-init.rs index 8f883b25c..b9cd00b3e 100644 --- a/appmgr/src/bin/embassy-init.rs +++ b/appmgr/src/bin/embassy-init.rs @@ -1,14 +1,53 @@ -use embassy::context::rpc::RpcContextConfig; -use embassy::Error; +use std::path::Path; -async fn init(cfg: &RpcContextConfig) -> Result<(), Error> { - // mount disk - if embassy::disk::main::importable().await? { - embassy::disk::main::load("password").await?; +use embassy::context::rpc::RpcContextConfig; +use embassy::context::{RecoveryContext, SetupContext}; +use embassy::{Error, ResultExt}; +use http::StatusCode; +use rpc_toolkit::rpc_server; + +fn status_fn(_: i32) -> StatusCode { + StatusCode::OK +} + +async fn init(cfg_path: Option<&str>) -> Result<(), Error> { + let cfg = RpcContextConfig::load(cfg_path).await?; + if tokio::fs::metadata("/boot/embassy-os/disk.guid") + .await + .is_ok() + { + embassy::disk::main::load( + &cfg, + tokio::fs::read_to_string("/boot/embassy-os/disk.guid") + .await? + .trim(), + "password", + ) + .await?; } else { - // embassy::setup::host_setup().await?; + let ctx = SetupContext::init(cfg_path).await?; + rpc_server!({ + command: embassy::setup_api, + context: ctx.clone(), + status: status_fn, + middleware: [ ] + }) + .with_graceful_shutdown({ + let mut shutdown = ctx.shutdown.subscribe(); + async move { + shutdown.recv().await.expect("context dropped"); + } + }) + .await + .with_kind(embassy::ErrorKind::Network)?; } - embassy::disk::util::bind("/embassy-data/main/logs", "/var/log/journal", false).await?; + + embassy::disk::util::bind( + cfg.datadir().join("main").join("logs"), + "/var/log/journal", + false, + ) + .await?; embassy::ssh::sync_keys_from_db(todo!(), "/root/.ssh/authorized_keys").await?; todo!("sync wifi"); embassy::hostname::sync_hostname().await?; @@ -16,13 +55,45 @@ async fn init(cfg: &RpcContextConfig) -> Result<(), Error> { Ok(()) } +// BLOCKING +fn run_script_if_exists>(path: P) { + use std::process::Command; + + let script = path.as_ref(); + if script.exists() { + match Command::new("/bin/bash").arg(script).spawn() { + Ok(mut c) => { + if let Err(e) = c.wait() { + log::error!("Error Running {}: {}", script.display(), e) + } + } + Err(e) => log::error!("Error Running {}: {}", script.display(), e), + } + } +} + async fn inner_main(cfg_path: Option<&str>) -> Result<(), Error> { - if let Err(e) = init(&RpcContextConfig::load(cfg_path).await?).await { + if let Err(e) = init(cfg_path).await { embassy::sound::BEETHOVEN.play().await?; log::error!("{}", e.source); - log::debug!("{}", e.source) + log::debug!("{}", e.source); + let ctx = RecoveryContext::init(cfg_path).await?; + rpc_server!({ + command: embassy::recovery_api, + context: ctx.clone(), + status: status_fn, + middleware: [ ] + }) + .with_graceful_shutdown({ + let mut shutdown = ctx.shutdown.subscribe(); + async move { + shutdown.recv().await.expect("context dropped"); + } + }) + .await + .with_kind(embassy::ErrorKind::Network)?; } else { - embassy::sound::MARIO_COIN.play().await? + embassy::sound::MARIO_COIN.play().await?; } Ok(()) @@ -53,11 +124,19 @@ fn main() { _ => log::LevelFilter::Trace, }); let cfg_path = matches.value_of("config"); - let rt = tokio::runtime::Runtime::new().expect("failed to initialize runtime"); - match rt.block_on(inner_main(cfg_path)) { + + run_script_if_exists("/boot/embassy-os/preinit.sh"); + + let res = { + let rt = tokio::runtime::Runtime::new().expect("failed to initialize runtime"); + rt.block_on(inner_main(cfg_path)) + }; + + run_script_if_exists("/boot/embassy-os/postinit.sh"); + + match res { Ok(_) => (), Err(e) => { - drop(rt); eprintln!("{}", e.source); log::debug!("{:?}", e.source); drop(e.source); diff --git a/appmgr/src/bin/embassyd.rs b/appmgr/src/bin/embassyd.rs index e4911b239..f25d3f924 100644 --- a/appmgr/src/bin/embassyd.rs +++ b/appmgr/src/bin/embassyd.rs @@ -8,6 +8,7 @@ use embassy::hostname::{get_hostname, get_id}; use embassy::middleware::auth::auth; use embassy::middleware::cors::cors; use embassy::net::tor::{os_key, tor_health_check}; +use embassy::shutdown::Shutdown; use embassy::status::{check_all, synchronize_all}; use embassy::util::daemon; use embassy::{Error, ErrorKind, ResultExt}; @@ -30,10 +31,9 @@ fn err_to_500(e: Error) -> Response { .unwrap() } -async fn inner_main(cfg_path: Option<&str>) -> Result<(), Error> { - let (shutdown, _) = tokio::sync::broadcast::channel(1); - - let rpc_ctx = RpcContext::init(cfg_path, shutdown).await?; +async fn inner_main(cfg_path: Option<&str>) -> Result, Error> { + let rpc_ctx = RpcContext::init(cfg_path).await?; + let mut shutdown_recv = rpc_ctx.shutdown.subscribe(); let sig_handler_ctx = rpc_ctx.clone(); let sig_handler = tokio::spawn(async move { @@ -223,7 +223,10 @@ async fn inner_main(cfg_path: Option<&str>) -> Result<(), Error> { sig_handler.abort(); - Ok(()) + Ok(shutdown_recv + .recv() + .await + .with_kind(crate::ErrorKind::Unknown)?) } fn main() { @@ -251,11 +254,16 @@ fn main() { _ => log::LevelFilter::Trace, }); let cfg_path = matches.value_of("config"); - let rt = tokio::runtime::Runtime::new().expect("failed to initialize runtime"); - match rt.block_on(inner_main(cfg_path)) { - Ok(_) => (), + + let res = { + let rt = tokio::runtime::Runtime::new().expect("failed to initialize runtime"); + rt.block_on(inner_main(cfg_path)) + }; + + match res { + Ok(None) => (), + Ok(Some(s)) => s.execute(), Err(e) => { - drop(rt); eprintln!("{}", e.source); log::debug!("{:?}", e.source); drop(e.source); diff --git a/appmgr/src/context/mod.rs b/appmgr/src/context/mod.rs index a3417a634..28175f7f1 100644 --- a/appmgr/src/context/mod.rs +++ b/appmgr/src/context/mod.rs @@ -1,16 +1,30 @@ pub mod cli; +pub mod recovery; pub mod rpc; +pub mod setup; pub use cli::CliContext; +pub use recovery::RecoveryContext; pub use rpc::RpcContext; +pub use setup::SetupContext; impl From for () { fn from(_: CliContext) -> Self { () } } +impl From for () { + fn from(_: RecoveryContext) -> Self { + () + } +} impl From for () { fn from(_: RpcContext) -> Self { () } } +impl From for () { + fn from(_: SetupContext) -> Self { + () + } +} diff --git a/appmgr/src/context/recovery.rs b/appmgr/src/context/recovery.rs new file mode 100644 index 000000000..1731e5bb1 --- /dev/null +++ b/appmgr/src/context/recovery.rs @@ -0,0 +1,73 @@ +use std::net::{IpAddr, SocketAddr}; +use std::ops::Deref; +use std::path::Path; +use std::sync::Arc; + +use rpc_toolkit::Context; +use serde::Deserialize; +use tokio::fs::File; +use tokio::sync::broadcast::Sender; +use url::Host; + +use crate::util::{from_toml_async_reader, AsyncFileExt}; +use crate::{Error, ResultExt}; + +#[derive(Debug, Default, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub struct RecoveryContextConfig { + pub bind_rpc: Option, +} +impl RecoveryContextConfig { + pub async fn load>(path: Option

) -> Result { + let cfg_path = path + .as_ref() + .map(|p| p.as_ref()) + .unwrap_or(Path::new(crate::CONFIG_PATH)); + if let Some(f) = File::maybe_open(cfg_path) + .await + .with_ctx(|_| (crate::ErrorKind::Filesystem, cfg_path.display().to_string()))? + { + from_toml_async_reader(f).await + } else { + Ok(Self::default()) + } + } +} + +pub struct RecoveryContextSeed { + pub bind_rpc: SocketAddr, + pub shutdown: Sender<()>, +} + +#[derive(Clone)] +pub struct RecoveryContext(Arc); +impl RecoveryContext { + pub async fn init>(path: Option

) -> Result { + let cfg = RecoveryContextConfig::load(path).await?; + + let (shutdown, _) = tokio::sync::broadcast::channel(1); + + Ok(Self(Arc::new(RecoveryContextSeed { + bind_rpc: cfg.bind_rpc.unwrap_or(([127, 0, 0, 1], 5959).into()), + shutdown, + }))) + } +} + +impl Context for RecoveryContext { + fn host(&self) -> Host<&str> { + match self.0.bind_rpc.ip() { + IpAddr::V4(a) => Host::Ipv4(a), + IpAddr::V6(a) => Host::Ipv6(a), + } + } + fn port(&self) -> u16 { + self.0.bind_rpc.port() + } +} +impl Deref for RecoveryContext { + type Target = RecoveryContextSeed; + fn deref(&self) -> &Self::Target { + &*self.0 + } +} diff --git a/appmgr/src/context/rpc.rs b/appmgr/src/context/rpc.rs index 9f2b69b07..8e7cccb4c 100644 --- a/appmgr/src/context/rpc.rs +++ b/appmgr/src/context/rpc.rs @@ -1,3 +1,4 @@ +use std::borrow::Cow; use std::collections::VecDeque; use std::net::{IpAddr, SocketAddr}; use std::ops::Deref; @@ -29,6 +30,7 @@ pub struct RpcContextConfig { pub bind_ws: Option, pub tor_control: Option, pub revision_cache_size: Option, + pub zfs_pool_name: Option, pub datadir: Option, } impl RpcContextConfig { @@ -43,14 +45,20 @@ impl RpcContextConfig { { from_toml_async_reader(f).await } else { - Ok(RpcContextConfig::default()) + Ok(Self::default()) } } - pub fn datadir(&self) -> &Path { + pub fn zfs_pool_name(&self) -> &str { + self.zfs_pool_name + .as_ref() + .map(|s| s.as_str()) + .unwrap_or("embassy-data") + } + pub fn datadir(&self) -> Cow<'_, Path> { self.datadir .as_ref() - .map(|a| a.as_path()) - .unwrap_or_else(|| Path::new("/embassy-data")) + .map(|a| Cow::Borrowed(a.as_path())) + .unwrap_or_else(|| Cow::Owned(Path::new("/").join(self.zfs_pool_name()))) } pub async fn db(&self) -> Result { PatchDb::open(self.datadir().join("main").join("embassy.db")) @@ -78,6 +86,7 @@ pub struct RpcContextSeed { pub bind_rpc: SocketAddr, pub bind_ws: SocketAddr, pub datadir: PathBuf, + pub zfs_pool_name: Arc, pub db: PatchDb, pub secret_store: SqlitePool, pub docker: Docker, @@ -92,11 +101,9 @@ pub struct RpcContextSeed { #[derive(Clone)] pub struct RpcContext(Arc); impl RpcContext { - pub async fn init>( - cfg_path: Option

, - shutdown: Sender>, - ) -> Result { + pub async fn init>(cfg_path: Option

) -> Result { let base = RpcContextConfig::load(cfg_path).await?; + let (shutdown, _) = tokio::sync::broadcast::channel(1); let db = base.db().await?; let secret_store = base.secret_store().await?; let docker = Docker::connect_with_unix_defaults()?; @@ -112,6 +119,7 @@ impl RpcContext { bind_rpc: base.bind_rpc.unwrap_or(([127, 0, 0, 1], 5959).into()), bind_ws: base.bind_ws.unwrap_or(([127, 0, 0, 1], 5960).into()), datadir: base.datadir().to_path_buf(), + zfs_pool_name: Arc::new(base.zfs_pool_name().to_owned()), db, secret_store, docker, diff --git a/appmgr/src/context/setup.rs b/appmgr/src/context/setup.rs new file mode 100644 index 000000000..a0d93c2b6 --- /dev/null +++ b/appmgr/src/context/setup.rs @@ -0,0 +1,71 @@ +use std::net::{IpAddr, SocketAddr}; +use std::ops::Deref; +use std::path::Path; +use std::sync::Arc; + +use rpc_toolkit::Context; +use serde::Deserialize; +use tokio::fs::File; +use tokio::sync::broadcast::Sender; +use url::Host; + +use crate::util::{from_toml_async_reader, AsyncFileExt}; +use crate::{Error, ResultExt}; + +#[derive(Debug, Default, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub struct SetupContextConfig { + pub bind_rpc: Option, +} +impl SetupContextConfig { + pub async fn load>(path: Option

) -> Result { + let cfg_path = path + .as_ref() + .map(|p| p.as_ref()) + .unwrap_or(Path::new(crate::CONFIG_PATH)); + if let Some(f) = File::maybe_open(cfg_path) + .await + .with_ctx(|_| (crate::ErrorKind::Filesystem, cfg_path.display().to_string()))? + { + from_toml_async_reader(f).await + } else { + Ok(Self::default()) + } + } +} + +pub struct SetupContextSeed { + pub bind_rpc: SocketAddr, + pub shutdown: Sender<()>, +} + +#[derive(Clone)] +pub struct SetupContext(Arc); +impl SetupContext { + pub async fn init>(path: Option

) -> Result { + let cfg = SetupContextConfig::load(path).await?; + let (shutdown, _) = tokio::sync::broadcast::channel(1); + Ok(Self(Arc::new(SetupContextSeed { + bind_rpc: cfg.bind_rpc.unwrap_or(([127, 0, 0, 1], 5959).into()), + shutdown, + }))) + } +} + +impl Context for SetupContext { + fn host(&self) -> Host<&str> { + match self.0.bind_rpc.ip() { + IpAddr::V4(a) => Host::Ipv4(a), + IpAddr::V6(a) => Host::Ipv6(a), + } + } + fn port(&self) -> u16 { + self.0.bind_rpc.port() + } +} +impl Deref for SetupContext { + type Target = SetupContextSeed; + fn deref(&self) -> &Self::Target { + &*self.0 + } +} diff --git a/appmgr/src/disk/main.rs b/appmgr/src/disk/main.rs index 669590d3c..13153cb22 100644 --- a/appmgr/src/disk/main.rs +++ b/appmgr/src/disk/main.rs @@ -1,45 +1,194 @@ +use std::path::Path; + +use anyhow::anyhow; use tokio::process::Command; +use crate::context::rpc::RpcContextConfig; use crate::util::Invoke; -use crate::Error; +use crate::{Error, ResultExt}; -pub async fn importable() -> Result { - todo!() +pub const PASSWORD_PATH: &'static str = "/etc/embassy/password"; + +pub async fn create(cfg: &RpcContextConfig, disks: &[&str]) -> Result { + let guid = create_pool(cfg, disks).await?; + create_fs(cfg).await?; + export(cfg).await?; + Ok(guid) } -pub async fn create(disks: &[&str]) -> Result<(), Error> { - todo!() +pub async fn load(cfg: &RpcContextConfig, guid: &str, password: &str) -> Result<(), Error> { + import(guid).await?; + mount(cfg, password).await?; + Ok(()) } -pub async fn load(password: &str) -> Result<(), Error> { - todo!() -} - -pub async fn create_pool(disks: &[&str]) -> Result<(), Error> { +pub async fn create_pool(cfg: &RpcContextConfig, disks: &[&str]) -> Result { Command::new("zpool") .arg("create") - .arg("embassy-data") + .arg(cfg.zfs_pool_name()) .args(disks) .invoke(crate::ErrorKind::Zfs) .await?; - Ok(()) + Ok(String::from_utf8( + Command::new("zpool") + .arg("get") + .arg("-H") + .arg("-ovalue") + .arg("guid") + .arg(cfg.zfs_pool_name()) + .invoke(crate::ErrorKind::Zfs) + .await?, + )?) } -pub async fn create_fs() -> Result<(), Error> { - todo!() -} - -pub async fn import() -> Result<(), Error> { - Command::new("zpool") - .arg("import") - .arg("-f") - .arg("embassy-data") +pub async fn create_fs(cfg: &RpcContextConfig) -> Result<(), Error> { + Command::new("zfs") + .arg("create") + .arg("-o") + .arg("reservation=5G") + .arg("-o") + .arg("encryption=on") + .arg("-o") + .arg("keylocation=file:///etc/embassy/password") + .arg("-o") + .arg("keyformat=password") + .arg(format!("{}/main", cfg.zfs_pool_name())) + .invoke(crate::ErrorKind::Zfs) + .await?; + Command::new("zfs") + .arg("create") + .arg("-o") + .arg("encryption=on") + .arg("-o") + .arg("keylocation=file:///etc/embassy/password") + .arg("-o") + .arg("keyformat=password") + .arg(format!("{}/package-data", cfg.zfs_pool_name())) .invoke(crate::ErrorKind::Zfs) .await?; Ok(()) } -pub async fn mount(password: &str) -> Result<(), Error> { - // zfs get -H -ovalue mountpoint embassy-data - todo!() +pub async fn create_swap(cfg: &RpcContextConfig) -> Result<(), Error> { + let pagesize = String::from_utf8( + Command::new("getconf") + .arg("PAGESIZE") + .invoke(crate::ErrorKind::Zfs) + .await?, + )?; + Command::new("zfs") + .arg("create") + .arg("-V8G") + .arg("-b") + .arg(pagesize) + .arg("-o") + .arg("logbias=throughput") + .arg("-o") + .arg("sync=always") + .arg("-o") + .arg("primarycache=metadata") + .arg("-o") + .arg("com.sun:auto-snapshot=false") + .invoke(crate::ErrorKind::Zfs) + .await?; + Command::new("mkswap") + .arg("-f") + .arg( + Path::new("/dev/zvol") + .join(cfg.zfs_pool_name()) + .join("swap"), + ) + .invoke(crate::ErrorKind::Zfs) + .await?; + Ok(()) +} + +pub async fn use_swap(cfg: &RpcContextConfig) -> Result<(), Error> { + Command::new("swapon") + .arg( + Path::new("/dev/zvol") + .join(cfg.zfs_pool_name()) + .join("swap"), + ) + .invoke(crate::ErrorKind::Zfs) + .await?; + Ok(()) +} + +pub async fn export(cfg: &RpcContextConfig) -> Result<(), Error> { + Command::new("zpool") + .arg("export") + .arg(cfg.zfs_pool_name()) + .invoke(crate::ErrorKind::Zfs) + .await?; + Ok(()) +} + +/// BLOCKING +pub fn export_blocking(pool: &str) -> Result<(), Error> { + let output = std::process::Command::new("zpool") + .arg("export") + .arg(pool) + .output()?; + if !output.status.success() { + Err(Error::new( + anyhow!("{}", String::from_utf8(output.stderr)?), + crate::ErrorKind::Zfs, + )) + } else { + Ok(()) + } +} + +pub async fn import(guid: &str) -> Result<(), Error> { + Command::new("zpool") + .arg("import") + .arg("-f") + .arg(guid) + .invoke(crate::ErrorKind::Zfs) + .await?; + Ok(()) +} + +pub async fn mount(cfg: &RpcContextConfig, password: &str) -> Result<(), Error> { + let mountpoint = String::from_utf8( + Command::new("zfs") + .arg("get") + .arg("-H") + .arg("-ovalue") + .arg("mountpoint") + .arg(cfg.zfs_pool_name()) + .invoke(crate::ErrorKind::Zfs) + .await?, + )?; + if Path::new(mountpoint.trim()) != &cfg.datadir() { + Command::new("zfs") + .arg("set") + .arg(format!("mountpoint={}", cfg.datadir().display())) + .arg(cfg.zfs_pool_name()) + .invoke(crate::ErrorKind::Zfs) + .await?; + } + tokio::fs::write(PASSWORD_PATH, password) + .await + .with_ctx(|_| (crate::ErrorKind::Filesystem, PASSWORD_PATH))?; + Command::new("zfs") + .arg("load-key") + .arg(format!("{}/main", cfg.zfs_pool_name())) + .invoke(crate::ErrorKind::Zfs) + .await?; + tokio::fs::remove_file(PASSWORD_PATH) + .await + .with_ctx(|_| (crate::ErrorKind::Filesystem, PASSWORD_PATH))?; + Command::new("zfs") + .arg("mount") + .arg(format!("{}/main", cfg.zfs_pool_name())) + .invoke(crate::ErrorKind::Zfs) + .await?; + Command::new("zfs") + .arg("mount") + .arg(format!("{}/package-data", cfg.zfs_pool_name())) + .invoke(crate::ErrorKind::Zfs) + .await?; + Ok(()) } diff --git a/appmgr/src/hostname.rs b/appmgr/src/hostname.rs index 8e4b9e1c7..88eadea4f 100644 --- a/appmgr/src/hostname.rs +++ b/appmgr/src/hostname.rs @@ -2,7 +2,9 @@ use digest::Digest; use tokio::process::Command; use crate::util::Invoke; -use crate::{Error, ErrorKind}; +use crate::{Error, ErrorKind, ResultExt}; + +pub const PRODUCT_KEY_PATH: &'static str = "/boot/embassy-os/product_key.txt"; pub async fn get_hostname() -> Result { let out = Command::new("hostname") @@ -22,7 +24,9 @@ pub async fn set_hostname(hostname: &str) -> Result<(), Error> { } pub async fn get_product_key() -> Result { - let out = tokio::fs::read_to_string("/boot/embassy-os/product_key.txt").await?; + let out = tokio::fs::read_to_string(PRODUCT_KEY_PATH) + .await + .with_ctx(|_| (crate::ErrorKind::Filesystem, PRODUCT_KEY_PATH))?; Ok(out.trim().to_owned()) } diff --git a/appmgr/src/install/mod.rs b/appmgr/src/install/mod.rs index 277d6a262..50bc7d5ab 100644 --- a/appmgr/src/install/mod.rs +++ b/appmgr/src/install/mod.rs @@ -36,8 +36,8 @@ use crate::{Error, ResultExt}; pub mod cleanup; pub mod progress; -pub const PKG_CACHE: &'static str = "main/cache/packages"; -pub const PKG_PUBLIC_DIR: &'static str = "main/public/package-data"; +pub const PKG_CACHE: &'static str = "package-data/cache"; +pub const PKG_PUBLIC_DIR: &'static str = "package-data/public"; #[command(display(display_none))] pub async fn install( diff --git a/appmgr/src/lib.rs b/appmgr/src/lib.rs index 898c51b6b..b8c8fa171 100644 --- a/appmgr/src/lib.rs +++ b/appmgr/src/lib.rs @@ -65,6 +65,7 @@ pub fn echo(#[arg] message: String) -> Result { s9pk::pack, s9pk::verify, inspect::inspect, + server, package, net::net, auth::auth, @@ -74,6 +75,11 @@ pub fn main_api() -> Result<(), RpcError> { Ok(()) } +#[command(subcommands(system::logs, system::metrics, shutdown::shutdown, shutdown::restart))] +pub fn server() -> Result<(), RpcError> { + Ok(()) +} + #[command(subcommands( install::install, install::uninstall, @@ -96,3 +102,13 @@ pub fn package() -> Result<(), RpcError> { pub fn portable_api() -> Result<(), RpcError> { Ok(()) } + +#[command(subcommands(version::git_info, echo,))] +pub fn recovery_api() -> Result<(), RpcError> { + Ok(()) +} + +#[command(subcommands(version::git_info, echo,))] +pub fn setup_api() -> Result<(), RpcError> { + Ok(()) +} diff --git a/appmgr/src/logs.rs b/appmgr/src/logs.rs index a4224de09..bb8c84895 100644 --- a/appmgr/src/logs.rs +++ b/appmgr/src/logs.rs @@ -1,4 +1,3 @@ -use std::ascii::AsciiExt; use std::process::Stdio; use std::time::{Duration, UNIX_EPOCH}; @@ -13,9 +12,7 @@ use tokio::process::Command; use tokio_stream::wrappers::LinesStream; use crate::action::docker::DockerAction; -use crate::context::RpcContext; use crate::error::ResultExt; -use crate::id::Id; use crate::s9pk::manifest::PackageId; use crate::util::Reversible; use crate::Error; @@ -67,7 +64,7 @@ pub enum LogSource { Container(PackageId), } -fn display_logs(all: LogResponse, _: &ArgMatches<'_>) { +pub fn display_logs(all: LogResponse, _: &ArgMatches<'_>) { for entry in all.entries.iter() { println!("{}", entry); } @@ -75,13 +72,18 @@ fn display_logs(all: LogResponse, _: &ArgMatches<'_>) { #[command(display(display_logs))] pub async fn logs( - #[context] _: RpcContext, #[arg] id: PackageId, #[arg] limit: Option, #[arg] cursor: Option, #[arg] before_flag: Option, ) -> Result { - Ok(fetch_logs(LogSource::Container(id), limit, cursor, before_flag.unwrap_or(false)).await?) + Ok(fetch_logs( + LogSource::Container(id), + limit, + cursor, + before_flag.unwrap_or(false), + ) + .await?) } pub async fn fetch_logs( @@ -93,13 +95,15 @@ pub async fn fetch_logs( let limit = limit.unwrap_or(50); let limit_formatted = format!("-n{}", limit); - let mut args = vec!["--output=json","--output-fields=MESSAGE",&limit_formatted,]; + let mut args = vec!["--output=json", "--output-fields=MESSAGE", &limit_formatted]; let id_formatted = match id { - LogSource::Service(id)=> { + LogSource::Service(id) => { args.push("-u"); id.to_owned() - }, - LogSource::Container(id) => format!("CONTAINER_NAME={}", DockerAction::container_name(&id, None)) + } + LogSource::Container(id) => { + format!("CONTAINER_NAME={}", DockerAction::container_name(&id, None)) + } }; args.push(&id_formatted); @@ -169,18 +173,19 @@ pub async fn fetch_logs( #[tokio::test] pub async fn test_logs() { - let response = - fetch_logs( - // change `tor.service` to an actual journald unit on your machine - // LogSource::Service("tor.service"), - // first run `docker run --name=hello-world.embassy --log-driver=journald hello-world` - LogSource::Container("hello-world".parse().unwrap()), - // Some(5), - None, - None, - // Some("s=1b8c418e28534400856c27b211dd94fd;i=5a7;b=97571c13a1284f87bc0639b5cff5acbe;m=740e916;t=5ca073eea3445;x=f45bc233ca328348".to_owned()), - false, - ).await.unwrap(); + let response = fetch_logs( + // change `tor.service` to an actual journald unit on your machine + // LogSource::Service("tor.service"), + // first run `docker run --name=hello-world.embassy --log-driver=journald hello-world` + LogSource::Container("hello-world".parse().unwrap()), + // Some(5), + None, + None, + // Some("s=1b8c418e28534400856c27b211dd94fd;i=5a7;b=97571c13a1284f87bc0639b5cff5acbe;m=740e916;t=5ca073eea3445;x=f45bc233ca328348".to_owned()), + false, + ) + .await + .unwrap(); let serialized = serde_json::to_string_pretty(&response).unwrap(); println!("{}", serialized); } diff --git a/appmgr/src/shutdown.rs b/appmgr/src/shutdown.rs index 3392edf9a..558922397 100644 --- a/appmgr/src/shutdown.rs +++ b/appmgr/src/shutdown.rs @@ -1,4 +1,52 @@ +use std::sync::Arc; + +use rpc_toolkit::command; + +use crate::context::RpcContext; +use crate::disk::main::export_blocking; +use crate::util::display_none; +use crate::Error; + #[derive(Debug, Clone)] pub struct Shutdown { + zfs_pool: Arc, restart: bool, } +impl Shutdown { + /// BLOCKING + pub fn execute(&self) { + use std::process::Command; + + if let Err(e) = export_blocking(&self.zfs_pool) { + log::error!("Error Exporting ZFS Pool: {}", e); + } + if self.restart { + Command::new("reboot").spawn().unwrap().wait().unwrap(); + } else { + Command::new("shutdown") + .arg("now") + .spawn() + .unwrap() + .wait() + .unwrap(); + } + } +} + +#[command(display(display_none))] +pub async fn shutdown(#[context] ctx: RpcContext) -> Result<(), Error> { + ctx.shutdown.send(Some(Shutdown { + zfs_pool: ctx.zfs_pool_name.clone(), + restart: false, + })); + Ok(()) +} + +#[command(display(display_none))] +pub async fn restart(#[context] ctx: RpcContext) -> Result<(), Error> { + ctx.shutdown.send(Some(Shutdown { + zfs_pool: ctx.zfs_pool_name.clone(), + restart: true, + })); + Ok(()) +} diff --git a/appmgr/src/system.rs b/appmgr/src/system.rs index 05643e775..78f7e0b99 100644 --- a/appmgr/src/system.rs +++ b/appmgr/src/system.rs @@ -1,43 +1,50 @@ use std::fmt; use rpc_toolkit::command; +use serde::{Deserialize, Serialize}; use tokio::sync::RwLock; use crate::context::RpcContext; -use crate::logs::{LogResponse, LogSource, fetch_logs}; -use crate::{Error, ErrorKind, ResultExt}; +use crate::logs::{display_logs, fetch_logs, LogResponse, LogSource}; +use crate::util::{display_serializable, IoFormat}; +use crate::{Error, ErrorKind}; pub const SYSTEMD_UNIT: &'static str = "embassyd"; -#[command(rpc_only)] +#[command(display(display_logs))] pub async fn logs( - #[context] ctx: RpcContext, #[arg] limit: Option, #[arg] cursor: Option, #[arg] before_flag: Option, ) -> Result { - Ok(fetch_logs(LogSource::Service(SYSTEMD_UNIT), limit, cursor, before_flag.unwrap_or(false)).await?) + Ok(fetch_logs( + LogSource::Service(SYSTEMD_UNIT), + limit, + cursor, + before_flag.unwrap_or(false), + ) + .await?) } -#[derive(serde::Serialize, Clone, Debug)] +#[derive(Deserialize, Serialize, Clone, Debug)] pub struct Celsius(f64); impl fmt::Display for Celsius { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "{:.1}°C", self.0) } } -#[derive(serde::Serialize, Clone, Debug)] +#[derive(Deserialize, Serialize, Clone, Debug)] pub struct Percentage(f64); -#[derive(serde::Serialize, Clone, Debug)] +#[derive(Deserialize, Serialize, Clone, Debug)] pub struct MebiBytes(f64); -#[derive(serde::Serialize, Clone, Debug)] +#[derive(Deserialize, Serialize, Clone, Debug)] pub struct GigaBytes(f64); -#[derive(serde::Serialize, Clone, Debug)] +#[derive(Deserialize, Serialize, Clone, Debug)] pub struct MetricsGeneral { temperature: Celsius, } -#[derive(serde::Serialize, Clone, Debug)] +#[derive(Deserialize, Serialize, Clone, Debug)] pub struct MetricsMemory { percentage_used: Percentage, total: MebiBytes, @@ -47,7 +54,7 @@ pub struct MetricsMemory { swap_free: MebiBytes, swap_used: MebiBytes, } -#[derive(serde::Serialize, Clone, Debug)] +#[derive(Deserialize, Serialize, Clone, Debug)] pub struct MetricsCpu { user_space: Percentage, kernel_space: Percentage, @@ -55,14 +62,14 @@ pub struct MetricsCpu { idle: Percentage, usage: Percentage, } -#[derive(serde::Serialize, Clone, Debug)] +#[derive(Deserialize, Serialize, Clone, Debug)] pub struct MetricsDisk { size: GigaBytes, used: GigaBytes, available: GigaBytes, used_percentage: Percentage, } -#[derive(serde::Serialize, Clone, Debug)] +#[derive(Deserialize, Serialize, Clone, Debug)] pub struct Metrics { general: MetricsGeneral, memory: MetricsMemory, @@ -70,8 +77,13 @@ pub struct Metrics { disk: MetricsDisk, } -#[command(rpc_only)] -pub async fn metrics(#[context] ctx: RpcContext) -> Result { +#[command(display(display_serializable))] +pub async fn metrics( + #[context] ctx: RpcContext, + #[allow(unused_variables)] + #[arg(long = "format")] + format: Option, +) -> Result { match ctx.metrics_cache.read().await.clone() { None => Err(Error { source: anyhow::anyhow!("No Metrics Found"), diff --git a/appmgr/src/volume.rs b/appmgr/src/volume.rs index 06f934057..7e27140dd 100644 --- a/appmgr/src/volume.rs +++ b/appmgr/src/volume.rs @@ -13,7 +13,7 @@ use crate::s9pk::manifest::PackageId; use crate::util::Version; use crate::Error; -pub const PKG_VOLUME_DIR: &'static str = "main/volumes/package-data"; +pub const PKG_VOLUME_DIR: &'static str = "package-data/volumes"; pub const BACKUP_DIR: &'static str = "/mnt/embassy-os-backups/EmbassyBackups"; #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]