//! start-os/core/src/logs.rs — journald log reading, pagination, and websocket streaming.
use std::convert::Infallible;
use std::ops::{Deref, DerefMut};
use std::path::Path;
use std::process::Stdio;
use std::str::FromStr;
use std::time::{Duration, UNIX_EPOCH};
use axum::extract::ws;
use chrono::{DateTime, Utc};
use clap::builder::ValueParserFactory;
use clap::{Args, FromArgMatches, Parser};
use color_eyre::eyre::eyre;
use futures::stream::BoxStream;
use futures::{Future, Stream, StreamExt, TryStreamExt};
use itertools::Itertools;
use rpc_toolkit::yajrc::RpcError;
use rpc_toolkit::{
CallRemote, Context, Empty, HandlerArgs, HandlerExt, HandlerFor, ParentHandler, from_fn_async,
};
use serde::de::{self, DeserializeOwned};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::{Child, Command};
use tokio_stream::wrappers::LinesStream;
use tokio_tungstenite::tungstenite::Message;
use tracing::instrument;
use crate::PackageId;
use crate::context::{CliContext, RpcContext};
use crate::error::ResultExt;
use crate::prelude::*;
use crate::rpc_continuations::{Guid, RpcContinuation, RpcContinuations};
use crate::util::net::WebSocket;
use crate::util::serde::Reversible;
use crate::util::{FromStrParser, Invoke};
/// A stream of journald log entries, optionally backed by a live
/// `journalctl -f` child process.
#[pin_project::pin_project]
pub struct LogStream {
    // Held only to keep the follow-mode child process alive (it is spawned
    // with `kill_on_drop`, so dropping this stream stops journalctl);
    // `None` for one-shot queries.
    _child: Option<Child>,
    #[pin]
    entries: BoxStream<'static, Result<JournalctlEntry, Error>>,
}
impl Deref for LogStream {
    type Target = BoxStream<'static, Result<JournalctlEntry, Error>>;
    /// Borrow the inner entry stream.
    fn deref(&self) -> &Self::Target {
        &self.entries
    }
}
impl DerefMut for LogStream {
    /// Mutably borrow the inner entry stream.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.entries
    }
}
impl Stream for LogStream {
    type Item = Result<JournalctlEntry, Error>;
    /// Delegate polling to the inner boxed stream via pin projection.
    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        let this = self.project();
        Stream::poll_next(this.entries, cx)
    }
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.entries.size_hint()
    }
}
/// Pump log entries from `logs` out over the websocket until the log stream
/// ends or the client disconnects.
///
/// `first_entry` is an entry the caller already pulled off the stream (see
/// `follow_logs`) and is sent before anything else so it is not lost. Each
/// entry is serialized as a JSON text frame. When the log stream is exhausted
/// the socket is closed normally with reason "complete"; when the client
/// closes its side the handler returns `Ok(())`.
#[instrument(skip_all)]
async fn ws_handler(
    first_entry: Option<LogEntry>,
    mut logs: LogStream,
    mut stream: WebSocket,
) -> Result<(), Error> {
    if let Some(first_entry) = first_entry {
        stream
            .send(ws::Message::Text(
                serde_json::to_string(&first_entry)
                    .with_kind(ErrorKind::Serialization)?
                    .into(),
            ))
            .await
            .with_kind(ErrorKind::Network)?;
    }
    loop {
        tokio::select! {
            entry = logs.try_next() => {
                if let Some(entry) = entry? {
                    // the journald cursor is dropped here; only the entry body
                    // is forwarded to the client
                    let (_, log_entry) = entry.log_entry()?;
                    stream
                        .send(ws::Message::Text(
                            serde_json::to_string(&log_entry)
                                .with_kind(ErrorKind::Serialization)?
                                .into(),
                        ))
                        .await
                        .with_kind(ErrorKind::Network)?;
                } else {
                    // journal stream complete (journalctl exited)
                    return stream.normal_close("complete").await;
                }
            },
            msg = stream.recv() => {
                // `None` means the client closed the connection
                if msg.transpose().with_kind(crate::ErrorKind::Network)?.is_none() {
                    return Ok(())
                }
            }
        }
    }
}
/// One page of log entries plus cursors for paginating in either direction.
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct LogResponse {
    pub entries: Reversible<LogEntry>,
    // journald cursor bounding the start of the returned page (see fetch_logs,
    // which swaps the cursors when the page was fetched in reverse)
    start_cursor: Option<String>,
    // journald cursor bounding the end of the returned page
    end_cursor: Option<String>,
}
/// Response to a `logs … follow` call: where the replay started and the
/// continuation GUID to attach a websocket to for the live tail.
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct LogFollowResponse {
    // cursor of the first entry that will be streamed, if one was available
    start_cursor: Option<String>,
    // pass to the RPC continuation endpoint to receive the stream
    guid: Guid,
}
/// A single log line as exposed over the API and printed by the CLI.
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct LogEntry {
    timestamp: DateTime<Utc>,
    message: String,
    // journald boot id the entry belongs to
    boot_id: String,
}
impl std::fmt::Display for LogEntry {
    /// Render as `<rfc3339-with-millis, UTC 'Z'> <message>`.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let ts = self
            .timestamp
            .to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
        write!(f, "{} {}", ts, self.message)
    }
}
/// A raw journald record as emitted by `journalctl --output=json`.
#[derive(Serialize, Deserialize, Debug)]
pub struct JournalctlEntry {
    /// Microseconds since the Unix epoch, as a decimal string.
    #[serde(rename = "__REALTIME_TIMESTAMP")]
    pub timestamp: String,
    /// Log message; journald may emit this as a string, null, or a byte
    /// array, all of which are handled by `deserialize_log_message`.
    #[serde(rename = "MESSAGE")]
    #[serde(deserialize_with = "deserialize_log_message")]
    pub message: String,
    /// Opaque journald cursor identifying this entry, used for pagination.
    #[serde(rename = "__CURSOR")]
    pub cursor: String,
    #[serde(rename = "_BOOT_ID")]
    pub boot_id: String,
}
impl JournalctlEntry {
    /// Split into the entry's journald cursor and an API-level [`LogEntry`].
    ///
    /// Fails if `__REALTIME_TIMESTAMP` is not a valid `u64` microsecond count.
    fn log_entry(self) -> Result<(String, LogEntry), Error> {
        Ok((
            self.cursor,
            LogEntry {
                // __REALTIME_TIMESTAMP is microseconds since the Unix epoch
                timestamp: DateTime::<Utc>::from(
                    UNIX_EPOCH + Duration::from_micros(self.timestamp.parse::<u64>()?),
                ),
                message: self.message,
                boot_id: self.boot_id,
            },
        ))
    }
}
/// Deserialize journald's `MESSAGE` field, which may arrive as a string, as
/// null (no message), or as an array of bytes.
///
/// Improvements over the previous version: the redundant
/// `.map(|s| s.to_owned())` on an already-owned `String` (a full clone) is
/// removed, and the `repeat_with + take_while + flatten` chain is replaced by
/// the equivalent, clearer `std::iter::from_fn` (both stop at the first
/// `None` from `next_element`).
fn deserialize_log_message<'de, D: serde::de::Deserializer<'de>>(
    deserializer: D,
) -> std::result::Result<String, D::Error> {
    struct Visitor;
    impl<'de> serde::de::Visitor<'de> for Visitor {
        type Value = String;
        fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(formatter, "a parsable string")
        }
        // Borrowed string: copy it out.
        fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
        where
            E: serde::de::Error,
        {
            Ok(v.to_owned())
        }
        // Owned string: take it as-is.
        fn visit_string<E>(self, v: String) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(v)
        }
        // `MESSAGE: null` — treat a missing message as empty.
        fn visit_unit<E>(self) -> Result<Self::Value, E>
        where
            E: serde::de::Error,
        {
            Ok(String::new())
        }
        // Byte-array form: collect the bytes and require them to be valid UTF-8.
        fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
        where
            A: serde::de::SeqAccess<'de>,
        {
            String::from_utf8(
                std::iter::from_fn(|| seq.next_element::<u8>().transpose())
                    .collect::<Result<Vec<u8>, _>>()?,
            )
            .map_err(serde::de::Error::custom)
        }
    }
    deserializer.deserialize_any(Visitor)
}
/// Defining how we are going to filter on a journalctl cli log.
/// Kernel: (-k --dmesg Show kernel message log from the current boot)
/// Unit: ( -u --unit=UNIT Show logs from the specified unit
/// --user-unit=UNIT Show logs from the specified user unit))
/// System: Unit is startd, but we also filter on the comm
/// Container: Filtering containers, like podman/docker is done by filtering on the CONTAINER_NAME
#[derive(Debug, Clone)]
pub enum LogSource {
    /// Kernel ring buffer (`journalctl -k`).
    Kernel,
    /// A specific systemd unit (`journalctl -u <unit>`).
    Unit(&'static str),
    /// An installed package's logs, read from its dedicated journal directory
    /// (see `gen_journalctl_command`).
    Package(PackageId),
}
/// systemd unit name carrying StartOS's own daemon logs.
pub const SYSTEM_UNIT: &str = "startd";
/// CLI/RPC parameter set selecting which package's logs to read.
#[derive(Deserialize, Serialize, Parser)]
#[serde(rename_all = "camelCase")]
#[command(rename_all = "kebab-case")]
pub struct PackageIdParams {
    #[arg(help = "help.arg.package-id")]
    id: PackageId,
}
/// Identifies a boot for `journalctl --boot=`: either a numeric boot
/// index/offset or a journald boot id string.
#[derive(Debug, Clone)]
pub enum BootIdentifier {
    Index(i32),
    Id(String),
}
impl FromStr for BootIdentifier {
    type Err = Infallible;
    /// Parse a CLI argument: anything that parses as an `i32` becomes
    /// `Index`; everything else is kept verbatim as `Id`. Cannot fail.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(s.parse::<i32>()
            .map_or_else(|_| Self::Id(s.to_owned()), Self::Index))
    }
}
impl ValueParserFactory for BootIdentifier {
    // lets clap parse this type through its (infallible) FromStr impl
    type Parser = FromStrParser<Self>;
    fn value_parser() -> Self::Parser {
        Self::Parser::new()
    }
}
impl Serialize for BootIdentifier {
    /// Serialize indices as numbers and boot ids as strings, mirroring what
    /// the `Deserialize` impl accepts.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match self {
            Self::Index(i) => serializer.serialize_i32(*i),
            Self::Id(i) => serializer.serialize_str(i),
        }
    }
}
impl<'de> Deserialize<'de> for BootIdentifier {
    /// Accept either a string (journald boot id) or a number (boot index).
    ///
    /// Fix: integer inputs outside `i32` range previously wrapped silently
    /// via `as i32`; they now produce a deserialization error. All in-range
    /// inputs behave exactly as before.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        struct Visitor;
        impl<'de> de::Visitor<'de> for Visitor {
            type Value = BootIdentifier;
            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
                write!(formatter, "a string or integer")
            }
            fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
            where
                E: de::Error,
            {
                Ok(Self::Value::Id(v.to_owned()))
            }
            fn visit_string<E>(self, v: String) -> Result<Self::Value, E>
            where
                E: de::Error,
            {
                Ok(Self::Value::Id(v))
            }
            fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E>
            where
                E: de::Error,
            {
                // error on out-of-range values instead of wrapping
                i32::try_from(v).map(Self::Value::Index).map_err(E::custom)
            }
            fn visit_f64<E>(self, v: f64) -> Result<Self::Value, E>
            where
                E: de::Error,
            {
                // float-to-int `as` casts saturate (cannot wrap); fractional
                // parts are truncated, matching previous behavior
                Ok(Self::Value::Index(v as i32))
            }
            fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
            where
                E: de::Error,
            {
                // error on out-of-range values instead of wrapping
                i32::try_from(v).map(Self::Value::Index).map_err(E::custom)
            }
        }
        deserializer.deserialize_any(Visitor)
    }
}
impl From<BootIdentifier> for String {
    /// Render in the form `journalctl --boot=` expects: the decimal index,
    /// or the boot id string verbatim.
    fn from(value: BootIdentifier) -> Self {
        match value {
            BootIdentifier::Id(id) => id,
            BootIdentifier::Index(idx) => idx.to_string(),
        }
    }
}
/// Shared parameters for all log queries. `Extra` carries handler-specific
/// arguments (e.g. [`PackageIdParams`]) and is flattened into both the CLI
/// and the serialized representation.
#[derive(Deserialize, Serialize, Parser)]
#[serde(rename_all = "camelCase")]
#[command(rename_all = "kebab-case")]
pub struct LogsParams<Extra: FromArgMatches + Args = Empty> {
    #[command(flatten)]
    #[serde(flatten)]
    extra: Extra,
    // maximum number of entries to return (fetch_logs defaults this to 50)
    #[arg(short = 'l', long = "limit", help = "help.arg.log-limit")]
    limit: Option<usize>,
    // journald cursor to page from; incompatible with `follow`
    #[arg(
        short = 'c',
        long = "cursor",
        conflicts_with = "follow",
        help = "help.arg.log-cursor"
    )]
    cursor: Option<String>,
    // restrict to a specific boot (index or boot id); all boots by default
    #[arg(short = 'b', long = "boot", help = "help.arg.log-boot")]
    #[serde(default)]
    boot: Option<BootIdentifier>,
    // page backwards (entries before the cursor); incompatible with `follow`
    #[arg(
        short = 'B',
        long = "before",
        conflicts_with = "follow",
        help = "help.arg.log-before"
    )]
    #[serde(default)]
    before: bool,
}
/// CLI-side wrapper adding the `--follow` flag, which selects between the
/// one-shot `logs` call and the `logs.follow` continuation (see `cli_logs`).
#[derive(Deserialize, Serialize, Parser)]
#[serde(rename_all = "camelCase")]
#[command(rename_all = "kebab-case")]
pub struct CliLogsParams<Extra: FromArgMatches + Args = Empty> {
    #[command(flatten)]
    #[serde(flatten)]
    rpc_params: LogsParams<Extra>,
    // tail the logs over a websocket instead of fetching one page
    #[arg(short = 'f', long = "follow", help = "help.arg.log-follow")]
    #[serde(default)]
    follow: bool,
}
/// Build the RPC handler tree for a log source: the root handler serves
/// one-page queries and the `follow` subcommand starts a websocket
/// continuation. `source` resolves the handler-specific `Extra` params to a
/// concrete [`LogSource`].
#[allow(private_bounds)]
pub fn logs<
    C: Context + AsRef<RpcContinuations>,
    Extra: FromArgMatches + Serialize + DeserializeOwned + Args + Send + Sync + 'static,
