wip: subcontainer exec log drain via SCM_RIGHTS (reference only)

Implemented pipe FD handoff from exec to launch via Unix socket +
SCM_RIGHTS for grandchild log capture. Superseded by the simpler
PR_SET_DUMPABLE approach which eliminates the need for pipes entirely.
This commit is contained in:
Aiden McClelland
2026-03-22 23:58:14 -06:00
parent 25aa140174
commit b7e4df44bf
2 changed files with 343 additions and 13 deletions

View File

@@ -161,6 +161,7 @@ nix = { version = "0.30.1", features = [
"process", "process",
"sched", "sched",
"signal", "signal",
"socket",
"user", "user",
] } ] }
nom = "8.0.0" nom = "8.0.0"

View File

@@ -1,6 +1,7 @@
use std::ffi::{OsStr, OsString, c_int}; use std::ffi::{OsStr, OsString, c_int};
use std::fs::File; use std::fs::File;
use std::io::{BufRead, BufReader, IsTerminal, Read}; use std::io::{BufRead, BufReader, IsTerminal, Read};
use std::os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd};
use std::os::unix::process::{CommandExt, ExitStatusExt}; use std::os::unix::process::{CommandExt, ExitStatusExt};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::process::{Command as StdCommand, Stdio}; use std::process::{Command as StdCommand, Stdio};
@@ -8,6 +9,9 @@ use std::sync::Arc;
use nix::errno::Errno; use nix::errno::Errno;
use nix::sched::CloneFlags; use nix::sched::CloneFlags;
use nix::sys::socket::{
recvmsg, sendmsg, ControlMessage, ControlMessageOwned, MsgFlags,
};
use nix::unistd::Pid; use nix::unistd::Pid;
use signal_hook::consts::signal::*; use signal_hook::consts::signal::*;
use termion::raw::IntoRawMode; use termion::raw::IntoRawMode;
@@ -104,6 +108,129 @@ fn open_file_read(path: impl AsRef<Path>) -> Result<File, Error> {
}) })
} }
const LOG_DRAIN_SOCK: &str = ".log-drain.sock";
fn log_drain_sock_path(chroot: &Path) -> PathBuf {
chroot.join(LOG_DRAIN_SOCK)
}
/// Send file descriptors to the log drain socket via SCM_RIGHTS.
/// Best-effort: silently returns on any failure.
fn send_drain_fds(chroot: &Path, fds: &[RawFd]) {
let sock_path = log_drain_sock_path(chroot);
let stream = match std::os::unix::net::UnixStream::connect(&sock_path) {
Ok(s) => s,
Err(e) => {
tracing::debug!("no log drain socket: {e}");
return;
}
};
let cmsg = [ControlMessage::ScmRights(fds)];
let iov = [std::io::IoSlice::new(&[0u8])];
if let Err(e) = sendmsg::<()>(stream.as_raw_fd(), &iov, &cmsg, MsgFlags::empty(), None) {
tracing::warn!("failed to send drain fds: {e}");
}
}
/// Receive file descriptors from a Unix stream via SCM_RIGHTS.
fn recv_drain_fds(stream: &std::os::unix::net::UnixStream) -> Result<Vec<OwnedFd>, nix::Error> {
let mut buf = [0u8; 1];
let mut iov = [std::io::IoSliceMut::new(&mut buf)];
let mut cmsg_buf = nix::cmsg_space!([RawFd; 4]);
let msg = recvmsg::<()>(
stream.as_raw_fd(),
&mut iov,
Some(&mut cmsg_buf),
MsgFlags::empty(),
)?;
let mut fds = Vec::new();
for cmsg in msg.cmsgs()? {
if let ControlMessageOwned::ScmRights(received) = cmsg {
for fd in received {
fds.push(unsafe { OwnedFd::from_raw_fd(fd) });
}
}
}
Ok(fds)
}
/// Non-blocking drain of the pipe/pty buffer into `out`. The dup'd FD shares
/// the same pipe as the I/O thread, so both consumers write to the caller's
/// stdout/stderr — no data is lost.
fn drain_buffer(fd: &OwnedFd, out: &mut impl std::io::Write) {
use std::os::fd::AsFd;
let flags = nix::fcntl::fcntl(fd.as_fd(), nix::fcntl::FcntlArg::F_GETFL).unwrap_or(0);
let _ = nix::fcntl::fcntl(
fd.as_fd(),
nix::fcntl::FcntlArg::F_SETFL(
nix::fcntl::OFlag::from_bits_truncate(flags) | nix::fcntl::OFlag::O_NONBLOCK,
),
);
let mut buf = [0u8; CAP_1_KiB];
loop {
match nix::unistd::read(fd.as_fd(), &mut buf) {
Ok(0) | Err(Errno::EAGAIN) | Err(_) => break,
Ok(n) => {
out.write_all(&buf[..n]).ok();
}
}
}
out.flush().ok();
let _ = nix::fcntl::fcntl(
fd.as_fd(),
nix::fcntl::FcntlArg::F_SETFL(nix::fcntl::OFlag::from_bits_truncate(flags)),
);
}
/// Start a Unix socket listener that accepts FD handoffs from `exec` processes.
/// Received FDs are drained to stderr (which flows to container-runtime's journald).
fn start_drain_listener(chroot: &Path) {
use std::io::{Read, Write};
use std::os::unix::net::UnixListener;
let sock_path = log_drain_sock_path(chroot);
let _ = std::fs::remove_file(&sock_path);
let listener = match UnixListener::bind(&sock_path) {
Ok(l) => l,
Err(e) => {
tracing::warn!("failed to bind log drain socket: {e}");
return;
}
};
std::thread::spawn(move || {
for stream in listener.incoming() {
let stream = match stream {
Ok(s) => s,
Err(_) => break,
};
match recv_drain_fds(&stream) {
Ok(fds) => {
for fd in fds {
std::thread::spawn(move || {
let mut reader = File::from(fd);
let mut buf = [0u8; CAP_1_KiB];
loop {
match reader.read(&mut buf) {
Ok(0) | Err(_) => break,
Ok(n) => {
let stderr = std::io::stderr();
let mut out = stderr.lock();
out.write_all(&buf[..n]).ok();
}
}
}
});
}
}
Err(e) => {
tracing::warn!("failed to receive drain fds: {e}");
}
}
}
});
}
#[derive(Debug, Clone, Serialize, Deserialize, Parser)] #[derive(Debug, Clone, Serialize, Deserialize, Parser)]
#[group(skip)] #[group(skip)]
pub struct ExecParams { pub struct ExecParams {
@@ -331,6 +458,7 @@ pub fn launch(
use std::io::Write; use std::io::Write;
kill_init(Path::new("/proc"), &chroot)?; kill_init(Path::new("/proc"), &chroot)?;
start_drain_listener(&chroot);
let mut sig = signal_hook::iterator::Signals::new(FWD_SIGNALS)?; let mut sig = signal_hook::iterator::Signals::new(FWD_SIGNALS)?;
let (send_pid, recv_pid) = oneshot::channel(); let (send_pid, recv_pid) = oneshot::channel();
std::thread::spawn(move || { std::thread::spawn(move || {
@@ -507,8 +635,6 @@ pub fn launch(
stderr_send stderr_send
.send(Box::new(child.stderr.take().unwrap())) .send(Box::new(child.stderr.take().unwrap()))
.unwrap_or_default(); .unwrap_or_default();
// TODO: subreaping, signal handling
let exit = child let exit = child
.wait() .wait()
.with_ctx(|_| (ErrorKind::Filesystem, "waiting on child process"))?; .with_ctx(|_| (ErrorKind::Filesystem, "waiting on child process"))?;
@@ -628,7 +754,7 @@ pub fn exec(
} }
}); });
let (stdout_send, stdout_recv) = oneshot::channel::<Box<dyn std::io::Read + Send>>(); let (stdout_send, stdout_recv) = oneshot::channel::<Box<dyn std::io::Read + Send>>();
let stdout_thread = std::thread::spawn(move || { let _stdout_thread = std::thread::spawn(move || {
if let Ok(mut cstdout) = stdout_recv.blocking_recv() { if let Ok(mut cstdout) = stdout_recv.blocking_recv() {
if tty { if tty {
let mut stdout = stdout.lock(); let mut stdout = stdout.lock();
@@ -646,7 +772,7 @@ pub fn exec(
} }
}); });
let (stderr_send, stderr_recv) = oneshot::channel::<Box<dyn std::io::Read + Send>>(); let (stderr_send, stderr_recv) = oneshot::channel::<Box<dyn std::io::Read + Send>>();
let stderr_thread = if !stderr_tty { let _stderr_thread = if !stderr_tty {
Some(std::thread::spawn(move || { Some(std::thread::spawn(move || {
if let Ok(mut cstderr) = stderr_recv.blocking_recv() { if let Ok(mut cstderr) = stderr_recv.blocking_recv() {
std::io::copy(&mut cstderr, &mut stderr.lock()).unwrap(); std::io::copy(&mut cstderr, &mut stderr.lock()).unwrap();
@@ -705,6 +831,18 @@ pub fn exec(
}; };
pty.resize(size).with_kind(ErrorKind::Filesystem)?; pty.resize(size).with_kind(ErrorKind::Filesystem)?;
} }
// Dup the PTY master (and separate stderr if piped) for drain handoff
let pty_drain = nix::unistd::dup(&pty)
.with_ctx(|_| (ErrorKind::Filesystem, "dup pty for drain"))?;
let mut drain_fds = vec![pty_drain];
if let Some(stderr) = child.stderr.take() {
let stderr_drain = nix::unistd::dup(&stderr)
.with_ctx(|_| (ErrorKind::Filesystem, "dup stderr for drain"))?;
drain_fds.push(stderr_drain);
stderr_send.send(Box::new(stderr)).unwrap_or_default();
}
let shared = ArcPty(Arc::new(pty)); let shared = ArcPty(Arc::new(pty));
stdin_send stdin_send
.send(Box::new(shared.clone())) .send(Box::new(shared.clone()))
@@ -712,14 +850,23 @@ pub fn exec(
stdout_send stdout_send
.send(Box::new(shared.clone())) .send(Box::new(shared.clone()))
.unwrap_or_default(); .unwrap_or_default();
if let Some(stderr) = child.stderr.take() {
stderr_send.send(Box::new(stderr)).unwrap_or_default();
}
let exit = child let exit = child
.wait() .wait()
.with_ctx(|_| (ErrorKind::Filesystem, "waiting on child process"))?; .with_ctx(|_| (ErrorKind::Filesystem, "waiting on child process"))?;
stdout_thread.join().unwrap();
stderr_thread.map(|t| t.join().unwrap()); // Flush any remaining pipe buffer data to the caller, then hand off
// the dup'd FDs to launch for grandchild log capture.
// We cannot join the I/O threads because grandchildren may hold the
// pipe write ends open indefinitely.
drain_buffer(&drain_fds[0], &mut std::io::stdout().lock());
if drain_fds.len() > 1 {
drain_buffer(&drain_fds[1], &mut std::io::stderr().lock());
}
let raw_fds: Vec<RawFd> = drain_fds.iter().map(|fd| fd.as_raw_fd()).collect();
send_drain_fds(&chroot, &raw_fds);
drop(drain_fds);
if let Some(code) = exit.code() { if let Some(code) = exit.code() {
drop(raw); drop(raw);
std::process::exit(code); std::process::exit(code);
@@ -756,20 +903,40 @@ pub fn exec(
.map_err(color_eyre::eyre::Report::msg) .map_err(color_eyre::eyre::Report::msg)
.with_ctx(|_| (ErrorKind::Filesystem, "spawning child process"))?; .with_ctx(|_| (ErrorKind::Filesystem, "spawning child process"))?;
send_pid.send(child.id() as i32).unwrap_or_default(); send_pid.send(child.id() as i32).unwrap_or_default();
// Take pipe handles and dup them for later handoff to the drain listener.
// We dup before sending the originals to I/O threads.
let child_stdout = child.stdout.take().unwrap();
let child_stderr = child.stderr.take().unwrap();
let stdout_drain = nix::unistd::dup(&child_stdout)
.with_ctx(|_| (ErrorKind::Filesystem, "dup stdout for drain"))?;
let stderr_drain = nix::unistd::dup(&child_stderr)
.with_ctx(|_| (ErrorKind::Filesystem, "dup stderr for drain"))?;
stdin_send stdin_send
.send(Box::new(child.stdin.take().unwrap())) .send(Box::new(child.stdin.take().unwrap()))
.unwrap_or_default(); .unwrap_or_default();
stdout_send stdout_send
.send(Box::new(child.stdout.take().unwrap())) .send(Box::new(child_stdout))
.unwrap_or_default(); .unwrap_or_default();
stderr_send stderr_send
.send(Box::new(child.stderr.take().unwrap())) .send(Box::new(child_stderr))
.unwrap_or_default(); .unwrap_or_default();
let exit = child let exit = child
.wait() .wait()
.with_ctx(|_| (ErrorKind::Filesystem, "waiting on child process"))?; .with_ctx(|_| (ErrorKind::Filesystem, "waiting on child process"))?;
stdout_thread.join().unwrap();
stderr_thread.map(|t| t.join().unwrap()); // Flush any remaining pipe buffer data to the caller, then hand off
// the dup'd FDs to launch for grandchild log capture.
// We cannot join the I/O threads because grandchildren may hold the
// pipe write ends open indefinitely.
drain_buffer(&stdout_drain, &mut std::io::stdout().lock());
drain_buffer(&stderr_drain, &mut std::io::stderr().lock());
send_drain_fds(&chroot, &[stdout_drain.as_raw_fd(), stderr_drain.as_raw_fd()]);
drop(stdout_drain);
drop(stderr_drain);
if let Some(code) = exit.code() { if let Some(code) = exit.code() {
std::process::exit(code); std::process::exit(code);
} else if exit.success() || exit.signal() == Some(15) { } else if exit.success() || exit.signal() == Some(15) {
@@ -786,3 +953,165 @@ pub fn exec(
pub fn exec_command(_: ContainerCliContext, params: ExecParams) -> Result<(), Error> { pub fn exec_command(_: ContainerCliContext, params: ExecParams) -> Result<(), Error> {
params.exec() params.exec()
} }
#[cfg(test)]
mod tests {
use super::*;
use std::io::Write;
use std::os::fd::AsFd;
use std::os::unix::net::UnixListener;
use std::sync::atomic::{AtomicU32, Ordering};
static TEST_COUNTER: AtomicU32 = AtomicU32::new(0);
fn test_dir() -> PathBuf {
let n = TEST_COUNTER.fetch_add(1, Ordering::Relaxed);
let dir = std::env::temp_dir().join(format!(
"startos-test-drain-{}-{}",
std::process::id(),
n,
));
std::fs::create_dir_all(&dir).unwrap();
dir
}
/// Create an anonymous pipe, returning (read_end, write_end) as OwnedFds.
fn pipe() -> (OwnedFd, OwnedFd) {
let (r, w) = nix::unistd::pipe().unwrap();
(r, w)
}
#[test]
fn test_send_recv_drain_fds() {
let dir = test_dir();
let sock_path = dir.join(LOG_DRAIN_SOCK);
let listener = UnixListener::bind(&sock_path).unwrap();
let (pipe_r, pipe_w) = pipe();
let raw_fd = pipe_r.as_raw_fd();
// Send the read end of the pipe
let send_thread = std::thread::spawn(move || {
let stream = std::os::unix::net::UnixStream::connect(&sock_path).unwrap();
let cmsg = [ControlMessage::ScmRights(&[raw_fd])];
let iov = [std::io::IoSlice::new(&[0u8])];
sendmsg::<()>(stream.as_raw_fd(), &iov, &cmsg, MsgFlags::empty(), None).unwrap();
drop(pipe_r); // close our copy after sending
});
// Receive on the listener side
let (stream, _) = listener.accept().unwrap();
let fds = recv_drain_fds(&stream).unwrap();
send_thread.join().unwrap();
assert_eq!(fds.len(), 1, "should receive exactly one FD");
// Write through the pipe and read from the received FD
let mut writer = File::from(pipe_w);
writer.write_all(b"hello from pipe").unwrap();
drop(writer);
let mut reader = File::from(fds.into_iter().next().unwrap());
let mut buf = String::new();
std::io::Read::read_to_string(&mut reader, &mut buf).unwrap();
assert_eq!(buf, "hello from pipe");
}
#[test]
fn test_drain_buffer_reads_available_data() {
let (pipe_r, pipe_w) = pipe();
let mut writer = File::from(pipe_w);
writer.write_all(b"buffered data").unwrap();
drop(writer);
let mut output = Vec::new();
drain_buffer(&pipe_r, &mut output);
assert_eq!(output, b"buffered data");
}
#[test]
fn test_drain_buffer_returns_immediately_on_empty_pipe() {
let (pipe_r, _pipe_w) = pipe();
// Pipe is open but empty — should return immediately (EAGAIN)
let mut output = Vec::new();
drain_buffer(&pipe_r, &mut output);
assert!(output.is_empty(), "empty pipe should yield no data");
}
#[test]
fn test_drain_buffer_restores_blocking_mode() {
let (pipe_r, _pipe_w) = pipe();
let flags_before =
nix::fcntl::fcntl(pipe_r.as_fd(), nix::fcntl::FcntlArg::F_GETFL).unwrap();
drain_buffer(&pipe_r, &mut Vec::new());
let flags_after =
nix::fcntl::fcntl(pipe_r.as_fd(), nix::fcntl::FcntlArg::F_GETFL).unwrap();
assert_eq!(flags_before, flags_after, "flags should be restored");
}
#[test]
fn test_dup_preserves_pipe_after_original_closed() {
// Simulates the real flow: dup the pipe, close the original,
// verify data is still readable from the dup'd FD.
let (pipe_r, pipe_w) = pipe();
let drain_fd = nix::unistd::dup(pipe_r.as_fd()).unwrap();
let mut writer = File::from(pipe_w);
writer.write_all(b"child output").unwrap();
drop(writer);
// Close original — pipe stays alive through drain_fd
drop(pipe_r);
let mut reader = File::from(drain_fd);
let mut buf = String::new();
std::io::Read::read_to_string(&mut reader, &mut buf).unwrap();
assert_eq!(buf, "child output");
}
#[test]
fn test_start_drain_listener_end_to_end() {
// Full integration: start the listener, send a pipe FD, write data,
// verify the listener drains it.
let chroot = test_dir();
start_drain_listener(&chroot);
// Give the listener thread a moment to bind
std::thread::sleep(std::time::Duration::from_millis(50));
let (pipe_r, pipe_w) = pipe();
// Send the pipe read end to the drain listener
send_drain_fds(&chroot, &[pipe_r.as_raw_fd()]);
drop(pipe_r); // close our copy; listener has its own
// Write data — the listener should drain it (to stderr in production,
// but we just verify the pipe doesn't block/hang)
let mut writer = File::from(pipe_w);
for _ in 0..100 {
writer.write_all(b"log line\n").unwrap();
}
drop(writer);
// Give the drain thread time to process
std::thread::sleep(std::time::Duration::from_millis(100));
// If we get here without hanging, the test passes.
}
#[test]
fn test_send_drain_fds_no_socket_does_not_panic() {
// When no drain socket exists, send_drain_fds should silently return
let dir = test_dir();
let (pipe_r, _pipe_w) = pipe();
send_drain_fds(&dir, &[pipe_r.as_raw_fd()]);
// No panic, no error — best-effort
}
}