implements error log reporting (#464)

* implements error log reporting

* changes api post variables, includes warnings
This commit is contained in:
Keagan McClelland
2021-09-13 11:49:12 -06:00
committed by Aiden McClelland
parent 561e09016d
commit 6cb706785d
10 changed files with 175 additions and 15 deletions

View File

@@ -13,6 +13,7 @@ use embassy::status::{check_all, synchronize_all};
use embassy::util::daemon;
use embassy::{Error, ErrorKind, ResultExt};
use futures::{FutureExt, TryFutureExt};
use log::LevelFilter;
use patch_db::json_ptr::JsonPointer;
use reqwest::{Client, Proxy};
use rpc_toolkit::hyper::{Body, Response, Server, StatusCode};
@@ -31,8 +32,11 @@ fn err_to_500(e: Error) -> Response<Body> {
.unwrap()
}
async fn inner_main(cfg_path: Option<&str>) -> Result<Option<Shutdown>, Error> {
let rpc_ctx = RpcContext::init(cfg_path).await?;
async fn inner_main(
cfg_path: Option<&str>,
log_level: LevelFilter,
) -> Result<Option<Shutdown>, Error> {
let rpc_ctx = RpcContext::init(cfg_path, log_level).await?;
let mut shutdown_recv = rpc_ctx.shutdown.subscribe();
let sig_handler_ctx = rpc_ctx.clone();
@@ -231,14 +235,15 @@ fn main() {
)
.get_matches();
simple_logging::log_to_stderr(match matches.occurrences_of("verbosity") {
0 => log::LevelFilter::Off,
1 => log::LevelFilter::Error,
2 => log::LevelFilter::Warn,
3 => log::LevelFilter::Info,
4 => log::LevelFilter::Debug,
// initializes the bootstrap logger, this will be replaced with the EmbassyLogger later
let filter = match matches.occurrences_of("verbosity") {
0 => log::LevelFilter::Error,
1 => log::LevelFilter::Warn,
2 => log::LevelFilter::Info,
3 => log::LevelFilter::Debug,
_ => log::LevelFilter::Trace,
});
};
simple_logging::log_to_stderr(filter);
let cfg_path = matches.value_of("config");
let res = {
@@ -246,7 +251,7 @@ fn main() {
.enable_all()
.build()
.expect("failed to initialize runtime");
rt.block_on(inner_main(cfg_path))
rt.block_on(inner_main(cfg_path, filter))
};
match res {

View File

@@ -7,6 +7,7 @@ use std::sync::atomic::{AtomicU64, AtomicUsize};
use std::sync::Arc;
use bollard::Docker;
use log::LevelFilter;
use patch_db::json_ptr::JsonPointer;
use patch_db::{PatchDb, Revision};
use reqwest::Url;
@@ -25,6 +26,7 @@ use crate::manager::ManagerMap;
use crate::net::tor::os_key;
use crate::net::NetController;
use crate::shutdown::Shutdown;
use crate::util::logger::EmbassyLogger;
use crate::util::{from_toml_async_reader, AsyncFileExt};
use crate::{Error, ResultExt};
@@ -37,6 +39,7 @@ pub struct RpcContextConfig {
pub revision_cache_size: Option<usize>,
pub zfs_pool_name: Option<String>,
pub datadir: Option<PathBuf>,
pub log_server: Option<Url>,
}
impl RpcContextConfig {
pub async fn load<P: AsRef<Path>>(path: Option<P>) -> Result<Self, Error> {
@@ -116,17 +119,33 @@ pub struct RpcContextSeed {
pub metrics_cache: RwLock<Option<crate::system::Metrics>>,
pub shutdown: Sender<Option<Shutdown>>,
pub websocket_count: AtomicUsize,
pub session_id: AtomicU64,
pub logger: EmbassyLogger,
pub log_epoch: Arc<AtomicU64>,
}
#[derive(Clone)]
pub struct RpcContext(Arc<RpcContextSeed>);
impl RpcContext {
pub async fn init<P: AsRef<Path>>(cfg_path: Option<P>) -> Result<Self, Error> {
pub async fn init<P: AsRef<Path>>(
cfg_path: Option<P>,
log_level: LevelFilter,
) -> Result<Self, Error> {
let base = RpcContextConfig::load(cfg_path).await?;
let (shutdown, _) = tokio::sync::broadcast::channel(1);
let secret_store = base.secret_store().await?;
let db = base.db(&secret_store).await?;
let share = crate::db::DatabaseModel::new()
.server_info()
.share_stats()
.get(&mut db.handle(), true)
.await?;
let log_epoch = Arc::new(AtomicU64::new(rand::random()));
let logger = EmbassyLogger::new(
log_level,
log_epoch.clone(),
base.log_server.clone(),
*share,
);
let docker = Docker::connect_with_unix_defaults()?;
let net_controller = NetController::init(
([127, 0, 0, 1], 80).into(),
@@ -151,7 +170,8 @@ impl RpcContext {
metrics_cache: RwLock::new(None),
shutdown,
websocket_count: AtomicUsize::new(0),
session_id: AtomicU64::new(rand::random()),
logger,
log_epoch,
});
let res = Self(seed);
res.managers

View File

@@ -44,7 +44,7 @@ async fn ws_handler<
.websocket_count
.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
if new_count == 0 {
ctx.session_id
ctx.log_epoch
.store(rand::random(), std::sync::atomic::Ordering::SeqCst)
}
()

View File

@@ -55,6 +55,7 @@ impl Database {
tor: Vec::new(),
clearnet: Vec::new(),
},
share_stats: false,
},
package_data: AllPackageData::default(),
broken_packages: Vec::new(),
@@ -82,6 +83,7 @@ pub struct ServerInfo {
unread_notification_count: u64,
specs: ServerSpecs,
connection_addresses: ConnectionAddresses,
share_stats: bool,
}
#[derive(Debug, Deserialize, Serialize)]

View File

@@ -68,7 +68,13 @@ pub fn main_api() -> Result<(), RpcError> {
Ok(())
}
#[command(subcommands(system::logs, system::metrics, shutdown::shutdown, shutdown::restart))]
#[command(subcommands(
system::config,
system::logs,
system::metrics,
shutdown::shutdown,
shutdown::restart
))]
pub fn server() -> Result<(), RpcError> {
Ok(())
}

View File

@@ -455,6 +455,22 @@ async fn get_disk_info() -> Result<MetricsDisk, Error> {
})?
}
#[command(subcommands(share_stats))]
pub async fn config() -> Result<(), Error> {
Ok(())
}
#[command(display(display_serializable))]
async fn share_stats(#[context] ctx: RpcContext, #[arg] value: bool) -> Result<(), Error> {
crate::db::DatabaseModel::new()
.server_info()
.share_stats()
.put(&mut ctx.db.handle(), &value)
.await?;
ctx.logger.set_sharing(value);
Ok(())
}
#[tokio::test]
pub async fn test_get_temp() {
println!("{}", get_temp().await.unwrap())

85
appmgr/src/util/logger.rs Normal file
View File

@@ -0,0 +1,85 @@
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use log::{set_boxed_logger, LevelFilter, Metadata, Record};
use reqwest::{Client, Url};
use stderrlog::{StdErrLog, Timestamp};
#[derive(Clone)]
pub struct EmbassyLogger {
log_level: log::LevelFilter,
log_epoch: Arc<AtomicU64>,
logger: StdErrLog,
sharing: Arc<AtomicBool>,
share_dest: Url,
}
impl EmbassyLogger {
pub fn new(
log_level: log::LevelFilter,
log_epoch: Arc<AtomicU64>,
share_dest: Option<Url>,
share_errors: bool,
) -> Self {
let share_dest = match share_dest {
None => Url::parse("https://beta-registry-0-3.start9labs.com/error-logs").unwrap(), // TODO
Some(a) => a,
};
let mut logger = stderrlog::new();
logger
.module(module_path!())
.timestamp(Timestamp::Millisecond);
match log_level {
LevelFilter::Off => logger.quiet(true),
LevelFilter::Error => logger.verbosity(0),
LevelFilter::Warn => logger.verbosity(1),
LevelFilter::Info => logger.verbosity(2),
LevelFilter::Debug => logger.verbosity(3),
LevelFilter::Trace => logger.verbosity(4),
};
let embassy_logger = EmbassyLogger {
log_level,
log_epoch,
logger,
sharing: Arc::new(AtomicBool::new(share_errors)),
share_dest: share_dest,
};
set_boxed_logger(Box::new(embassy_logger.clone())).unwrap();
embassy_logger
}
pub fn set_sharing(&self, sharing: bool) {
self.sharing.store(sharing, Ordering::SeqCst)
}
}
impl log::Log for EmbassyLogger {
fn enabled(&self, metadata: &Metadata) -> bool {
self.logger.enabled(metadata)
}
fn log(&self, record: &Record) {
self.logger.log(record);
if self.sharing.load(Ordering::SeqCst) {
if record.level() <= log::Level::Warn {
let mut body = HashMap::new();
body.insert(
"log-epoch",
format!("{}", self.log_epoch.load(Ordering::SeqCst)),
);
body.insert("log-message", format!("{}", record.args()));
// we don't care about the result and need it to be fast
tokio::spawn(
Client::new()
.post(self.share_dest.clone())
.json(&body)
.send(),
);
}
}
}
fn flush(&self) {}
}
#[tokio::test]
pub async fn order_level() {
assert!(log::Level::Warn > log::Level::Error)
}

View File

@@ -21,6 +21,8 @@ use tokio::task::{JoinError, JoinHandle};
use crate::shutdown::Shutdown;
use crate::{Error, ResultExt as _};
pub mod logger;
#[derive(Clone, Copy, Debug)]
pub enum Never {}
impl Never {}