>(
    source: impl for<'a> LogSourceFn<'a, C, Extra>,
) -> ParentHandler<C, LogsParams<Extra>> {
    ParentHandler::new()
        .root_handler(logs_nofollow::<C, Extra>(source.clone()).no_cli())
        .subcommand(
            "follow",
            logs_follow::<C, Extra>(source)
                // `follow` reuses the parent's LogsParams unchanged
                .with_inherited(|params, _| params)
                .no_cli(),
        )
}
/// CLI entry point for log commands: forwards the query to the server, then
/// either prints the returned page, or (with `--follow`) attaches to the
/// returned websocket continuation and prints entries as they arrive.
pub async fn cli_logs<RemoteContext, Extra>(
    HandlerArgs {
        context: ctx,
        parent_method,
        method,
        params: CliLogsParams { rpc_params, follow },
        ..
    }: HandlerArgs<CliContext, CliLogsParams<Extra>>,
) -> Result<(), RpcError>
where
    CliContext: CallRemote<RemoteContext>,
    Extra: FromArgMatches + Args + Serialize + Send + Sync,
{
    // reconstruct the dotted RPC method path, appending ".follow" when tailing
    let method = parent_method
        .into_iter()
        .chain(method)
        .chain(follow.then_some("follow"))
        .join(".");
    if follow {
        let res = from_value::<LogFollowResponse>(
            ctx.call_remote::<RemoteContext>(&method, to_value(&rpc_params)?)
                .await?,
        )?;
        // attach to the server-side continuation and print until it closes;
        // non-text frames are ignored
        let mut stream = ctx.ws_continuation(res.guid).await?;
        while let Some(log) = stream.try_next().await? {
            if let Message::Text(log) = log {
                println!("{}", serde_json::from_str::<LogEntry>(&log)?);
            }
        }
    } else {
        let res = from_value::<LogResponse>(
            ctx.call_remote::<RemoteContext>(&method, to_value(&rpc_params)?)
                .await?,
        )?;
        for entry in res.entries.iter() {
            println!("{}", entry);
        }
    }
    Ok(())
}
/// Async callback resolving handler-specific `Extra` params to a
/// [`LogSource`]. Blanket-implemented for matching `Fn` closures below; the
/// `for<'a>` usage at call sites lets the returned future borrow the context.
trait LogSourceFn<'a, Context, Extra>: Clone + Send + Sync + 'static {
    type Fut: Future<Output = Result<LogSource, Error>> + Send + 'a;
    fn call(&self, ctx: &'a Context, extra: Extra) -> Self::Fut;
}
// Any cloneable `Fn(&C, Extra) -> impl Future<Output = Result<LogSource, Error>>`
// (e.g. an async fn like `get_package_id`) is a LogSourceFn.
impl<'a, C: Context, Extra, F, Fut> LogSourceFn<'a, C, Extra> for F
where
    F: Fn(&'a C, Extra) -> Fut + Clone + Send + Sync + 'static,
    Fut: Future<Output = Result<LogSource, Error>> + Send + 'a,
{
    type Fut = Fut;
    fn call(&self, ctx: &'a C, extra: Extra) -> Self::Fut {
        self(ctx, extra)
    }
}
/// Handler for a one-shot (non-follow) log query: resolves the log source
/// via `f`, then delegates to [`fetch_logs`].
fn logs_nofollow<C, Extra>(
    f: impl for<'a> LogSourceFn<'a, C, Extra>,
) -> impl HandlerFor<C, Params = LogsParams<Extra>, InheritedParams = Empty, Ok = LogResponse, Err = Error>
where
    C: Context,
    Extra: FromArgMatches + Args + Send + Sync + 'static,
{
    from_fn_async(
        move |HandlerArgs {
                  context,
                  params:
                      LogsParams {
                          extra,
                          limit,
                          cursor,
                          boot,
                          before,
                      },
                  ..
              }: HandlerArgs<C, LogsParams<Extra>>| {
            // clone so the handler closure stays reusable across calls
            let f = f.clone();
            async move {
                fetch_logs(
                    f.call(&context, extra).await?,
                    limit,
                    cursor,
                    boot.map(String::from),
                    before,
                )
                .await
            }
        },
    )
}
/// Handler for `logs.follow`: params are inherited from the parent `logs`
/// command (note `before` is ignored — following always moves forward), the
/// log source is resolved via `f`, and [`follow_logs`] sets up the websocket
/// continuation.
fn logs_follow<
    C: Context + AsRef<RpcContinuations>,
    Extra: FromArgMatches + Args + Send + Sync + 'static,
>(
    f: impl for<'a> LogSourceFn<'a, C, Extra>,
) -> impl HandlerFor<
    C,
    Params = Empty,
    InheritedParams = LogsParams<Extra>,
    Ok = LogFollowResponse,
    Err = Error,
> {
    from_fn_async(
        move |HandlerArgs {
                  context,
                  inherited_params:
                      LogsParams {
                          extra,
                          cursor,
                          limit,
                          boot,
                          ..
                      },
                  ..
              }: HandlerArgs<C, Empty, LogsParams<Extra>>| {
            // clone so the handler closure stays reusable across calls
            let f = f.clone();
            async move {
                let src = f.call(&context, extra).await?;
                follow_logs(context, src, cursor, limit, boot.map(String::from)).await
            }
        },
    )
}
/// Resolve the `id` CLI/RPC parameter to a package log source; the context is
/// unused.
async fn get_package_id(
    _: &RpcContext,
    params: PackageIdParams,
) -> Result<LogSource, Error> {
    Ok(LogSource::Package(params.id))
}
/// Handler tree for package logs: `logs` parameterized with a package id.
pub fn package_logs() -> ParentHandler<RpcContext, LogsParams<PackageIdParams>> {
    logs::<RpcContext, PackageIdParams>(get_package_id)
}
pub async fn journalctl(
id: LogSource,
limit: Option<usize>,
cursor: Option<&str>,
boot: Option<&str>,
before: bool,
follow: bool,
) -> Result<LogStream, Error> {
let mut cmd = gen_journalctl_command(&id);
if let Some(limit) = limit {
cmd.arg(format!("--lines={}", limit));
}
if let Some(cursor) = cursor {
cmd.arg(&format!("--after-cursor={}", cursor));
if before {
cmd.arg("--reverse");
}
}
if let Some(boot) = boot {
cmd.arg(format!("--boot={boot}"));
} else {
cmd.arg("--boot=all");
}
let deserialized_entries = String::from_utf8(cmd.invoke(ErrorKind::Journald).await?)?
.lines()
.map(serde_json::from_str::<JournalctlEntry>)
.filter_map(|e| e.ok())
.collect::<Vec<_>>();
if follow {
let mut follow_cmd = gen_journalctl_command(&id);
follow_cmd.arg("-f");
if let Some(last) = deserialized_entries.last() {
follow_cmd.arg(format!("--after-cursor={}", last.cursor));
follow_cmd.arg("--lines=all");
} else {
follow_cmd.arg("--lines=0");
}
let mut child = follow_cmd.stdout(Stdio::piped()).spawn()?;
let out = BufReader::new(child.stdout.take().ok_or_else(|| {
Error::new(
eyre!("{}", t!("logs.no-stdout-available")),
crate::ErrorKind::Journald,
)
})?);
let journalctl_entries = LinesStream::new(out.lines());
let follow_deserialized_entries = journalctl_entries
.map_err(|e| Error::new(e, crate::ErrorKind::Journald))
.try_filter_map(|s| {
futures::future::ready(Ok(serde_json::from_str::<JournalctlEntry>(&s).ok()))
});
let entries = futures::stream::iter(deserialized_entries)
.map(Ok)
.chain(follow_deserialized_entries)
.boxed();
Ok(LogStream {
_child: Some(child),
entries,
})
} else {
let entries = futures::stream::iter(deserialized_entries).map(Ok).boxed();
Ok(LogStream {
_child: None,
entries,
})
}
}
/// Construct the base `journalctl` invocation for a log source: JSON output
/// restricted to the MESSAGE field, with the child killed when its handle is
/// dropped.
fn gen_journalctl_command(id: &LogSource) -> Command {
    let mut cmd = Command::new("journalctl");
    cmd.kill_on_drop(true)
        .arg("--output=json")
        .arg("--output-fields=MESSAGE");
    match id {
        LogSource::Kernel => cmd.arg("-k"),
        LogSource::Unit(unit) => cmd.arg("-u").arg(unit),
        // package logs live in a dedicated journal directory, filtered to the
        // container runtime unit
        LogSource::Package(pkg) => cmd
            .arg("-u")
            .arg("container-runtime.service")
            .arg("-D")
            .arg(Path::new("/media/startos/data/package-data/logs").join(pkg)),
    };
    cmd
}
/// Fetch one page of up to `limit` (default 50) entries from the journal.
///
/// When `cursor` is given, entries after it are returned; with `before`
/// additionally set, journalctl is run in reverse to get the entries before
/// it, and the page is flipped back so output is always in increasing
/// chronological order. Cursors for both ends of the page are returned for
/// further pagination.
#[instrument(skip_all)]
pub async fn fetch_logs(
    id: LogSource,
    limit: Option<usize>,
    cursor: Option<String>,
    boot: Option<String>,
    before: bool,
) -> Result<LogResponse, Error> {
    let limit = limit.unwrap_or(50);
    let mut stream = journalctl(
        id,
        Some(limit),
        cursor.as_deref(),
        boot.as_deref(),
        before,
        false,
    )
    .await?;
    let mut entries = Vec::with_capacity(limit);
    let mut start_cursor = None;
    // Wait up to 1s for the first entry to capture its cursor; on timeout,
    // start_cursor stays None and any entries are still drained below.
    if let Some(first) = tokio::time::timeout(Duration::from_secs(1), stream.try_next())
        .await
        .ok()
        .transpose()?
        .flatten()
    {
        let (cursor, entry) = first.log_entry()?;
        start_cursor = Some(cursor);
        entries.push(entry);
    }
    // Drain the remainder of the (bounded, non-follow) stream, keeping the
    // cursor of the last entry seen.
    let (mut end_cursor, entries) = stream
        .try_fold(
            (start_cursor.clone(), entries),
            |(_, mut acc), entry| async move {
                let (cursor, entry) = entry.log_entry()?;
                acc.push(entry);
                Ok((Some(cursor), acc))
            },
        )
        .await?;
    let mut entries = Reversible::new(entries);
    // reverse again so output is always in increasing chronological order
    if cursor.is_some() && before {
        entries.reverse();
        std::mem::swap(&mut start_cursor, &mut end_cursor);
    }
    Ok(LogResponse {
        entries,
        start_cursor,
        end_cursor,
    })
}
/// Start following logs: spawns `journalctl -f`, registers a websocket RPC
/// continuation that streams entries to the client, and returns its GUID.
///
/// Without a `cursor`, a backlog of `limit` (default 50) entries is included;
/// with a `cursor`, everything after it is replayed instead and no line limit
/// is applied.
#[instrument(skip_all)]
pub async fn follow_logs<Context: AsRef<RpcContinuations>>(
    ctx: Context,
    id: LogSource,
    cursor: Option<String>,
    limit: Option<usize>,
    boot: Option<String>,
) -> Result<LogFollowResponse, Error> {
    let limit = if cursor.is_some() {
        // resuming from a cursor: replay everything after it, unbounded
        None
    } else {
        Some(limit.unwrap_or(50))
    };
    let mut stream = journalctl(id, limit, cursor.as_deref(), boot.as_deref(), false, true).await?;
    let mut start_cursor = None;
    let mut first_entry = None;
    // Pull the first entry (waiting up to 1s) so its cursor can be reported
    // in the response; the entry itself is handed to ws_handler so it is
    // still delivered to the client first.
    if let Some(first) = tokio::time::timeout(Duration::from_secs(1), stream.try_next())
        .await
        .ok()
        .transpose()?
        .flatten()
    {
        let (cursor, entry) = first.log_entry()?;
        start_cursor = Some(cursor);
        first_entry = Some(entry);
    }
    let guid = Guid::new();
    ctx.as_ref()
        .add(
            guid.clone(),
            RpcContinuation::ws(
                move |socket| async move {
                    if let Err(e) = ws_handler(first_entry, stream, socket).await {
                        tracing::error!(
                            "{}",
                            t!("logs.error-in-log-stream", error = e.to_string())
                        );
                    }
                },
                // NOTE(review): presumably the window in which the client must
                // attach before the continuation is discarded — confirm with
                // RpcContinuation::ws
                Duration::from_secs(30),
            ),
        )
        .await;
    Ok(LogFollowResponse { start_cursor, guid })
}
// #[tokio::test]
// pub async fn test_logs() {
// let response = fetch_logs(
// // change `tor.service` to an actual journald unit on your machine
// // LogSource::Service("tor.service"),
// // first run `docker run --name=hello-world.embassy --log-driver=journald hello-world`
// LogSource::Container("hello-world".parse().unwrap()),
// // Some(5),
// None,
// None,
// // Some("s=1b8c418e28534400856c27b211dd94fd;i=5a7;b=97571c13a1284f87bc0639b5cff5acbe;m=740e916;t=5ca073eea3445;x=f45bc233ca328348".to_owned()),
// false,
// true,
// )
// .await
// .unwrap();
// let serialized = serde_json::to_string_pretty(&response).unwrap();
// println!("{}", serialized);
// }
// #[tokio::test]
// pub async fn test_logs() {
// let mut cmd = Command::new("journalctl");
// cmd.kill_on_drop(true);
// cmd.arg("-f");
// cmd.arg("CONTAINER_NAME=hello-world.embassy");
// let mut child = cmd.stdout(Stdio::piped()).spawn().unwrap();
// let out = BufReader::new(
// child
// .stdout
// .take()
// .ok_or_else(|| Error::new(eyre!("No stdout available"), crate::ErrorKind::Journald))
// .unwrap(),
// );
// let mut journalctl_entries = LinesStream::new(out.lines());
// while let Some(line) = journalctl_entries.try_next().await.unwrap() {
// dbg!(line);
// }
// }