mirror of
https://github.com/Start9Labs/start-os.git
synced 2026-03-26 10:21:52 +00:00
package logs rpc endpoint (#427)
* wip: package logs * fixes from code review * fix reverse flag behavior * only embassyd uses -u, packages use CONTAINER_NAME * make id param an enum, clean up code * changes from rebase * change package IDs to be of type PackageId Co-authored-by: Aiden McClelland <me@drbonez.dev>
This commit is contained in:
committed by
Aiden McClelland
parent
4be36f5968
commit
ee381ebce7
@@ -52,6 +52,7 @@ pub enum ErrorKind {
|
||||
ParseTimestamp = 44,
|
||||
ParseSysInfo = 45,
|
||||
WifiError = 46,
|
||||
Journald = 47,
|
||||
}
|
||||
impl ErrorKind {
|
||||
pub fn as_str(&self) -> &'static str {
|
||||
@@ -103,6 +104,7 @@ impl ErrorKind {
|
||||
ParseTimestamp => "Timestamp Parsing Error",
|
||||
ParseSysInfo => "System Info Parsing Error",
|
||||
WifiError => "Wifi Internal Error",
|
||||
Journald => "Journald Error",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,6 +31,7 @@ pub mod hostname;
|
||||
pub mod id;
|
||||
pub mod inspect;
|
||||
pub mod install;
|
||||
pub mod logs;
|
||||
pub mod manager;
|
||||
pub mod middleware;
|
||||
pub mod migration;
|
||||
@@ -77,7 +78,8 @@ pub fn main_api() -> Result<(), RpcError> {
|
||||
install::uninstall,
|
||||
config::config,
|
||||
control::start,
|
||||
control::stop
|
||||
control::stop,
|
||||
logs::logs,
|
||||
))]
|
||||
pub fn package() -> Result<(), RpcError> {
|
||||
Ok(())
|
||||
|
||||
186
appmgr/src/logs.rs
Normal file
186
appmgr/src/logs.rs
Normal file
@@ -0,0 +1,186 @@
|
||||
use std::ascii::AsciiExt;
|
||||
use std::process::Stdio;
|
||||
use std::time::{Duration, UNIX_EPOCH};
|
||||
|
||||
use anyhow::anyhow;
|
||||
use chrono::{DateTime, Utc};
|
||||
use clap::ArgMatches;
|
||||
use futures::TryStreamExt;
|
||||
use rpc_toolkit::command;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::io::{AsyncBufReadExt, BufReader};
|
||||
use tokio::process::Command;
|
||||
use tokio_stream::wrappers::LinesStream;
|
||||
|
||||
use crate::action::docker::DockerAction;
|
||||
use crate::context::RpcContext;
|
||||
use crate::error::ResultExt;
|
||||
use crate::id::Id;
|
||||
use crate::s9pk::manifest::PackageId;
|
||||
use crate::util::Reversible;
|
||||
use crate::Error;
|
||||
|
||||
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub struct LogResponse {
|
||||
entries: Reversible<LogEntry>,
|
||||
start_cursor: Option<String>,
|
||||
end_cursor: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
|
||||
pub struct LogEntry {
|
||||
timestamp: DateTime<Utc>,
|
||||
message: String,
|
||||
}
|
||||
impl std::fmt::Display for LogEntry {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
write!(f, "{} {}", self.timestamp, self.message)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
struct JournalctlEntry {
|
||||
#[serde(rename = "__REALTIME_TIMESTAMP")]
|
||||
timestamp: String,
|
||||
#[serde(rename = "MESSAGE")]
|
||||
message: String,
|
||||
#[serde(rename = "__CURSOR")]
|
||||
cursor: String,
|
||||
}
|
||||
impl JournalctlEntry {
|
||||
fn log_entry(self) -> Result<(String, LogEntry), Error> {
|
||||
Ok((
|
||||
self.cursor,
|
||||
LogEntry {
|
||||
timestamp: DateTime::<Utc>::from(
|
||||
UNIX_EPOCH + Duration::from_micros(self.timestamp.parse::<u64>()?),
|
||||
),
|
||||
message: self.message,
|
||||
},
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
pub enum LogSource {
|
||||
Service(&'static str),
|
||||
Container(PackageId),
|
||||
}
|
||||
|
||||
fn display_logs(all: LogResponse, _: &ArgMatches<'_>) {
|
||||
for entry in all.entries.iter() {
|
||||
println!("{}", entry);
|
||||
}
|
||||
}
|
||||
|
||||
#[command(display(display_logs))]
|
||||
pub async fn logs(
|
||||
#[context] _: RpcContext,
|
||||
#[arg] id: PackageId,
|
||||
#[arg] limit: Option<usize>,
|
||||
#[arg] cursor: Option<String>,
|
||||
#[arg] before_flag: Option<bool>,
|
||||
) -> Result<LogResponse, Error> {
|
||||
Ok(fetch_logs(LogSource::Container(id), limit, cursor, before_flag.unwrap_or(false)).await?)
|
||||
}
|
||||
|
||||
pub async fn fetch_logs(
|
||||
id: LogSource,
|
||||
limit: Option<usize>,
|
||||
cursor: Option<String>,
|
||||
before_flag: bool,
|
||||
) -> Result<LogResponse, Error> {
|
||||
let limit = limit.unwrap_or(50);
|
||||
let limit_formatted = format!("-n{}", limit);
|
||||
|
||||
let mut args = vec!["--output=json","--output-fields=MESSAGE",&limit_formatted,];
|
||||
let id_formatted = match id {
|
||||
LogSource::Service(id)=> {
|
||||
args.push("-u");
|
||||
id.to_owned()
|
||||
},
|
||||
LogSource::Container(id) => format!("CONTAINER_NAME={}", DockerAction::container_name(&id, None))
|
||||
};
|
||||
args.push(&id_formatted);
|
||||
|
||||
let cursor_formatted = format!("--after-cursor={}", cursor.clone().unwrap_or("".to_owned()));
|
||||
let mut get_prev_logs_and_reverse = false;
|
||||
if cursor.is_some() {
|
||||
args.push(&cursor_formatted);
|
||||
if before_flag {
|
||||
get_prev_logs_and_reverse = true;
|
||||
}
|
||||
}
|
||||
if get_prev_logs_and_reverse {
|
||||
args.push("--reverse");
|
||||
}
|
||||
|
||||
let mut child = Command::new("journalctl")
|
||||
.args(args)
|
||||
.stdout(Stdio::piped())
|
||||
.spawn()?;
|
||||
let out =
|
||||
BufReader::new(child.stdout.take().ok_or_else(|| {
|
||||
Error::new(anyhow!("No stdout available"), crate::ErrorKind::Journald)
|
||||
})?);
|
||||
|
||||
let journalctl_entries = LinesStream::new(out.lines());
|
||||
|
||||
let mut deserialized_entries = journalctl_entries
|
||||
.map_err(|e| Error::new(e, crate::ErrorKind::Journald))
|
||||
.and_then(|s| {
|
||||
futures::future::ready(
|
||||
serde_json::from_str::<JournalctlEntry>(&s)
|
||||
.with_kind(crate::ErrorKind::Deserialization),
|
||||
)
|
||||
});
|
||||
|
||||
let mut entries = Vec::with_capacity(limit);
|
||||
let mut start_cursor = None;
|
||||
|
||||
if let Some(first) = deserialized_entries.try_next().await? {
|
||||
let (cursor, entry) = first.log_entry()?;
|
||||
start_cursor = Some(cursor);
|
||||
entries.push(entry);
|
||||
}
|
||||
|
||||
let (mut end_cursor, entries) = deserialized_entries
|
||||
.try_fold(
|
||||
(start_cursor.clone(), entries),
|
||||
|(_, mut acc), entry| async move {
|
||||
let (cursor, entry) = entry.log_entry()?;
|
||||
acc.push(entry);
|
||||
Ok((Some(cursor), acc))
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
let mut entries = Reversible::new(entries);
|
||||
// reverse again so output is always in increasing chronological order
|
||||
if get_prev_logs_and_reverse {
|
||||
entries.reverse();
|
||||
std::mem::swap(&mut start_cursor, &mut end_cursor);
|
||||
}
|
||||
Ok(LogResponse {
|
||||
entries,
|
||||
start_cursor,
|
||||
end_cursor,
|
||||
})
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
pub async fn test_logs() {
|
||||
let response =
|
||||
fetch_logs(
|
||||
// change `tor.service` to an actual journald unit on your machine
|
||||
// LogSource::Service("tor.service"),
|
||||
// first run `docker run --name=hello-world.embassy --log-driver=journald hello-world`
|
||||
LogSource::Container("hello-world".parse().unwrap()),
|
||||
// Some(5),
|
||||
None,
|
||||
None,
|
||||
// Some("s=1b8c418e28534400856c27b211dd94fd;i=5a7;b=97571c13a1284f87bc0639b5cff5acbe;m=740e916;t=5ca073eea3445;x=f45bc233ca328348".to_owned()),
|
||||
false,
|
||||
).await.unwrap();
|
||||
let serialized = serde_json::to_string_pretty(&response).unwrap();
|
||||
println!("{}", serialized);
|
||||
}
|
||||
@@ -1,61 +1,22 @@
|
||||
use std::fmt;
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use clap::ArgMatches;
|
||||
use rpc_toolkit::command;
|
||||
use tokio::process::Command;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use crate::context::RpcContext;
|
||||
use crate::logs::{LogResponse, LogSource, fetch_logs};
|
||||
use crate::{Error, ErrorKind, ResultExt};
|
||||
|
||||
pub const SYSTEMD_UNIT: &'static str = "embassyd";
|
||||
|
||||
fn parse_datetime(text: &str, _matches: &ArgMatches) -> Result<DateTime<Utc>, Error> {
|
||||
text.parse().with_kind(ErrorKind::ParseTimestamp)
|
||||
}
|
||||
|
||||
#[command(rpc_only)]
|
||||
pub async fn logs(
|
||||
#[context] _ctx: RpcContext,
|
||||
#[arg(parse(crate::system::parse_datetime))] before: Option<DateTime<Utc>>,
|
||||
#[context] ctx: RpcContext,
|
||||
#[arg] limit: Option<usize>,
|
||||
) -> Result<Vec<(String, String)>, Error> {
|
||||
let before = before.unwrap_or(Utc::now());
|
||||
let limit = limit.unwrap_or(50);
|
||||
// Journalctl has unexpected behavior where "until" does not play well with "lines" unless the output is reversed.
|
||||
// Keep this in mind if you are changing the code below
|
||||
let out = Command::new("journalctl")
|
||||
.args(&[
|
||||
"-u",
|
||||
SYSTEMD_UNIT,
|
||||
&format!(
|
||||
"-U\"{} {} UTC\"",
|
||||
before.date().naive_utc(),
|
||||
before.time().format("%H:%M:%S")
|
||||
),
|
||||
"--output=short-iso",
|
||||
"--no-hostname",
|
||||
"--utc",
|
||||
"--reverse",
|
||||
&format!("-n{}", limit),
|
||||
])
|
||||
.output()
|
||||
.await?
|
||||
.stdout;
|
||||
let out_string = String::from_utf8(out)?;
|
||||
let lines = out_string.lines();
|
||||
let mut split_lines = lines
|
||||
.skip(1) // ditch the journalctl header
|
||||
.map(|s| {
|
||||
// split the timestamp off from the log line
|
||||
let (ts, l) = s.split_once(" ").unwrap();
|
||||
(ts.to_owned(), l.to_owned())
|
||||
})
|
||||
.collect::<Vec<(String, String)>>();
|
||||
// reverse output again because we reversed it above
|
||||
split_lines.reverse();
|
||||
Ok(split_lines)
|
||||
#[arg] cursor: Option<String>,
|
||||
#[arg] before_flag: Option<bool>,
|
||||
) -> Result<LogResponse, Error> {
|
||||
Ok(fetch_logs(LogSource::Service(SYSTEMD_UNIT), limit, cursor, before_flag.unwrap_or(false)).await?)
|
||||
}
|
||||
|
||||
#[derive(serde::Serialize, Clone, Debug)]
|
||||
@@ -482,15 +443,6 @@ async fn get_disk_info() -> Result<MetricsDisk, Error> {
|
||||
})?
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_datetime_output() {
|
||||
println!(
|
||||
"{} {} UTC",
|
||||
Utc::now().date().naive_utc(),
|
||||
Utc::now().time().format("%H:%M:%S")
|
||||
)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
pub async fn test_get_temp() {
|
||||
println!("{}", get_temp().await.unwrap())
|
||||
|
||||
@@ -898,3 +898,104 @@ impl Serialize for Port {
|
||||
serialize_display(&self.0, serializer)
|
||||
}
|
||||
}
|
||||
|
||||
pub trait IntoDoubleEndedIterator<T>: IntoIterator<Item = T> {
|
||||
type IntoIter: Iterator<Item = T> + DoubleEndedIterator;
|
||||
fn into_iter(self) -> <Self as IntoDoubleEndedIterator<T>>::IntoIter;
|
||||
}
|
||||
impl<T, U> IntoDoubleEndedIterator<U> for T
|
||||
where
|
||||
T: IntoIterator<Item = U>,
|
||||
<T as IntoIterator>::IntoIter: DoubleEndedIterator,
|
||||
{
|
||||
type IntoIter = <T as IntoIterator>::IntoIter;
|
||||
fn into_iter(self) -> <Self as IntoDoubleEndedIterator<U>>::IntoIter {
|
||||
IntoIterator::into_iter(self)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Reversible<T, Container = Vec<T>>
|
||||
where
|
||||
for<'a> &'a Container: IntoDoubleEndedIterator<&'a T>,
|
||||
{
|
||||
reversed: bool,
|
||||
data: Container,
|
||||
phantom: PhantomData<T>,
|
||||
}
|
||||
impl<T, Container> Reversible<T, Container>
|
||||
where
|
||||
for<'a> &'a Container: IntoDoubleEndedIterator<&'a T>,
|
||||
{
|
||||
pub fn new(data: Container) -> Self {
|
||||
Reversible {
|
||||
reversed: false,
|
||||
data,
|
||||
phantom: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn reverse(&mut self) {
|
||||
self.reversed = !self.reversed
|
||||
}
|
||||
|
||||
pub fn iter(
|
||||
&self,
|
||||
) -> itertools::Either<
|
||||
<&Container as IntoDoubleEndedIterator<&T>>::IntoIter,
|
||||
std::iter::Rev<<&Container as IntoDoubleEndedIterator<&T>>::IntoIter>,
|
||||
> {
|
||||
let iter = IntoDoubleEndedIterator::into_iter(&self.data);
|
||||
if self.reversed {
|
||||
itertools::Either::Right(iter.rev())
|
||||
} else {
|
||||
itertools::Either::Left(iter)
|
||||
}
|
||||
}
|
||||
}
|
||||
impl<T, Container> Serialize for Reversible<T, Container>
|
||||
where
|
||||
for<'a> &'a Container: IntoDoubleEndedIterator<&'a T>,
|
||||
T: Serialize,
|
||||
{
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
use serde::ser::SerializeSeq;
|
||||
|
||||
let iter = IntoDoubleEndedIterator::into_iter(&self.data);
|
||||
let mut seq_ser = serializer.serialize_seq(match iter.size_hint() {
|
||||
(lower, Some(upper)) if lower == upper => Some(upper),
|
||||
_ => None,
|
||||
})?;
|
||||
if self.reversed {
|
||||
for elem in iter.rev() {
|
||||
seq_ser.serialize_element(elem)?;
|
||||
}
|
||||
} else {
|
||||
for elem in iter {
|
||||
seq_ser.serialize_element(elem)?;
|
||||
}
|
||||
}
|
||||
seq_ser.end()
|
||||
}
|
||||
}
|
||||
impl<'de, T, Container> Deserialize<'de> for Reversible<T, Container>
|
||||
where
|
||||
for<'a> &'a Container: IntoDoubleEndedIterator<&'a T>,
|
||||
Container: Deserialize<'de>,
|
||||
{
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: Deserializer<'de>,
|
||||
{
|
||||
Ok(Reversible::new(Deserialize::deserialize(deserializer)?))
|
||||
}
|
||||
fn deserialize_in_place<D>(deserializer: D, place: &mut Self) -> Result<(), D::Error>
|
||||
where
|
||||
D: Deserializer<'de>,
|
||||
{
|
||||
Deserialize::deserialize_in_place(deserializer, &mut place.data)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user