add boot param to logs request

This commit is contained in:
Aiden McClelland
2024-07-10 13:00:02 -06:00
parent 1b5cf2d272
commit e6cedc257e
2 changed files with 130 additions and 7 deletions

View File

@@ -1,9 +1,12 @@
use std::convert::Infallible;
use std::ops::{Deref, DerefMut};
use std::process::Stdio;
use std::str::FromStr;
use std::time::{Duration, UNIX_EPOCH};
use axum::extract::ws::{self, WebSocket};
use chrono::{DateTime, Utc};
use clap::builder::ValueParserFactory;
use clap::{Args, FromArgMatches, Parser};
use color_eyre::eyre::eyre;
use futures::stream::BoxStream;
@@ -14,7 +17,7 @@ use rpc_toolkit::yajrc::RpcError;
use rpc_toolkit::{
from_fn_async, CallRemote, Context, Empty, HandlerArgs, HandlerExt, HandlerFor, ParentHandler,
};
use serde::de::DeserializeOwned;
use serde::de::{self, DeserializeOwned};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::{Child, Command};
@@ -27,6 +30,7 @@ use crate::error::ResultExt;
use crate::lxc::ContainerId;
use crate::prelude::*;
use crate::rpc_continuations::{Guid, RpcContinuation, RpcContinuations};
use crate::util::clap::FromStrParser;
use crate::util::serde::Reversible;
use crate::util::Invoke;
@@ -227,6 +231,91 @@ pub struct PackageIdParams {
id: PackageId,
}
#[derive(Debug, Clone)]
pub enum BootIdentifier {
Index(i32),
Id(String),
}
impl FromStr for BootIdentifier {
type Err = Infallible;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(match s.parse() {
Ok(i) => Self::Index(i),
Err(_) => Self::Id(s.to_owned()),
})
}
}
impl ValueParserFactory for BootIdentifier {
type Parser = FromStrParser<Self>;
fn value_parser() -> Self::Parser {
Self::Parser::new()
}
}
impl Serialize for BootIdentifier {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
match self {
Self::Index(i) => serializer.serialize_i32(*i),
Self::Id(i) => serializer.serialize_str(i),
}
}
}
impl<'de> Deserialize<'de> for BootIdentifier {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct Visitor;
impl<'de> de::Visitor<'de> for Visitor {
type Value = BootIdentifier;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(formatter, "a string or integer")
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: de::Error,
{
Ok(Self::Value::Id(v.to_owned()))
}
fn visit_string<E>(self, v: String) -> Result<Self::Value, E>
where
E: de::Error,
{
Ok(Self::Value::Id(v))
}
fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E>
where
E: de::Error,
{
Ok(Self::Value::Index(v as i32))
}
fn visit_f64<E>(self, v: f64) -> Result<Self::Value, E>
where
E: de::Error,
{
Ok(Self::Value::Index(v as i32))
}
fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
where
E: de::Error,
{
Ok(Self::Value::Index(v as i32))
}
}
deserializer.deserialize_any(Visitor)
}
}
impl From<BootIdentifier> for String {
fn from(value: BootIdentifier) -> Self {
match value {
BootIdentifier::Index(i) => i.to_string(),
BootIdentifier::Id(i) => i,
}
}
}
#[derive(Deserialize, Serialize, Parser)]
#[serde(rename_all = "camelCase")]
#[command(rename_all = "kebab-case")]
@@ -238,6 +327,9 @@ pub struct LogsParams<Extra: FromArgMatches + Args = Empty> {
limit: Option<usize>,
#[arg(short = 'c', long = "cursor", conflicts_with = "follow")]
cursor: Option<String>,
#[arg(short = 'b', long = "boot")]
#[serde(default)]
boot: Option<BootIdentifier>,
#[arg(short = 'B', long = "before", conflicts_with = "follow")]
#[serde(default)]
before: bool,
@@ -352,12 +444,22 @@ where
extra,
limit,
cursor,
boot,
before,
},
..
}: HandlerArgs<C, Empty, LogsParams<Extra>>| {
let f = f.clone();
async move { fetch_logs(f.call(&context, extra).await?, limit, cursor, before).await }
async move {
fetch_logs(
f.call(&context, extra).await?,
limit,
cursor,
boot.map(String::from),
before,
)
.await
}
},
)
}
@@ -377,13 +479,16 @@ fn logs_follow<
from_fn_async(
move |HandlerArgs {
context,
inherited_params: LogsParams { extra, limit, .. },
inherited_params:
LogsParams {
extra, limit, boot, ..
},
..
}: HandlerArgs<C, Empty, LogsParams<Extra>>| {
let f = f.clone();
async move {
let src = f.call(&context, extra).await?;
follow_logs(context, src, limit).await
follow_logs(context, src, limit, boot.map(String::from)).await
}
},
)
@@ -416,6 +521,7 @@ pub async fn journalctl(
id: LogSource,
limit: usize,
cursor: Option<&str>,
boot: Option<&str>,
before: bool,
follow: bool,
) -> Result<LogStream, Error> {
@@ -431,6 +537,12 @@ pub async fn journalctl(
}
}
if let Some(boot) = boot {
cmd.arg(format!("--boot={boot}"));
} else {
cmd.arg("--boot=all");
}
let deserialized_entries = String::from_utf8(cmd.invoke(ErrorKind::Journald).await?)?
.lines()
.map(serde_json::from_str::<JournalctlEntry>)
@@ -516,10 +628,12 @@ pub async fn fetch_logs(
id: LogSource,
limit: Option<usize>,
cursor: Option<String>,
boot: Option<String>,
before: bool,
) -> Result<LogResponse, Error> {
let limit = limit.unwrap_or(50);
let mut stream = journalctl(id, limit, cursor.as_deref(), before, false).await?;
let mut stream =
journalctl(id, limit, cursor.as_deref(), boot.as_deref(), before, false).await?;
let mut entries = Vec::with_capacity(limit);
let mut start_cursor = None;
@@ -563,9 +677,10 @@ pub async fn follow_logs<Context: AsRef<RpcContinuations>>(
ctx: Context,
id: LogSource,
limit: Option<usize>,
boot: Option<String>,
) -> Result<LogFollowResponse, Error> {
let limit = limit.unwrap_or(50);
let mut stream = journalctl(id, limit, None, false, true).await?;
let mut stream = journalctl(id, limit, None, boot.as_deref(), false, true).await?;
let mut start_cursor = None;
let mut first_entry = None;

View File

@@ -305,7 +305,15 @@ async fn torctl(
.invoke(ErrorKind::Tor)
.await?;
let logs = journalctl(LogSource::Unit(SYSTEMD_UNIT), 0, None, false, true).await?;
let logs = journalctl(
LogSource::Unit(SYSTEMD_UNIT),
0,
None,
Some("0"),
false,
true,
)
.await?;
let mut tcp_stream = None;
for _ in 0..60 {