diff --git a/appmgr/src/error.rs b/appmgr/src/error.rs index ddd22713b..c7c4be34e 100644 --- a/appmgr/src/error.rs +++ b/appmgr/src/error.rs @@ -52,6 +52,7 @@ pub enum ErrorKind { ParseTimestamp = 44, ParseSysInfo = 45, WifiError = 46, + Journald = 47, } impl ErrorKind { pub fn as_str(&self) -> &'static str { @@ -103,6 +104,7 @@ impl ErrorKind { ParseTimestamp => "Timestamp Parsing Error", ParseSysInfo => "System Info Parsing Error", WifiError => "Wifi Internal Error", + Journald => "Journald Error", } } } diff --git a/appmgr/src/lib.rs b/appmgr/src/lib.rs index d7b330ca5..02c8b86f0 100644 --- a/appmgr/src/lib.rs +++ b/appmgr/src/lib.rs @@ -31,6 +31,7 @@ pub mod hostname; pub mod id; pub mod inspect; pub mod install; +pub mod logs; pub mod manager; pub mod middleware; pub mod migration; @@ -77,7 +78,8 @@ pub fn main_api() -> Result<(), RpcError> { install::uninstall, config::config, control::start, - control::stop + control::stop, + logs::logs, ))] pub fn package() -> Result<(), RpcError> { Ok(()) diff --git a/appmgr/src/logs.rs b/appmgr/src/logs.rs new file mode 100644 index 000000000..a4224de09 --- /dev/null +++ b/appmgr/src/logs.rs @@ -0,0 +1,186 @@ +use std::ascii::AsciiExt; +use std::process::Stdio; +use std::time::{Duration, UNIX_EPOCH}; + +use anyhow::anyhow; +use chrono::{DateTime, Utc}; +use clap::ArgMatches; +use futures::TryStreamExt; +use rpc_toolkit::command; +use serde::{Deserialize, Serialize}; +use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio::process::Command; +use tokio_stream::wrappers::LinesStream; + +use crate::action::docker::DockerAction; +use crate::context::RpcContext; +use crate::error::ResultExt; +use crate::id::Id; +use crate::s9pk::manifest::PackageId; +use crate::util::Reversible; +use crate::Error; + +#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] +#[serde(rename_all = "kebab-case")] +pub struct LogResponse { + entries: Reversible, + start_cursor: Option, + end_cursor: Option, +} + +#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] +pub struct LogEntry { + timestamp: DateTime, + message: String, +} +impl std::fmt::Display for LogEntry { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{} {}", self.timestamp, self.message) + } +} + +#[derive(Serialize, Deserialize, Debug)] +struct JournalctlEntry { + #[serde(rename = "__REALTIME_TIMESTAMP")] + timestamp: String, + #[serde(rename = "MESSAGE")] + message: String, + #[serde(rename = "__CURSOR")] + cursor: String, +} +impl JournalctlEntry { + fn log_entry(self) -> Result<(String, LogEntry), Error> { + Ok(( + self.cursor, + LogEntry { + timestamp: DateTime::::from( + UNIX_EPOCH + Duration::from_micros(self.timestamp.parse::()?), + ), + message: self.message, + }, + )) + } +} + +pub enum LogSource { + Service(&'static str), + Container(PackageId), +} + +fn display_logs(all: LogResponse, _: &ArgMatches<'_>) { + for entry in all.entries.iter() { + println!("{}", entry); + } +} + +#[command(display(display_logs))] +pub async fn logs( + #[context] _: RpcContext, + #[arg] id: PackageId, + #[arg] limit: Option, + #[arg] cursor: Option, + #[arg] before_flag: Option, +) -> Result { + Ok(fetch_logs(LogSource::Container(id), limit, cursor, before_flag.unwrap_or(false)).await?) +} + +pub async fn fetch_logs( + id: LogSource, + limit: Option, + cursor: Option, + before_flag: bool, +) -> Result { + let limit = limit.unwrap_or(50); + let limit_formatted = format!("-n{}", limit); + + let mut args = vec!["--output=json","--output-fields=MESSAGE",&limit_formatted,]; + let id_formatted = match id { + LogSource::Service(id)=> { + args.push("-u"); + id.to_owned() + }, + LogSource::Container(id) => format!("CONTAINER_NAME={}", DockerAction::container_name(&id, None)) + }; + args.push(&id_formatted); + + let cursor_formatted = format!("--after-cursor={}", cursor.clone().unwrap_or("".to_owned())); + let mut get_prev_logs_and_reverse = false; + if cursor.is_some() { + args.push(&cursor_formatted); + if before_flag { + get_prev_logs_and_reverse = true; + } + } + if get_prev_logs_and_reverse { + args.push("--reverse"); + } + + let mut child = Command::new("journalctl") + .args(args) + .stdout(Stdio::piped()) + .spawn()?; + let out = + BufReader::new(child.stdout.take().ok_or_else(|| { + Error::new(anyhow!("No stdout available"), crate::ErrorKind::Journald) + })?); + + let journalctl_entries = LinesStream::new(out.lines()); + + let mut deserialized_entries = journalctl_entries + .map_err(|e| Error::new(e, crate::ErrorKind::Journald)) + .and_then(|s| { + futures::future::ready( + serde_json::from_str::(&s) + .with_kind(crate::ErrorKind::Deserialization), + ) + }); + + let mut entries = Vec::with_capacity(limit); + let mut start_cursor = None; + + if let Some(first) = deserialized_entries.try_next().await? { + let (cursor, entry) = first.log_entry()?; + start_cursor = Some(cursor); + entries.push(entry); + } + + let (mut end_cursor, entries) = deserialized_entries + .try_fold( + (start_cursor.clone(), entries), + |(_, mut acc), entry| async move { + let (cursor, entry) = entry.log_entry()?; + acc.push(entry); + Ok((Some(cursor), acc)) + }, + ) + .await?; + let mut entries = Reversible::new(entries); + // reverse again so output is always in increasing chronological order + if get_prev_logs_and_reverse { + entries.reverse(); + std::mem::swap(&mut start_cursor, &mut end_cursor); + } + Ok(LogResponse { + entries, + start_cursor, + end_cursor, + }) +} + +#[tokio::test] +pub async fn test_logs() { + let response = + fetch_logs( + // change `tor.service` to an actual journald unit on your machine + // LogSource::Service("tor.service"), + // first run `docker run --name=hello-world.embassy --log-driver=journald hello-world` + LogSource::Container("hello-world".parse().unwrap()), + // Some(5), + None, + None, + // Some("s=1b8c418e28534400856c27b211dd94fd;i=5a7;b=97571c13a1284f87bc0639b5cff5acbe;m=740e916;t=5ca073eea3445;x=f45bc233ca328348".to_owned()), + false, + ).await.unwrap(); + let serialized = serde_json::to_string_pretty(&response).unwrap(); + println!("{}", serialized); +} diff --git a/appmgr/src/system.rs b/appmgr/src/system.rs index 21f3ea699..05643e775 100644 --- a/appmgr/src/system.rs +++ b/appmgr/src/system.rs @@ -1,61 +1,22 @@ use std::fmt; -use chrono::{DateTime, Utc}; -use clap::ArgMatches; use rpc_toolkit::command; -use tokio::process::Command; use tokio::sync::RwLock; use crate::context::RpcContext; +use crate::logs::{LogResponse, LogSource, fetch_logs}; use crate::{Error, ErrorKind, ResultExt}; pub const SYSTEMD_UNIT: &'static str = "embassyd"; -fn parse_datetime(text: &str, _matches: &ArgMatches) -> Result, Error> { - text.parse().with_kind(ErrorKind::ParseTimestamp) -} - #[command(rpc_only)] pub async fn logs( - #[context] _ctx: RpcContext, - #[arg(parse(crate::system::parse_datetime))] before: Option>, + #[context] ctx: RpcContext, #[arg] limit: Option, -) -> Result, Error> { - let before = before.unwrap_or(Utc::now()); - let limit = limit.unwrap_or(50); - // Journalctl has unexpected behavior where "until" does not play well with "lines" unless the output is reversed. - // Keep this in mind if you are changing the code below - let out = Command::new("journalctl") - .args(&[ - "-u", - SYSTEMD_UNIT, - &format!( - "-U\"{} {} UTC\"", - before.date().naive_utc(), - before.time().format("%H:%M:%S") - ), - "--output=short-iso", - "--no-hostname", - "--utc", - "--reverse", - &format!("-n{}", limit), - ]) - .output() - .await? - .stdout; - let out_string = String::from_utf8(out)?; - let lines = out_string.lines(); - let mut split_lines = lines - .skip(1) // ditch the journalctl header - .map(|s| { - // split the timestamp off from the log line - let (ts, l) = s.split_once(" ").unwrap(); - (ts.to_owned(), l.to_owned()) - }) - .collect::>(); - // reverse output again because we reversed it above - split_lines.reverse(); - Ok(split_lines) + #[arg] cursor: Option, + #[arg] before_flag: Option, +) -> Result { + Ok(fetch_logs(LogSource::Service(SYSTEMD_UNIT), limit, cursor, before_flag.unwrap_or(false)).await?) } #[derive(serde::Serialize, Clone, Debug)] @@ -482,15 +443,6 @@ async fn get_disk_info() -> Result { })? } -#[test] -pub fn test_datetime_output() { - println!( - "{} {} UTC", - Utc::now().date().naive_utc(), - Utc::now().time().format("%H:%M:%S") - ) -} - #[tokio::test] pub async fn test_get_temp() { println!("{}", get_temp().await.unwrap()) diff --git a/appmgr/src/util/mod.rs b/appmgr/src/util/mod.rs index 959981606..10532e16d 100644 --- a/appmgr/src/util/mod.rs +++ b/appmgr/src/util/mod.rs @@ -898,3 +898,104 @@ impl Serialize for Port { serialize_display(&self.0, serializer) } } + +pub trait IntoDoubleEndedIterator: IntoIterator { + type IntoIter: Iterator + DoubleEndedIterator; + fn into_iter(self) -> >::IntoIter; +} +impl IntoDoubleEndedIterator for T +where + T: IntoIterator, + ::IntoIter: DoubleEndedIterator, +{ + type IntoIter = ::IntoIter; + fn into_iter(self) -> >::IntoIter { + IntoIterator::into_iter(self) + } +} + +#[derive(Debug, Clone)] +pub struct Reversible> +where + for<'a> &'a Container: IntoDoubleEndedIterator<&'a T>, +{ + reversed: bool, + data: Container, + phantom: PhantomData, +} +impl Reversible +where + for<'a> &'a Container: IntoDoubleEndedIterator<&'a T>, +{ + pub fn new(data: Container) -> Self { + Reversible { + reversed: false, + data, + phantom: PhantomData, + } + } + + pub fn reverse(&mut self) { + self.reversed = !self.reversed + } + + pub fn iter( + &self, + ) -> itertools::Either< + <&Container as IntoDoubleEndedIterator<&T>>::IntoIter, + std::iter::Rev<<&Container as IntoDoubleEndedIterator<&T>>::IntoIter>, + > { + let iter = IntoDoubleEndedIterator::into_iter(&self.data); + if self.reversed { + itertools::Either::Right(iter.rev()) + } else { + itertools::Either::Left(iter) + } + } +} +impl Serialize for Reversible +where + for<'a> &'a Container: IntoDoubleEndedIterator<&'a T>, + T: Serialize, +{ + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + use serde::ser::SerializeSeq; + + let iter = IntoDoubleEndedIterator::into_iter(&self.data); + let mut seq_ser = serializer.serialize_seq(match iter.size_hint() { + (lower, Some(upper)) if lower == upper => Some(upper), + _ => None, + })?; + if self.reversed { + for elem in iter.rev() { + seq_ser.serialize_element(elem)?; + } + } else { + for elem in iter { + seq_ser.serialize_element(elem)?; + } + } + seq_ser.end() + } +} +impl<'de, T, Container> Deserialize<'de> for Reversible +where + for<'a> &'a Container: IntoDoubleEndedIterator<&'a T>, + Container: Deserialize<'de>, +{ + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + Ok(Reversible::new(Deserialize::deserialize(deserializer)?)) + } + fn deserialize_in_place(deserializer: D, place: &mut Self) -> Result<(), D::Error> + where + D: Deserializer<'de>, + { + Deserialize::deserialize_in_place(deserializer, &mut place.data) + } +}