diff --git a/core/Cargo.toml b/core/Cargo.toml index 77c189f93..4566d666e 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -161,6 +161,7 @@ nix = { version = "0.30.1", features = [ "process", "sched", "signal", + "socket", "user", ] } nom = "8.0.0" diff --git a/core/src/service/effects/subcontainer/sync.rs b/core/src/service/effects/subcontainer/sync.rs index c4d27994a..8b91c33a8 100644 --- a/core/src/service/effects/subcontainer/sync.rs +++ b/core/src/service/effects/subcontainer/sync.rs @@ -1,6 +1,7 @@ use std::ffi::{OsStr, OsString, c_int}; use std::fs::File; use std::io::{BufRead, BufReader, IsTerminal, Read}; +use std::os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd}; use std::os::unix::process::{CommandExt, ExitStatusExt}; use std::path::{Path, PathBuf}; use std::process::{Command as StdCommand, Stdio}; @@ -8,6 +9,9 @@ use std::sync::Arc; use nix::errno::Errno; use nix::sched::CloneFlags; +use nix::sys::socket::{ + recvmsg, sendmsg, ControlMessage, ControlMessageOwned, MsgFlags, +}; use nix::unistd::Pid; use signal_hook::consts::signal::*; use termion::raw::IntoRawMode; @@ -104,6 +108,129 @@ fn open_file_read(path: impl AsRef) -> Result { }) } +const LOG_DRAIN_SOCK: &str = ".log-drain.sock"; + +fn log_drain_sock_path(chroot: &Path) -> PathBuf { + chroot.join(LOG_DRAIN_SOCK) +} + +/// Send file descriptors to the log drain socket via SCM_RIGHTS. +/// Best-effort: silently returns on any failure. +fn send_drain_fds(chroot: &Path, fds: &[RawFd]) { + let sock_path = log_drain_sock_path(chroot); + let stream = match std::os::unix::net::UnixStream::connect(&sock_path) { + Ok(s) => s, + Err(e) => { + tracing::debug!("no log drain socket: {e}"); + return; + } + }; + let cmsg = [ControlMessage::ScmRights(fds)]; + let iov = [std::io::IoSlice::new(&[0u8])]; + if let Err(e) = sendmsg::<()>(stream.as_raw_fd(), &iov, &cmsg, MsgFlags::empty(), None) { + tracing::warn!("failed to send drain fds: {e}"); + } +} + +/// Receive file descriptors from a Unix stream via SCM_RIGHTS. +fn recv_drain_fds(stream: &std::os::unix::net::UnixStream) -> Result, nix::Error> { + let mut buf = [0u8; 1]; + let mut iov = [std::io::IoSliceMut::new(&mut buf)]; + let mut cmsg_buf = nix::cmsg_space!([RawFd; 4]); + let msg = recvmsg::<()>( + stream.as_raw_fd(), + &mut iov, + Some(&mut cmsg_buf), + MsgFlags::empty(), + )?; + let mut fds = Vec::new(); + for cmsg in msg.cmsgs()? { + if let ControlMessageOwned::ScmRights(received) = cmsg { + for fd in received { + fds.push(unsafe { OwnedFd::from_raw_fd(fd) }); + } + } + } + Ok(fds) +} + +/// Non-blocking drain of the pipe/pty buffer into `out`. The dup'd FD shares +/// the same pipe as the I/O thread, so both consumers write to the caller's +/// stdout/stderr — no data is lost. +fn drain_buffer(fd: &OwnedFd, out: &mut impl std::io::Write) { + use std::os::fd::AsFd; + let flags = nix::fcntl::fcntl(fd.as_fd(), nix::fcntl::FcntlArg::F_GETFL).unwrap_or(0); + let _ = nix::fcntl::fcntl( + fd.as_fd(), + nix::fcntl::FcntlArg::F_SETFL( + nix::fcntl::OFlag::from_bits_truncate(flags) | nix::fcntl::OFlag::O_NONBLOCK, + ), + ); + let mut buf = [0u8; CAP_1_KiB]; + loop { + match nix::unistd::read(fd.as_fd(), &mut buf) { + Ok(0) | Err(Errno::EAGAIN) | Err(_) => break, + Ok(n) => { + out.write_all(&buf[..n]).ok(); + } + } + } + out.flush().ok(); + let _ = nix::fcntl::fcntl( + fd.as_fd(), + nix::fcntl::FcntlArg::F_SETFL(nix::fcntl::OFlag::from_bits_truncate(flags)), + ); +} + +/// Start a Unix socket listener that accepts FD handoffs from `exec` processes. +/// Received FDs are drained to stderr (which flows to container-runtime's journald). +fn start_drain_listener(chroot: &Path) { + use std::io::{Read, Write}; + use std::os::unix::net::UnixListener; + + let sock_path = log_drain_sock_path(chroot); + let _ = std::fs::remove_file(&sock_path); + let listener = match UnixListener::bind(&sock_path) { + Ok(l) => l, + Err(e) => { + tracing::warn!("failed to bind log drain socket: {e}"); + return; + } + }; + + std::thread::spawn(move || { + for stream in listener.incoming() { + let stream = match stream { + Ok(s) => s, + Err(_) => break, + }; + match recv_drain_fds(&stream) { + Ok(fds) => { + for fd in fds { + std::thread::spawn(move || { + let mut reader = File::from(fd); + let mut buf = [0u8; CAP_1_KiB]; + loop { + match reader.read(&mut buf) { + Ok(0) | Err(_) => break, + Ok(n) => { + let stderr = std::io::stderr(); + let mut out = stderr.lock(); + out.write_all(&buf[..n]).ok(); + } + } + } + }); + } + } + Err(e) => { + tracing::warn!("failed to receive drain fds: {e}"); + } + } + } + }); +} + #[derive(Debug, Clone, Serialize, Deserialize, Parser)] #[group(skip)] pub struct ExecParams { @@ -331,6 +458,7 @@ pub fn launch( use std::io::Write; kill_init(Path::new("/proc"), &chroot)?; + start_drain_listener(&chroot); let mut sig = signal_hook::iterator::Signals::new(FWD_SIGNALS)?; let (send_pid, recv_pid) = oneshot::channel(); std::thread::spawn(move || { @@ -507,8 +635,6 @@ pub fn launch( stderr_send .send(Box::new(child.stderr.take().unwrap())) .unwrap_or_default(); - - // TODO: subreaping, signal handling let exit = child .wait() .with_ctx(|_| (ErrorKind::Filesystem, "waiting on child process"))?; @@ -628,7 +754,7 @@ pub fn exec( } }); let (stdout_send, stdout_recv) = oneshot::channel::>(); - let stdout_thread = std::thread::spawn(move || { + let _stdout_thread = std::thread::spawn(move || { if let Ok(mut cstdout) = stdout_recv.blocking_recv() { if tty { let mut stdout = stdout.lock(); @@ -646,7 +772,7 @@ pub fn exec( } }); let (stderr_send, stderr_recv) = oneshot::channel::>(); - let stderr_thread = if !stderr_tty { + let _stderr_thread = if !stderr_tty { Some(std::thread::spawn(move || { if let Ok(mut cstderr) = stderr_recv.blocking_recv() { std::io::copy(&mut cstderr, &mut stderr.lock()).unwrap(); @@ -705,6 +831,18 @@ pub fn exec( }; pty.resize(size).with_kind(ErrorKind::Filesystem)?; } + + // Dup the PTY master (and separate stderr if piped) for drain handoff + let pty_drain = nix::unistd::dup(&pty) + .with_ctx(|_| (ErrorKind::Filesystem, "dup pty for drain"))?; + let mut drain_fds = vec![pty_drain]; + if let Some(stderr) = child.stderr.take() { + let stderr_drain = nix::unistd::dup(&stderr) + .with_ctx(|_| (ErrorKind::Filesystem, "dup stderr for drain"))?; + drain_fds.push(stderr_drain); + stderr_send.send(Box::new(stderr)).unwrap_or_default(); + } + let shared = ArcPty(Arc::new(pty)); stdin_send .send(Box::new(shared.clone())) @@ -712,14 +850,23 @@ pub fn exec( stdout_send .send(Box::new(shared.clone())) .unwrap_or_default(); - if let Some(stderr) = child.stderr.take() { - stderr_send.send(Box::new(stderr)).unwrap_or_default(); - } + let exit = child .wait() .with_ctx(|_| (ErrorKind::Filesystem, "waiting on child process"))?; - stdout_thread.join().unwrap(); - stderr_thread.map(|t| t.join().unwrap()); + + // Flush any remaining pipe buffer data to the caller, then hand off + // the dup'd FDs to launch for grandchild log capture. + // We cannot join the I/O threads because grandchildren may hold the + // pipe write ends open indefinitely. + drain_buffer(&drain_fds[0], &mut std::io::stdout().lock()); + if drain_fds.len() > 1 { + drain_buffer(&drain_fds[1], &mut std::io::stderr().lock()); + } + let raw_fds: Vec = drain_fds.iter().map(|fd| fd.as_raw_fd()).collect(); + send_drain_fds(&chroot, &raw_fds); + drop(drain_fds); + if let Some(code) = exit.code() { drop(raw); std::process::exit(code); @@ -756,20 +903,40 @@ pub fn exec( .map_err(color_eyre::eyre::Report::msg) .with_ctx(|_| (ErrorKind::Filesystem, "spawning child process"))?; send_pid.send(child.id() as i32).unwrap_or_default(); + + // Take pipe handles and dup them for later handoff to the drain listener. + // We dup before sending the originals to I/O threads. + let child_stdout = child.stdout.take().unwrap(); + let child_stderr = child.stderr.take().unwrap(); + let stdout_drain = nix::unistd::dup(&child_stdout) + .with_ctx(|_| (ErrorKind::Filesystem, "dup stdout for drain"))?; + let stderr_drain = nix::unistd::dup(&child_stderr) + .with_ctx(|_| (ErrorKind::Filesystem, "dup stderr for drain"))?; + stdin_send .send(Box::new(child.stdin.take().unwrap())) .unwrap_or_default(); stdout_send - .send(Box::new(child.stdout.take().unwrap())) + .send(Box::new(child_stdout)) .unwrap_or_default(); stderr_send - .send(Box::new(child.stderr.take().unwrap())) + .send(Box::new(child_stderr)) .unwrap_or_default(); + let exit = child .wait() .with_ctx(|_| (ErrorKind::Filesystem, "waiting on child process"))?; - stdout_thread.join().unwrap(); - stderr_thread.map(|t| t.join().unwrap()); + + // Flush any remaining pipe buffer data to the caller, then hand off + // the dup'd FDs to launch for grandchild log capture. + // We cannot join the I/O threads because grandchildren may hold the + // pipe write ends open indefinitely. + drain_buffer(&stdout_drain, &mut std::io::stdout().lock()); + drain_buffer(&stderr_drain, &mut std::io::stderr().lock()); + send_drain_fds(&chroot, &[stdout_drain.as_raw_fd(), stderr_drain.as_raw_fd()]); + drop(stdout_drain); + drop(stderr_drain); + if let Some(code) = exit.code() { std::process::exit(code); } else if exit.success() || exit.signal() == Some(15) { @@ -786,3 +953,165 @@ pub fn exec( pub fn exec_command(_: ContainerCliContext, params: ExecParams) -> Result<(), Error> { params.exec() } + +#[cfg(test)] +mod tests { + use super::*; + use std::io::Write; + use std::os::fd::AsFd; + use std::os::unix::net::UnixListener; + use std::sync::atomic::{AtomicU32, Ordering}; + + static TEST_COUNTER: AtomicU32 = AtomicU32::new(0); + + fn test_dir() -> PathBuf { + let n = TEST_COUNTER.fetch_add(1, Ordering::Relaxed); + let dir = std::env::temp_dir().join(format!( + "startos-test-drain-{}-{}", + std::process::id(), + n, + )); + std::fs::create_dir_all(&dir).unwrap(); + dir + } + + /// Create an anonymous pipe, returning (read_end, write_end) as OwnedFds. + fn pipe() -> (OwnedFd, OwnedFd) { + let (r, w) = nix::unistd::pipe().unwrap(); + (r, w) + } + + #[test] + fn test_send_recv_drain_fds() { + let dir = test_dir(); + let sock_path = dir.join(LOG_DRAIN_SOCK); + let listener = UnixListener::bind(&sock_path).unwrap(); + + let (pipe_r, pipe_w) = pipe(); + let raw_fd = pipe_r.as_raw_fd(); + + // Send the read end of the pipe + let send_thread = std::thread::spawn(move || { + let stream = std::os::unix::net::UnixStream::connect(&sock_path).unwrap(); + let cmsg = [ControlMessage::ScmRights(&[raw_fd])]; + let iov = [std::io::IoSlice::new(&[0u8])]; + sendmsg::<()>(stream.as_raw_fd(), &iov, &cmsg, MsgFlags::empty(), None).unwrap(); + drop(pipe_r); // close our copy after sending + }); + + // Receive on the listener side + let (stream, _) = listener.accept().unwrap(); + let fds = recv_drain_fds(&stream).unwrap(); + send_thread.join().unwrap(); + + assert_eq!(fds.len(), 1, "should receive exactly one FD"); + + // Write through the pipe and read from the received FD + let mut writer = File::from(pipe_w); + writer.write_all(b"hello from pipe").unwrap(); + drop(writer); + + let mut reader = File::from(fds.into_iter().next().unwrap()); + let mut buf = String::new(); + std::io::Read::read_to_string(&mut reader, &mut buf).unwrap(); + assert_eq!(buf, "hello from pipe"); + } + + #[test] + fn test_drain_buffer_reads_available_data() { + let (pipe_r, pipe_w) = pipe(); + + let mut writer = File::from(pipe_w); + writer.write_all(b"buffered data").unwrap(); + drop(writer); + + let mut output = Vec::new(); + drain_buffer(&pipe_r, &mut output); + + assert_eq!(output, b"buffered data"); + } + + #[test] + fn test_drain_buffer_returns_immediately_on_empty_pipe() { + let (pipe_r, _pipe_w) = pipe(); + + // Pipe is open but empty — should return immediately (EAGAIN) + let mut output = Vec::new(); + drain_buffer(&pipe_r, &mut output); + + assert!(output.is_empty(), "empty pipe should yield no data"); + } + + #[test] + fn test_drain_buffer_restores_blocking_mode() { + let (pipe_r, _pipe_w) = pipe(); + + let flags_before = + nix::fcntl::fcntl(pipe_r.as_fd(), nix::fcntl::FcntlArg::F_GETFL).unwrap(); + drain_buffer(&pipe_r, &mut Vec::new()); + let flags_after = + nix::fcntl::fcntl(pipe_r.as_fd(), nix::fcntl::FcntlArg::F_GETFL).unwrap(); + + assert_eq!(flags_before, flags_after, "flags should be restored"); + } + + #[test] + fn test_dup_preserves_pipe_after_original_closed() { + // Simulates the real flow: dup the pipe, close the original, + // verify data is still readable from the dup'd FD. + let (pipe_r, pipe_w) = pipe(); + + let drain_fd = nix::unistd::dup(pipe_r.as_fd()).unwrap(); + + let mut writer = File::from(pipe_w); + writer.write_all(b"child output").unwrap(); + drop(writer); + + // Close original — pipe stays alive through drain_fd + drop(pipe_r); + + let mut reader = File::from(drain_fd); + let mut buf = String::new(); + std::io::Read::read_to_string(&mut reader, &mut buf).unwrap(); + assert_eq!(buf, "child output"); + } + + #[test] + fn test_start_drain_listener_end_to_end() { + // Full integration: start the listener, send a pipe FD, write data, + // verify the listener drains it. + let chroot = test_dir(); + + start_drain_listener(&chroot); + + // Give the listener thread a moment to bind + std::thread::sleep(std::time::Duration::from_millis(50)); + + let (pipe_r, pipe_w) = pipe(); + + // Send the pipe read end to the drain listener + send_drain_fds(&chroot, &[pipe_r.as_raw_fd()]); + drop(pipe_r); // close our copy; listener has its own + + // Write data — the listener should drain it (to stderr in production, + // but we just verify the pipe doesn't block/hang) + let mut writer = File::from(pipe_w); + for _ in 0..100 { + writer.write_all(b"log line\n").unwrap(); + } + drop(writer); + + // Give the drain thread time to process + std::thread::sleep(std::time::Duration::from_millis(100)); + // If we get here without hanging, the test passes. + } + + #[test] + fn test_send_drain_fds_no_socket_does_not_panic() { + // When no drain socket exists, send_drain_fds should silently return + let dir = test_dir(); + let (pipe_r, _pipe_w) = pipe(); + send_drain_fds(&dir, &[pipe_r.as_raw_fd()]); + // No panic, no error — best-effort + } +}