diff --git a/appmgr/Cargo.lock b/appmgr/Cargo.lock index e76ecc05a..1497ecd4b 100644 --- a/appmgr/Cargo.lock +++ b/appmgr/Cargo.lock @@ -848,6 +848,7 @@ dependencies = [ "sha2", "simple-logging", "sqlx", + "stderrlog", "tar", "thiserror", "tokio 1.11.0", @@ -2958,6 +2959,19 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "stderrlog" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45a53e2eff3e94a019afa6265e8ee04cb05b9d33fe9f5078b14e4e391d155a38" +dependencies = [ + "atty", + "chrono", + "log", + "termcolor", + "thread_local", +] + [[package]] name = "stdweb" version = "0.4.20" @@ -3183,6 +3197,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "thread_local" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14" +dependencies = [ + "lazy_static", +] + [[package]] name = "time" version = "0.1.44" diff --git a/appmgr/Cargo.toml b/appmgr/Cargo.toml index ee9310eeb..18e1b7c03 100644 --- a/appmgr/Cargo.toml +++ b/appmgr/Cargo.toml @@ -104,6 +104,7 @@ sqlx = { version = "0.5", features = [ "runtime-tokio-rustls", "sqlite", ] } +stderrlog = "0.5.1" tar = "0.4.35" thiserror = "1.0.24" tokio = { version = "1.11.0", features = ["full"] } diff --git a/appmgr/src/bin/embassyd.rs b/appmgr/src/bin/embassyd.rs index e80555c4e..7de49b6c0 100644 --- a/appmgr/src/bin/embassyd.rs +++ b/appmgr/src/bin/embassyd.rs @@ -13,6 +13,7 @@ use embassy::status::{check_all, synchronize_all}; use embassy::util::daemon; use embassy::{Error, ErrorKind, ResultExt}; use futures::{FutureExt, TryFutureExt}; +use log::LevelFilter; use patch_db::json_ptr::JsonPointer; use reqwest::{Client, Proxy}; use rpc_toolkit::hyper::{Body, Response, Server, StatusCode}; @@ -31,8 +32,11 @@ fn err_to_500(e: Error) -> Response { .unwrap() } -async fn inner_main(cfg_path: Option<&str>) -> Result, Error> { - let rpc_ctx = RpcContext::init(cfg_path).await?; +async fn inner_main( + cfg_path: Option<&str>, + log_level: LevelFilter, +) -> Result, Error> { + let rpc_ctx = RpcContext::init(cfg_path, log_level).await?; let mut shutdown_recv = rpc_ctx.shutdown.subscribe(); let sig_handler_ctx = rpc_ctx.clone(); @@ -231,14 +235,15 @@ fn main() { ) .get_matches(); - simple_logging::log_to_stderr(match matches.occurrences_of("verbosity") { - 0 => log::LevelFilter::Off, - 1 => log::LevelFilter::Error, - 2 => log::LevelFilter::Warn, - 3 => log::LevelFilter::Info, - 4 => log::LevelFilter::Debug, + // initializes the bootstrap logger, this will be replaced with the EmbassyLogger later + let filter = match matches.occurrences_of("verbosity") { + 0 => log::LevelFilter::Error, + 1 => log::LevelFilter::Warn, + 2 => log::LevelFilter::Info, + 3 => log::LevelFilter::Debug, _ => log::LevelFilter::Trace, - }); + }; + simple_logging::log_to_stderr(filter); let cfg_path = matches.value_of("config"); let res = { @@ -246,7 +251,7 @@ fn main() { .enable_all() .build() .expect("failed to initialize runtime"); - rt.block_on(inner_main(cfg_path)) + rt.block_on(inner_main(cfg_path, filter)) }; match res { diff --git a/appmgr/src/context/rpc.rs b/appmgr/src/context/rpc.rs index b47c952ca..e55e9dbaf 100644 --- a/appmgr/src/context/rpc.rs +++ b/appmgr/src/context/rpc.rs @@ -7,6 +7,7 @@ use std::sync::atomic::{AtomicU64, AtomicUsize}; use std::sync::Arc; use bollard::Docker; +use log::LevelFilter; use patch_db::json_ptr::JsonPointer; use patch_db::{PatchDb, Revision}; use reqwest::Url; @@ -25,6 +26,7 @@ use crate::manager::ManagerMap; use crate::net::tor::os_key; use crate::net::NetController; use crate::shutdown::Shutdown; +use crate::util::logger::EmbassyLogger; use crate::util::{from_toml_async_reader, AsyncFileExt}; use crate::{Error, ResultExt}; @@ -37,6 +39,7 @@ pub struct RpcContextConfig { pub revision_cache_size: Option, pub zfs_pool_name: Option, pub datadir: Option, + pub log_server: Option, } impl RpcContextConfig { pub async fn load>(path: Option

) -> Result { @@ -116,17 +119,33 @@ pub struct RpcContextSeed { pub metrics_cache: RwLock>, pub shutdown: Sender>, pub websocket_count: AtomicUsize, - pub session_id: AtomicU64, + pub logger: EmbassyLogger, + pub log_epoch: Arc, } #[derive(Clone)] pub struct RpcContext(Arc); impl RpcContext { - pub async fn init>(cfg_path: Option

) -> Result { + pub async fn init>( + cfg_path: Option

, + log_level: LevelFilter, + ) -> Result { let base = RpcContextConfig::load(cfg_path).await?; let (shutdown, _) = tokio::sync::broadcast::channel(1); let secret_store = base.secret_store().await?; let db = base.db(&secret_store).await?; + let share = crate::db::DatabaseModel::new() + .server_info() + .share_stats() + .get(&mut db.handle(), true) + .await?; + let log_epoch = Arc::new(AtomicU64::new(rand::random())); + let logger = EmbassyLogger::new( + log_level, + log_epoch.clone(), + base.log_server.clone(), + *share, + ); let docker = Docker::connect_with_unix_defaults()?; let net_controller = NetController::init( ([127, 0, 0, 1], 80).into(), @@ -151,7 +170,8 @@ impl RpcContext { metrics_cache: RwLock::new(None), shutdown, websocket_count: AtomicUsize::new(0), - session_id: AtomicU64::new(rand::random()), + logger, + log_epoch, }); let res = Self(seed); res.managers diff --git a/appmgr/src/db/mod.rs b/appmgr/src/db/mod.rs index 0bc9fb392..94c720b19 100644 --- a/appmgr/src/db/mod.rs +++ b/appmgr/src/db/mod.rs @@ -44,7 +44,7 @@ async fn ws_handler< .websocket_count .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); if new_count == 0 { - ctx.session_id + ctx.log_epoch .store(rand::random(), std::sync::atomic::Ordering::SeqCst) } () diff --git a/appmgr/src/db/model.rs b/appmgr/src/db/model.rs index af458cbb1..0545527cd 100644 --- a/appmgr/src/db/model.rs +++ b/appmgr/src/db/model.rs @@ -55,6 +55,7 @@ impl Database { tor: Vec::new(), clearnet: Vec::new(), }, + share_stats: false, }, package_data: AllPackageData::default(), broken_packages: Vec::new(), @@ -82,6 +83,7 @@ pub struct ServerInfo { unread_notification_count: u64, specs: ServerSpecs, connection_addresses: ConnectionAddresses, + share_stats: bool, } #[derive(Debug, Deserialize, Serialize)] diff --git a/appmgr/src/lib.rs b/appmgr/src/lib.rs index 31438b281..3bbf73f0c 100644 --- a/appmgr/src/lib.rs +++ b/appmgr/src/lib.rs @@ -68,7 +68,13 @@ pub fn main_api() -> Result<(), RpcError> { Ok(()) } -#[command(subcommands(system::logs, system::metrics, shutdown::shutdown, shutdown::restart))] +#[command(subcommands( + system::config, + system::logs, + system::metrics, + shutdown::shutdown, + shutdown::restart +))] pub fn server() -> Result<(), RpcError> { Ok(()) } diff --git a/appmgr/src/system.rs b/appmgr/src/system.rs index 78f7e0b99..aea9b1e7b 100644 --- a/appmgr/src/system.rs +++ b/appmgr/src/system.rs @@ -455,6 +455,22 @@ async fn get_disk_info() -> Result { })? } +#[command(subcommands(share_stats))] +pub async fn config() -> Result<(), Error> { + Ok(()) +} + +#[command(display(display_serializable))] +async fn share_stats(#[context] ctx: RpcContext, #[arg] value: bool) -> Result<(), Error> { + crate::db::DatabaseModel::new() + .server_info() + .share_stats() + .put(&mut ctx.db.handle(), &value) + .await?; + ctx.logger.set_sharing(value); + Ok(()) +} + #[tokio::test] pub async fn test_get_temp() { println!("{}", get_temp().await.unwrap()) diff --git a/appmgr/src/util/logger.rs b/appmgr/src/util/logger.rs new file mode 100644 index 000000000..5dd3219c3 --- /dev/null +++ b/appmgr/src/util/logger.rs @@ -0,0 +1,85 @@ +use std::collections::HashMap; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::Arc; + +use log::{set_boxed_logger, LevelFilter, Metadata, Record}; +use reqwest::{Client, Url}; +use stderrlog::{StdErrLog, Timestamp}; + +#[derive(Clone)] +pub struct EmbassyLogger { + log_level: log::LevelFilter, + log_epoch: Arc, + logger: StdErrLog, + sharing: Arc, + share_dest: Url, +} +impl EmbassyLogger { + pub fn new( + log_level: log::LevelFilter, + log_epoch: Arc, + share_dest: Option, + share_errors: bool, + ) -> Self { + let share_dest = match share_dest { + None => Url::parse("https://beta-registry-0-3.start9labs.com/error-logs").unwrap(), // TODO + Some(a) => a, + }; + let mut logger = stderrlog::new(); + logger + .module(module_path!()) + .timestamp(Timestamp::Millisecond); + match log_level { + LevelFilter::Off => logger.quiet(true), + LevelFilter::Error => logger.verbosity(0), + LevelFilter::Warn => logger.verbosity(1), + LevelFilter::Info => logger.verbosity(2), + LevelFilter::Debug => logger.verbosity(3), + LevelFilter::Trace => logger.verbosity(4), + }; + let embassy_logger = EmbassyLogger { + log_level, + log_epoch, + logger, + sharing: Arc::new(AtomicBool::new(share_errors)), + share_dest: share_dest, + }; + set_boxed_logger(Box::new(embassy_logger.clone())).unwrap(); + embassy_logger + } + pub fn set_sharing(&self, sharing: bool) { + self.sharing.store(sharing, Ordering::SeqCst) + } +} + +impl log::Log for EmbassyLogger { + fn enabled(&self, metadata: &Metadata) -> bool { + self.logger.enabled(metadata) + } + fn log(&self, record: &Record) { + self.logger.log(record); + if self.sharing.load(Ordering::SeqCst) { + if record.level() <= log::Level::Warn { + let mut body = HashMap::new(); + body.insert( + "log-epoch", + format!("{}", self.log_epoch.load(Ordering::SeqCst)), + ); + body.insert("log-message", format!("{}", record.args())); + // we don't care about the result and need it to be fast + tokio::spawn( + Client::new() + .post(self.share_dest.clone()) + .json(&body) + .send(), + ); + } + } + } + fn flush(&self) {} +} + +#[tokio::test] +pub async fn order_level() { + assert!(log::Level::Warn > log::Level::Error) +} diff --git a/appmgr/src/util/mod.rs b/appmgr/src/util/mod.rs index cf962ed96..1f5c300fa 100644 --- a/appmgr/src/util/mod.rs +++ b/appmgr/src/util/mod.rs @@ -21,6 +21,8 @@ use tokio::task::{JoinError, JoinHandle}; use crate::shutdown::Shutdown; use crate::{Error, ResultExt as _}; +pub mod logger; + #[derive(Clone, Copy, Debug)] pub enum Never {} impl Never {}