Feat/logging (#2602)

* wip: Working on something to help

* chore: Add in some of the logging now

* chore: fix the type to interned instead of id

* wip

* wip

* chore: fix the logging by moving levels

* Apply suggestions from code review

* mount at machine id for journal

* Persistant

* limit log size

* feat: Actually logging and mounting now

* fix: Get the logs from the previous versions of the boot

* Chore: Add the boot id

---------

Co-authored-by: Aiden McClelland <me@drbonez.dev>
This commit is contained in:
Jade
2024-04-17 15:46:10 -06:00
committed by GitHub
parent 711c82472c
commit 9eff920989
10 changed files with 308 additions and 120 deletions

View File

@@ -18,15 +18,21 @@ use tokio_stream::wrappers::LinesStream;
use tokio_tungstenite::tungstenite::Message;
use tracing::instrument;
use crate::context::{CliContext, RpcContext};
use crate::core::rpc_continuations::{RequestGuid, RpcContinuation};
use crate::error::ResultExt;
use crate::prelude::*;
use crate::util::serde::Reversible;
use crate::{
context::{CliContext, RpcContext},
lxc::ContainerId,
};
use crate::{
core::rpc_continuations::{RequestGuid, RpcContinuation},
util::Invoke,
};
#[pin_project::pin_project]
pub struct LogStream {
_child: Child,
_child: Option<Child>,
#[pin]
entries: BoxStream<'static, Result<JournalctlEntry, Error>>,
}
@@ -116,9 +122,11 @@ pub struct LogFollowResponse {
}
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct LogEntry {
timestamp: DateTime<Utc>,
message: String,
boot_id: String,
}
impl std::fmt::Display for LogEntry {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
@@ -141,6 +149,8 @@ pub struct JournalctlEntry {
pub message: String,
#[serde(rename = "__CURSOR")]
pub cursor: String,
#[serde(rename = "_BOOT_ID")]
pub boot_id: String,
}
impl JournalctlEntry {
fn log_entry(self) -> Result<(String, LogEntry), Error> {
@@ -151,6 +161,7 @@ impl JournalctlEntry {
UNIX_EPOCH + Duration::from_micros(self.timestamp.parse::<u64>()?),
),
message: self.message,
boot_id: self.boot_id,
},
))
}
@@ -200,12 +211,12 @@ fn deserialize_log_message<'de, D: serde::de::Deserializer<'de>>(
/// --user-unit=UNIT Show logs from the specified user unit))
/// System: Unit is startd, but we also filter on the comm
/// Container: Filtering containers, like podman/docker is done by filtering on the CONTAINER_NAME
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum LogSource {
Kernel,
Unit(&'static str),
System,
Container(PackageId),
Container(ContainerId),
}
pub const SYSTEM_UNIT: &str = "startd";
@@ -276,7 +287,7 @@ pub async fn cli_logs(
}
}
pub async fn logs_nofollow(
_ctx: (),
ctx: RpcContext,
_: Empty,
LogsParam {
id,
@@ -286,14 +297,38 @@ pub async fn logs_nofollow(
..
}: LogsParam,
) -> Result<LogResponse, Error> {
fetch_logs(LogSource::Container(id), limit, cursor, before).await
let container_id = ctx
.services
.get(&id)
.await
.as_ref()
.map(|x| x.container_id())
.ok_or_else(|| {
Error::new(
eyre!("No service found with id: {}", id),
ErrorKind::NotFound,
)
})??;
fetch_logs(LogSource::Container(container_id), limit, cursor, before).await
}
pub async fn logs_follow(
ctx: RpcContext,
_: Empty,
LogsParam { id, limit, .. }: LogsParam,
) -> Result<LogFollowResponse, Error> {
follow_logs(ctx, LogSource::Container(id), limit).await
let container_id = ctx
.services
.get(&id)
.await
.as_ref()
.map(|x| x.container_id())
.ok_or_else(|| {
Error::new(
eyre!("No service found with id: {}", id),
ErrorKind::NotFound,
)
})??;
follow_logs(ctx, LogSource::Container(container_id), limit).await
}
pub async fn cli_logs_generic_nofollow(
@@ -358,7 +393,74 @@ pub async fn journalctl(
before: bool,
follow: bool,
) -> Result<LogStream, Error> {
let mut cmd = Command::new("journalctl");
let mut cmd = gen_journalctl_command(&id, limit);
let cursor_formatted = format!("--after-cursor={}", cursor.unwrap_or(""));
if cursor.is_some() {
cmd.arg(&cursor_formatted);
if before {
cmd.arg("--reverse");
}
}
let deserialized_entries = String::from_utf8(cmd.invoke(ErrorKind::Journald).await?)?
.lines()
.map(serde_json::from_str::<JournalctlEntry>)
.collect::<Result<Vec<_>, _>>()
.with_kind(ErrorKind::Deserialization)?;
if follow {
let mut follow_cmd = gen_journalctl_command(&id, limit);
follow_cmd.arg("-f");
if let Some(last) = deserialized_entries.last() {
cmd.arg(format!("--after-cursor={}", last.cursor));
}
let mut child = cmd.stdout(Stdio::piped()).spawn()?;
let out =
BufReader::new(child.stdout.take().ok_or_else(|| {
Error::new(eyre!("No stdout available"), crate::ErrorKind::Journald)
})?);
let journalctl_entries = LinesStream::new(out.lines());
let follow_deserialized_entries = journalctl_entries
.map_err(|e| Error::new(e, crate::ErrorKind::Journald))
.and_then(|s| {
futures::future::ready(
serde_json::from_str::<JournalctlEntry>(&s)
.with_kind(crate::ErrorKind::Deserialization),
)
});
let entries = futures::stream::iter(deserialized_entries)
.map(Ok)
.chain(follow_deserialized_entries)
.boxed();
Ok(LogStream {
_child: Some(child),
entries,
})
} else {
let entries = futures::stream::iter(deserialized_entries).map(Ok).boxed();
Ok(LogStream {
_child: None,
entries,
})
}
}
fn gen_journalctl_command(id: &LogSource, limit: usize) -> Command {
let mut cmd = match id {
LogSource::Container(container_id) => {
let mut cmd = Command::new("lxc-attach");
cmd.arg(format!("{}", container_id))
.arg("--")
.arg("journalctl");
cmd
}
_ => Command::new("journalctl"),
};
cmd.kill_on_drop(true);
cmd.arg("--output=json");
@@ -377,48 +479,11 @@ pub async fn journalctl(
cmd.arg(SYSTEM_UNIT);
cmd.arg(format!("_COMM={}", SYSTEM_UNIT));
}
LogSource::Container(id) => {
#[cfg(not(feature = "docker"))]
cmd.arg(format!("SYSLOG_IDENTIFIER={}.embassy", id));
#[cfg(feature = "docker")]
cmd.arg(format!("CONTAINER_NAME={}.embassy", id));
LogSource::Container(_container_id) => {
cmd.arg("-u").arg("container-runtime.service");
}
};
let cursor_formatted = format!("--after-cursor={}", cursor.unwrap_or(""));
if cursor.is_some() {
cmd.arg(&cursor_formatted);
if before {
cmd.arg("--reverse");
}
}
if follow {
cmd.arg("--follow");
}
let mut child = cmd.stdout(Stdio::piped()).spawn()?;
let out = BufReader::new(
child
.stdout
.take()
.ok_or_else(|| Error::new(eyre!("No stdout available"), crate::ErrorKind::Journald))?,
);
let journalctl_entries = LinesStream::new(out.lines());
let deserialized_entries = journalctl_entries
.map_err(|e| Error::new(e, crate::ErrorKind::Journald))
.and_then(|s| {
futures::future::ready(
serde_json::from_str::<JournalctlEntry>(&s)
.with_kind(crate::ErrorKind::Deserialization),
)
});
Ok(LogStream {
_child: child,
entries: deserialized_entries.boxed(),
})
cmd
}
#[instrument(skip_all)]