init, shutdown and restart

This commit is contained in:
Aiden McClelland
2021-09-03 18:00:38 -06:00
committed by Aiden McClelland
parent 6e82b843a5
commit 483e02a41b
14 changed files with 585 additions and 98 deletions

View File

@@ -1,14 +1,53 @@
use embassy::context::rpc::RpcContextConfig;
use embassy::Error;
use std::path::Path;
async fn init(cfg: &RpcContextConfig) -> Result<(), Error> {
// mount disk
if embassy::disk::main::importable().await? {
embassy::disk::main::load("password").await?;
use embassy::context::rpc::RpcContextConfig;
use embassy::context::{RecoveryContext, SetupContext};
use embassy::{Error, ResultExt};
use http::StatusCode;
use rpc_toolkit::rpc_server;
fn status_fn(_: i32) -> StatusCode {
StatusCode::OK
}
async fn init(cfg_path: Option<&str>) -> Result<(), Error> {
let cfg = RpcContextConfig::load(cfg_path).await?;
if tokio::fs::metadata("/boot/embassy-os/disk.guid")
.await
.is_ok()
{
embassy::disk::main::load(
&cfg,
tokio::fs::read_to_string("/boot/embassy-os/disk.guid")
.await?
.trim(),
"password",
)
.await?;
} else {
// embassy::setup::host_setup().await?;
let ctx = SetupContext::init(cfg_path).await?;
rpc_server!({
command: embassy::setup_api,
context: ctx.clone(),
status: status_fn,
middleware: [ ]
})
.with_graceful_shutdown({
let mut shutdown = ctx.shutdown.subscribe();
async move {
shutdown.recv().await.expect("context dropped");
}
})
.await
.with_kind(embassy::ErrorKind::Network)?;
}
embassy::disk::util::bind("/embassy-data/main/logs", "/var/log/journal", false).await?;
embassy::disk::util::bind(
cfg.datadir().join("main").join("logs"),
"/var/log/journal",
false,
)
.await?;
embassy::ssh::sync_keys_from_db(todo!(), "/root/.ssh/authorized_keys").await?;
todo!("sync wifi");
embassy::hostname::sync_hostname().await?;
@@ -16,13 +55,45 @@ async fn init(cfg: &RpcContextConfig) -> Result<(), Error> {
Ok(())
}
// BLOCKING
fn run_script_if_exists<P: AsRef<Path>>(path: P) {
use std::process::Command;
let script = path.as_ref();
if script.exists() {
match Command::new("/bin/bash").arg(script).spawn() {
Ok(mut c) => {
if let Err(e) = c.wait() {
log::error!("Error Running {}: {}", script.display(), e)
}
}
Err(e) => log::error!("Error Running {}: {}", script.display(), e),
}
}
}
async fn inner_main(cfg_path: Option<&str>) -> Result<(), Error> {
if let Err(e) = init(&RpcContextConfig::load(cfg_path).await?).await {
if let Err(e) = init(cfg_path).await {
embassy::sound::BEETHOVEN.play().await?;
log::error!("{}", e.source);
log::debug!("{}", e.source)
log::debug!("{}", e.source);
let ctx = RecoveryContext::init(cfg_path).await?;
rpc_server!({
command: embassy::recovery_api,
context: ctx.clone(),
status: status_fn,
middleware: [ ]
})
.with_graceful_shutdown({
let mut shutdown = ctx.shutdown.subscribe();
async move {
shutdown.recv().await.expect("context dropped");
}
})
.await
.with_kind(embassy::ErrorKind::Network)?;
} else {
embassy::sound::MARIO_COIN.play().await?
embassy::sound::MARIO_COIN.play().await?;
}
Ok(())
@@ -53,11 +124,19 @@ fn main() {
_ => log::LevelFilter::Trace,
});
let cfg_path = matches.value_of("config");
let rt = tokio::runtime::Runtime::new().expect("failed to initialize runtime");
match rt.block_on(inner_main(cfg_path)) {
run_script_if_exists("/boot/embassy-os/preinit.sh");
let res = {
let rt = tokio::runtime::Runtime::new().expect("failed to initialize runtime");
rt.block_on(inner_main(cfg_path))
};
run_script_if_exists("/boot/embassy-os/postinit.sh");
match res {
Ok(_) => (),
Err(e) => {
drop(rt);
eprintln!("{}", e.source);
log::debug!("{:?}", e.source);
drop(e.source);

View File

@@ -8,6 +8,7 @@ use embassy::hostname::{get_hostname, get_id};
use embassy::middleware::auth::auth;
use embassy::middleware::cors::cors;
use embassy::net::tor::{os_key, tor_health_check};
use embassy::shutdown::Shutdown;
use embassy::status::{check_all, synchronize_all};
use embassy::util::daemon;
use embassy::{Error, ErrorKind, ResultExt};
@@ -30,10 +31,9 @@ fn err_to_500(e: Error) -> Response<Body> {
.unwrap()
}
async fn inner_main(cfg_path: Option<&str>) -> Result<(), Error> {
let (shutdown, _) = tokio::sync::broadcast::channel(1);
let rpc_ctx = RpcContext::init(cfg_path, shutdown).await?;
async fn inner_main(cfg_path: Option<&str>) -> Result<Option<Shutdown>, Error> {
let rpc_ctx = RpcContext::init(cfg_path).await?;
let mut shutdown_recv = rpc_ctx.shutdown.subscribe();
let sig_handler_ctx = rpc_ctx.clone();
let sig_handler = tokio::spawn(async move {
@@ -223,7 +223,10 @@ async fn inner_main(cfg_path: Option<&str>) -> Result<(), Error> {
sig_handler.abort();
Ok(())
Ok(shutdown_recv
.recv()
.await
.with_kind(crate::ErrorKind::Unknown)?)
}
fn main() {
@@ -251,11 +254,16 @@ fn main() {
_ => log::LevelFilter::Trace,
});
let cfg_path = matches.value_of("config");
let rt = tokio::runtime::Runtime::new().expect("failed to initialize runtime");
match rt.block_on(inner_main(cfg_path)) {
Ok(_) => (),
let res = {
let rt = tokio::runtime::Runtime::new().expect("failed to initialize runtime");
rt.block_on(inner_main(cfg_path))
};
match res {
Ok(None) => (),
Ok(Some(s)) => s.execute(),
Err(e) => {
drop(rt);
eprintln!("{}", e.source);
log::debug!("{:?}", e.source);
drop(e.source);

View File

@@ -1,16 +1,30 @@
pub mod cli;
pub mod recovery;
pub mod rpc;
pub mod setup;
pub use cli::CliContext;
pub use recovery::RecoveryContext;
pub use rpc::RpcContext;
pub use setup::SetupContext;
impl From<CliContext> for () {
fn from(_: CliContext) -> Self {
()
}
}
impl From<RecoveryContext> for () {
fn from(_: RecoveryContext) -> Self {
()
}
}
impl From<RpcContext> for () {
fn from(_: RpcContext) -> Self {
()
}
}
impl From<SetupContext> for () {
fn from(_: SetupContext) -> Self {
()
}
}

View File

@@ -0,0 +1,73 @@
use std::net::{IpAddr, SocketAddr};
use std::ops::Deref;
use std::path::Path;
use std::sync::Arc;
use rpc_toolkit::Context;
use serde::Deserialize;
use tokio::fs::File;
use tokio::sync::broadcast::Sender;
use url::Host;
use crate::util::{from_toml_async_reader, AsyncFileExt};
use crate::{Error, ResultExt};
#[derive(Debug, Default, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct RecoveryContextConfig {
pub bind_rpc: Option<SocketAddr>,
}
impl RecoveryContextConfig {
pub async fn load<P: AsRef<Path>>(path: Option<P>) -> Result<Self, Error> {
let cfg_path = path
.as_ref()
.map(|p| p.as_ref())
.unwrap_or(Path::new(crate::CONFIG_PATH));
if let Some(f) = File::maybe_open(cfg_path)
.await
.with_ctx(|_| (crate::ErrorKind::Filesystem, cfg_path.display().to_string()))?
{
from_toml_async_reader(f).await
} else {
Ok(Self::default())
}
}
}
pub struct RecoveryContextSeed {
pub bind_rpc: SocketAddr,
pub shutdown: Sender<()>,
}
#[derive(Clone)]
pub struct RecoveryContext(Arc<RecoveryContextSeed>);
impl RecoveryContext {
pub async fn init<P: AsRef<Path>>(path: Option<P>) -> Result<Self, Error> {
let cfg = RecoveryContextConfig::load(path).await?;
let (shutdown, _) = tokio::sync::broadcast::channel(1);
Ok(Self(Arc::new(RecoveryContextSeed {
bind_rpc: cfg.bind_rpc.unwrap_or(([127, 0, 0, 1], 5959).into()),
shutdown,
})))
}
}
impl Context for RecoveryContext {
fn host(&self) -> Host<&str> {
match self.0.bind_rpc.ip() {
IpAddr::V4(a) => Host::Ipv4(a),
IpAddr::V6(a) => Host::Ipv6(a),
}
}
fn port(&self) -> u16 {
self.0.bind_rpc.port()
}
}
impl Deref for RecoveryContext {
type Target = RecoveryContextSeed;
fn deref(&self) -> &Self::Target {
&*self.0
}
}

View File

@@ -1,3 +1,4 @@
use std::borrow::Cow;
use std::collections::VecDeque;
use std::net::{IpAddr, SocketAddr};
use std::ops::Deref;
@@ -29,6 +30,7 @@ pub struct RpcContextConfig {
pub bind_ws: Option<SocketAddr>,
pub tor_control: Option<SocketAddr>,
pub revision_cache_size: Option<usize>,
pub zfs_pool_name: Option<String>,
pub datadir: Option<PathBuf>,
}
impl RpcContextConfig {
@@ -43,14 +45,20 @@ impl RpcContextConfig {
{
from_toml_async_reader(f).await
} else {
Ok(RpcContextConfig::default())
Ok(Self::default())
}
}
pub fn datadir(&self) -> &Path {
pub fn zfs_pool_name(&self) -> &str {
self.zfs_pool_name
.as_ref()
.map(|s| s.as_str())
.unwrap_or("embassy-data")
}
pub fn datadir(&self) -> Cow<'_, Path> {
self.datadir
.as_ref()
.map(|a| a.as_path())
.unwrap_or_else(|| Path::new("/embassy-data"))
.map(|a| Cow::Borrowed(a.as_path()))
.unwrap_or_else(|| Cow::Owned(Path::new("/").join(self.zfs_pool_name())))
}
pub async fn db(&self) -> Result<PatchDb, Error> {
PatchDb::open(self.datadir().join("main").join("embassy.db"))
@@ -78,6 +86,7 @@ pub struct RpcContextSeed {
pub bind_rpc: SocketAddr,
pub bind_ws: SocketAddr,
pub datadir: PathBuf,
pub zfs_pool_name: Arc<String>,
pub db: PatchDb,
pub secret_store: SqlitePool,
pub docker: Docker,
@@ -92,11 +101,9 @@ pub struct RpcContextSeed {
#[derive(Clone)]
pub struct RpcContext(Arc<RpcContextSeed>);
impl RpcContext {
pub async fn init<P: AsRef<Path>>(
cfg_path: Option<P>,
shutdown: Sender<Option<Shutdown>>,
) -> Result<Self, Error> {
pub async fn init<P: AsRef<Path>>(cfg_path: Option<P>) -> Result<Self, Error> {
let base = RpcContextConfig::load(cfg_path).await?;
let (shutdown, _) = tokio::sync::broadcast::channel(1);
let db = base.db().await?;
let secret_store = base.secret_store().await?;
let docker = Docker::connect_with_unix_defaults()?;
@@ -112,6 +119,7 @@ impl RpcContext {
bind_rpc: base.bind_rpc.unwrap_or(([127, 0, 0, 1], 5959).into()),
bind_ws: base.bind_ws.unwrap_or(([127, 0, 0, 1], 5960).into()),
datadir: base.datadir().to_path_buf(),
zfs_pool_name: Arc::new(base.zfs_pool_name().to_owned()),
db,
secret_store,
docker,

View File

@@ -0,0 +1,71 @@
use std::net::{IpAddr, SocketAddr};
use std::ops::Deref;
use std::path::Path;
use std::sync::Arc;
use rpc_toolkit::Context;
use serde::Deserialize;
use tokio::fs::File;
use tokio::sync::broadcast::Sender;
use url::Host;
use crate::util::{from_toml_async_reader, AsyncFileExt};
use crate::{Error, ResultExt};
#[derive(Debug, Default, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct SetupContextConfig {
pub bind_rpc: Option<SocketAddr>,
}
impl SetupContextConfig {
pub async fn load<P: AsRef<Path>>(path: Option<P>) -> Result<Self, Error> {
let cfg_path = path
.as_ref()
.map(|p| p.as_ref())
.unwrap_or(Path::new(crate::CONFIG_PATH));
if let Some(f) = File::maybe_open(cfg_path)
.await
.with_ctx(|_| (crate::ErrorKind::Filesystem, cfg_path.display().to_string()))?
{
from_toml_async_reader(f).await
} else {
Ok(Self::default())
}
}
}
pub struct SetupContextSeed {
pub bind_rpc: SocketAddr,
pub shutdown: Sender<()>,
}
#[derive(Clone)]
pub struct SetupContext(Arc<SetupContextSeed>);
impl SetupContext {
pub async fn init<P: AsRef<Path>>(path: Option<P>) -> Result<Self, Error> {
let cfg = SetupContextConfig::load(path).await?;
let (shutdown, _) = tokio::sync::broadcast::channel(1);
Ok(Self(Arc::new(SetupContextSeed {
bind_rpc: cfg.bind_rpc.unwrap_or(([127, 0, 0, 1], 5959).into()),
shutdown,
})))
}
}
impl Context for SetupContext {
fn host(&self) -> Host<&str> {
match self.0.bind_rpc.ip() {
IpAddr::V4(a) => Host::Ipv4(a),
IpAddr::V6(a) => Host::Ipv6(a),
}
}
fn port(&self) -> u16 {
self.0.bind_rpc.port()
}
}
impl Deref for SetupContext {
type Target = SetupContextSeed;
fn deref(&self) -> &Self::Target {
&*self.0
}
}

View File

@@ -1,45 +1,194 @@
use std::path::Path;
use anyhow::anyhow;
use tokio::process::Command;
use crate::context::rpc::RpcContextConfig;
use crate::util::Invoke;
use crate::Error;
use crate::{Error, ResultExt};
pub async fn importable() -> Result<bool, Error> {
todo!()
pub const PASSWORD_PATH: &'static str = "/etc/embassy/password";
pub async fn create(cfg: &RpcContextConfig, disks: &[&str]) -> Result<String, Error> {
let guid = create_pool(cfg, disks).await?;
create_fs(cfg).await?;
export(cfg).await?;
Ok(guid)
}
pub async fn create(disks: &[&str]) -> Result<(), Error> {
todo!()
pub async fn load(cfg: &RpcContextConfig, guid: &str, password: &str) -> Result<(), Error> {
import(guid).await?;
mount(cfg, password).await?;
Ok(())
}
pub async fn load(password: &str) -> Result<(), Error> {
todo!()
}
pub async fn create_pool(disks: &[&str]) -> Result<(), Error> {
pub async fn create_pool(cfg: &RpcContextConfig, disks: &[&str]) -> Result<String, Error> {
Command::new("zpool")
.arg("create")
.arg("embassy-data")
.arg(cfg.zfs_pool_name())
.args(disks)
.invoke(crate::ErrorKind::Zfs)
.await?;
Ok(())
Ok(String::from_utf8(
Command::new("zpool")
.arg("get")
.arg("-H")
.arg("-ovalue")
.arg("guid")
.arg(cfg.zfs_pool_name())
.invoke(crate::ErrorKind::Zfs)
.await?,
)?)
}
pub async fn create_fs() -> Result<(), Error> {
todo!()
}
pub async fn import() -> Result<(), Error> {
Command::new("zpool")
.arg("import")
.arg("-f")
.arg("embassy-data")
pub async fn create_fs(cfg: &RpcContextConfig) -> Result<(), Error> {
Command::new("zfs")
.arg("create")
.arg("-o")
.arg("reservation=5G")
.arg("-o")
.arg("encryption=on")
.arg("-o")
.arg("keylocation=file:///etc/embassy/password")
.arg("-o")
.arg("keyformat=password")
.arg(format!("{}/main", cfg.zfs_pool_name()))
.invoke(crate::ErrorKind::Zfs)
.await?;
Command::new("zfs")
.arg("create")
.arg("-o")
.arg("encryption=on")
.arg("-o")
.arg("keylocation=file:///etc/embassy/password")
.arg("-o")
.arg("keyformat=password")
.arg(format!("{}/package-data", cfg.zfs_pool_name()))
.invoke(crate::ErrorKind::Zfs)
.await?;
Ok(())
}
pub async fn mount(password: &str) -> Result<(), Error> {
// zfs get -H -ovalue mountpoint embassy-data
todo!()
pub async fn create_swap(cfg: &RpcContextConfig) -> Result<(), Error> {
let pagesize = String::from_utf8(
Command::new("getconf")
.arg("PAGESIZE")
.invoke(crate::ErrorKind::Zfs)
.await?,
)?;
Command::new("zfs")
.arg("create")
.arg("-V8G")
.arg("-b")
.arg(pagesize)
.arg("-o")
.arg("logbias=throughput")
.arg("-o")
.arg("sync=always")
.arg("-o")
.arg("primarycache=metadata")
.arg("-o")
.arg("com.sun:auto-snapshot=false")
.invoke(crate::ErrorKind::Zfs)
.await?;
Command::new("mkswap")
.arg("-f")
.arg(
Path::new("/dev/zvol")
.join(cfg.zfs_pool_name())
.join("swap"),
)
.invoke(crate::ErrorKind::Zfs)
.await?;
Ok(())
}
pub async fn use_swap(cfg: &RpcContextConfig) -> Result<(), Error> {
Command::new("swapon")
.arg(
Path::new("/dev/zvol")
.join(cfg.zfs_pool_name())
.join("swap"),
)
.invoke(crate::ErrorKind::Zfs)
.await?;
Ok(())
}
pub async fn export(cfg: &RpcContextConfig) -> Result<(), Error> {
Command::new("zpool")
.arg("export")
.arg(cfg.zfs_pool_name())
.invoke(crate::ErrorKind::Zfs)
.await?;
Ok(())
}
/// BLOCKING
pub fn export_blocking(pool: &str) -> Result<(), Error> {
let output = std::process::Command::new("zpool")
.arg("export")
.arg(pool)
.output()?;
if !output.status.success() {
Err(Error::new(
anyhow!("{}", String::from_utf8(output.stderr)?),
crate::ErrorKind::Zfs,
))
} else {
Ok(())
}
}
pub async fn import(guid: &str) -> Result<(), Error> {
Command::new("zpool")
.arg("import")
.arg("-f")
.arg(guid)
.invoke(crate::ErrorKind::Zfs)
.await?;
Ok(())
}
pub async fn mount(cfg: &RpcContextConfig, password: &str) -> Result<(), Error> {
let mountpoint = String::from_utf8(
Command::new("zfs")
.arg("get")
.arg("-H")
.arg("-ovalue")
.arg("mountpoint")
.arg(cfg.zfs_pool_name())
.invoke(crate::ErrorKind::Zfs)
.await?,
)?;
if Path::new(mountpoint.trim()) != &cfg.datadir() {
Command::new("zfs")
.arg("set")
.arg(format!("mountpoint={}", cfg.datadir().display()))
.arg(cfg.zfs_pool_name())
.invoke(crate::ErrorKind::Zfs)
.await?;
}
tokio::fs::write(PASSWORD_PATH, password)
.await
.with_ctx(|_| (crate::ErrorKind::Filesystem, PASSWORD_PATH))?;
Command::new("zfs")
.arg("load-key")
.arg(format!("{}/main", cfg.zfs_pool_name()))
.invoke(crate::ErrorKind::Zfs)
.await?;
tokio::fs::remove_file(PASSWORD_PATH)
.await
.with_ctx(|_| (crate::ErrorKind::Filesystem, PASSWORD_PATH))?;
Command::new("zfs")
.arg("mount")
.arg(format!("{}/main", cfg.zfs_pool_name()))
.invoke(crate::ErrorKind::Zfs)
.await?;
Command::new("zfs")
.arg("mount")
.arg(format!("{}/package-data", cfg.zfs_pool_name()))
.invoke(crate::ErrorKind::Zfs)
.await?;
Ok(())
}

View File

@@ -2,7 +2,9 @@ use digest::Digest;
use tokio::process::Command;
use crate::util::Invoke;
use crate::{Error, ErrorKind};
use crate::{Error, ErrorKind, ResultExt};
pub const PRODUCT_KEY_PATH: &'static str = "/boot/embassy-os/product_key.txt";
pub async fn get_hostname() -> Result<String, Error> {
let out = Command::new("hostname")
@@ -22,7 +24,9 @@ pub async fn set_hostname(hostname: &str) -> Result<(), Error> {
}
pub async fn get_product_key() -> Result<String, Error> {
let out = tokio::fs::read_to_string("/boot/embassy-os/product_key.txt").await?;
let out = tokio::fs::read_to_string(PRODUCT_KEY_PATH)
.await
.with_ctx(|_| (crate::ErrorKind::Filesystem, PRODUCT_KEY_PATH))?;
Ok(out.trim().to_owned())
}

View File

@@ -36,8 +36,8 @@ use crate::{Error, ResultExt};
pub mod cleanup;
pub mod progress;
pub const PKG_CACHE: &'static str = "main/cache/packages";
pub const PKG_PUBLIC_DIR: &'static str = "main/public/package-data";
pub const PKG_CACHE: &'static str = "package-data/cache";
pub const PKG_PUBLIC_DIR: &'static str = "package-data/public";
#[command(display(display_none))]
pub async fn install(

View File

@@ -65,6 +65,7 @@ pub fn echo(#[arg] message: String) -> Result<String, RpcError> {
s9pk::pack,
s9pk::verify,
inspect::inspect,
server,
package,
net::net,
auth::auth,
@@ -74,6 +75,11 @@ pub fn main_api() -> Result<(), RpcError> {
Ok(())
}
#[command(subcommands(system::logs, system::metrics, shutdown::shutdown, shutdown::restart))]
pub fn server() -> Result<(), RpcError> {
Ok(())
}
#[command(subcommands(
install::install,
install::uninstall,
@@ -96,3 +102,13 @@ pub fn package() -> Result<(), RpcError> {
pub fn portable_api() -> Result<(), RpcError> {
Ok(())
}
#[command(subcommands(version::git_info, echo,))]
pub fn recovery_api() -> Result<(), RpcError> {
Ok(())
}
#[command(subcommands(version::git_info, echo,))]
pub fn setup_api() -> Result<(), RpcError> {
Ok(())
}

View File

@@ -1,4 +1,3 @@
use std::ascii::AsciiExt;
use std::process::Stdio;
use std::time::{Duration, UNIX_EPOCH};
@@ -13,9 +12,7 @@ use tokio::process::Command;
use tokio_stream::wrappers::LinesStream;
use crate::action::docker::DockerAction;
use crate::context::RpcContext;
use crate::error::ResultExt;
use crate::id::Id;
use crate::s9pk::manifest::PackageId;
use crate::util::Reversible;
use crate::Error;
@@ -67,7 +64,7 @@ pub enum LogSource {
Container(PackageId),
}
fn display_logs(all: LogResponse, _: &ArgMatches<'_>) {
pub fn display_logs(all: LogResponse, _: &ArgMatches<'_>) {
for entry in all.entries.iter() {
println!("{}", entry);
}
@@ -75,13 +72,18 @@ fn display_logs(all: LogResponse, _: &ArgMatches<'_>) {
#[command(display(display_logs))]
pub async fn logs(
#[context] _: RpcContext,
#[arg] id: PackageId,
#[arg] limit: Option<usize>,
#[arg] cursor: Option<String>,
#[arg] before_flag: Option<bool>,
) -> Result<LogResponse, Error> {
Ok(fetch_logs(LogSource::Container(id), limit, cursor, before_flag.unwrap_or(false)).await?)
Ok(fetch_logs(
LogSource::Container(id),
limit,
cursor,
before_flag.unwrap_or(false),
)
.await?)
}
pub async fn fetch_logs(
@@ -93,13 +95,15 @@ pub async fn fetch_logs(
let limit = limit.unwrap_or(50);
let limit_formatted = format!("-n{}", limit);
let mut args = vec!["--output=json","--output-fields=MESSAGE",&limit_formatted,];
let mut args = vec!["--output=json", "--output-fields=MESSAGE", &limit_formatted];
let id_formatted = match id {
LogSource::Service(id)=> {
LogSource::Service(id) => {
args.push("-u");
id.to_owned()
},
LogSource::Container(id) => format!("CONTAINER_NAME={}", DockerAction::container_name(&id, None))
}
LogSource::Container(id) => {
format!("CONTAINER_NAME={}", DockerAction::container_name(&id, None))
}
};
args.push(&id_formatted);
@@ -169,18 +173,19 @@ pub async fn fetch_logs(
#[tokio::test]
pub async fn test_logs() {
let response =
fetch_logs(
// change `tor.service` to an actual journald unit on your machine
// LogSource::Service("tor.service"),
// first run `docker run --name=hello-world.embassy --log-driver=journald hello-world`
LogSource::Container("hello-world".parse().unwrap()),
// Some(5),
None,
None,
// Some("s=1b8c418e28534400856c27b211dd94fd;i=5a7;b=97571c13a1284f87bc0639b5cff5acbe;m=740e916;t=5ca073eea3445;x=f45bc233ca328348".to_owned()),
false,
).await.unwrap();
let response = fetch_logs(
// change `tor.service` to an actual journald unit on your machine
// LogSource::Service("tor.service"),
// first run `docker run --name=hello-world.embassy --log-driver=journald hello-world`
LogSource::Container("hello-world".parse().unwrap()),
// Some(5),
None,
None,
// Some("s=1b8c418e28534400856c27b211dd94fd;i=5a7;b=97571c13a1284f87bc0639b5cff5acbe;m=740e916;t=5ca073eea3445;x=f45bc233ca328348".to_owned()),
false,
)
.await
.unwrap();
let serialized = serde_json::to_string_pretty(&response).unwrap();
println!("{}", serialized);
}

View File

@@ -1,4 +1,52 @@
use std::sync::Arc;
use rpc_toolkit::command;
use crate::context::RpcContext;
use crate::disk::main::export_blocking;
use crate::util::display_none;
use crate::Error;
#[derive(Debug, Clone)]
pub struct Shutdown {
zfs_pool: Arc<String>,
restart: bool,
}
impl Shutdown {
/// BLOCKING
pub fn execute(&self) {
use std::process::Command;
if let Err(e) = export_blocking(&self.zfs_pool) {
log::error!("Error Exporting ZFS Pool: {}", e);
}
if self.restart {
Command::new("reboot").spawn().unwrap().wait().unwrap();
} else {
Command::new("shutdown")
.arg("now")
.spawn()
.unwrap()
.wait()
.unwrap();
}
}
}
#[command(display(display_none))]
pub async fn shutdown(#[context] ctx: RpcContext) -> Result<(), Error> {
ctx.shutdown.send(Some(Shutdown {
zfs_pool: ctx.zfs_pool_name.clone(),
restart: false,
}));
Ok(())
}
#[command(display(display_none))]
pub async fn restart(#[context] ctx: RpcContext) -> Result<(), Error> {
ctx.shutdown.send(Some(Shutdown {
zfs_pool: ctx.zfs_pool_name.clone(),
restart: true,
}));
Ok(())
}

View File

@@ -1,43 +1,50 @@
use std::fmt;
use rpc_toolkit::command;
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use crate::context::RpcContext;
use crate::logs::{LogResponse, LogSource, fetch_logs};
use crate::{Error, ErrorKind, ResultExt};
use crate::logs::{display_logs, fetch_logs, LogResponse, LogSource};
use crate::util::{display_serializable, IoFormat};
use crate::{Error, ErrorKind};
pub const SYSTEMD_UNIT: &'static str = "embassyd";
#[command(rpc_only)]
#[command(display(display_logs))]
pub async fn logs(
#[context] ctx: RpcContext,
#[arg] limit: Option<usize>,
#[arg] cursor: Option<String>,
#[arg] before_flag: Option<bool>,
) -> Result<LogResponse, Error> {
Ok(fetch_logs(LogSource::Service(SYSTEMD_UNIT), limit, cursor, before_flag.unwrap_or(false)).await?)
Ok(fetch_logs(
LogSource::Service(SYSTEMD_UNIT),
limit,
cursor,
before_flag.unwrap_or(false),
)
.await?)
}
#[derive(serde::Serialize, Clone, Debug)]
#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct Celsius(f64);
impl fmt::Display for Celsius {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{:.1}°C", self.0)
}
}
#[derive(serde::Serialize, Clone, Debug)]
#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct Percentage(f64);
#[derive(serde::Serialize, Clone, Debug)]
#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct MebiBytes(f64);
#[derive(serde::Serialize, Clone, Debug)]
#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct GigaBytes(f64);
#[derive(serde::Serialize, Clone, Debug)]
#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct MetricsGeneral {
temperature: Celsius,
}
#[derive(serde::Serialize, Clone, Debug)]
#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct MetricsMemory {
percentage_used: Percentage,
total: MebiBytes,
@@ -47,7 +54,7 @@ pub struct MetricsMemory {
swap_free: MebiBytes,
swap_used: MebiBytes,
}
#[derive(serde::Serialize, Clone, Debug)]
#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct MetricsCpu {
user_space: Percentage,
kernel_space: Percentage,
@@ -55,14 +62,14 @@ pub struct MetricsCpu {
idle: Percentage,
usage: Percentage,
}
#[derive(serde::Serialize, Clone, Debug)]
#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct MetricsDisk {
size: GigaBytes,
used: GigaBytes,
available: GigaBytes,
used_percentage: Percentage,
}
#[derive(serde::Serialize, Clone, Debug)]
#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct Metrics {
general: MetricsGeneral,
memory: MetricsMemory,
@@ -70,8 +77,13 @@ pub struct Metrics {
disk: MetricsDisk,
}
#[command(rpc_only)]
pub async fn metrics(#[context] ctx: RpcContext) -> Result<Metrics, Error> {
#[command(display(display_serializable))]
pub async fn metrics(
#[context] ctx: RpcContext,
#[allow(unused_variables)]
#[arg(long = "format")]
format: Option<IoFormat>,
) -> Result<Metrics, Error> {
match ctx.metrics_cache.read().await.clone() {
None => Err(Error {
source: anyhow::anyhow!("No Metrics Found"),

View File

@@ -13,7 +13,7 @@ use crate::s9pk::manifest::PackageId;
use crate::util::Version;
use crate::Error;
pub const PKG_VOLUME_DIR: &'static str = "main/volumes/package-data";
pub const PKG_VOLUME_DIR: &'static str = "package-data/volumes";
pub const BACKUP_DIR: &'static str = "/mnt/embassy-os-backups/EmbassyBackups";
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]