diff --git a/core/Cargo.toml b/core/Cargo.toml index 4566d666e..77c189f93 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -161,7 +161,6 @@ nix = { version = "0.30.1", features = [ "process", "sched", "signal", - "socket", "user", ] } nom = "8.0.0" diff --git a/core/src/service/effects/subcontainer/sync.rs b/core/src/service/effects/subcontainer/sync.rs index 8b91c33a8..e1205a1a1 100644 --- a/core/src/service/effects/subcontainer/sync.rs +++ b/core/src/service/effects/subcontainer/sync.rs @@ -1,7 +1,6 @@ use std::ffi::{OsStr, OsString, c_int}; use std::fs::File; use std::io::{BufRead, BufReader, IsTerminal, Read}; -use std::os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd}; use std::os::unix::process::{CommandExt, ExitStatusExt}; use std::path::{Path, PathBuf}; use std::process::{Command as StdCommand, Stdio}; @@ -9,9 +8,6 @@ use std::sync::Arc; use nix::errno::Errno; use nix::sched::CloneFlags; -use nix::sys::socket::{ - recvmsg, sendmsg, ControlMessage, ControlMessageOwned, MsgFlags, -}; use nix::unistd::Pid; use signal_hook::consts::signal::*; use termion::raw::IntoRawMode; @@ -108,129 +104,6 @@ fn open_file_read(path: impl AsRef) -> Result { }) } -const LOG_DRAIN_SOCK: &str = ".log-drain.sock"; - -fn log_drain_sock_path(chroot: &Path) -> PathBuf { - chroot.join(LOG_DRAIN_SOCK) -} - -/// Send file descriptors to the log drain socket via SCM_RIGHTS. -/// Best-effort: silently returns on any failure. -fn send_drain_fds(chroot: &Path, fds: &[RawFd]) { - let sock_path = log_drain_sock_path(chroot); - let stream = match std::os::unix::net::UnixStream::connect(&sock_path) { - Ok(s) => s, - Err(e) => { - tracing::debug!("no log drain socket: {e}"); - return; - } - }; - let cmsg = [ControlMessage::ScmRights(fds)]; - let iov = [std::io::IoSlice::new(&[0u8])]; - if let Err(e) = sendmsg::<()>(stream.as_raw_fd(), &iov, &cmsg, MsgFlags::empty(), None) { - tracing::warn!("failed to send drain fds: {e}"); - } -} - -/// Receive file descriptors from a Unix stream via SCM_RIGHTS. -fn recv_drain_fds(stream: &std::os::unix::net::UnixStream) -> Result, nix::Error> { - let mut buf = [0u8; 1]; - let mut iov = [std::io::IoSliceMut::new(&mut buf)]; - let mut cmsg_buf = nix::cmsg_space!([RawFd; 4]); - let msg = recvmsg::<()>( - stream.as_raw_fd(), - &mut iov, - Some(&mut cmsg_buf), - MsgFlags::empty(), - )?; - let mut fds = Vec::new(); - for cmsg in msg.cmsgs()? { - if let ControlMessageOwned::ScmRights(received) = cmsg { - for fd in received { - fds.push(unsafe { OwnedFd::from_raw_fd(fd) }); - } - } - } - Ok(fds) -} - -/// Non-blocking drain of the pipe/pty buffer into `out`. The dup'd FD shares -/// the same pipe as the I/O thread, so both consumers write to the caller's -/// stdout/stderr — no data is lost. -fn drain_buffer(fd: &OwnedFd, out: &mut impl std::io::Write) { - use std::os::fd::AsFd; - let flags = nix::fcntl::fcntl(fd.as_fd(), nix::fcntl::FcntlArg::F_GETFL).unwrap_or(0); - let _ = nix::fcntl::fcntl( - fd.as_fd(), - nix::fcntl::FcntlArg::F_SETFL( - nix::fcntl::OFlag::from_bits_truncate(flags) | nix::fcntl::OFlag::O_NONBLOCK, - ), - ); - let mut buf = [0u8; CAP_1_KiB]; - loop { - match nix::unistd::read(fd.as_fd(), &mut buf) { - Ok(0) | Err(Errno::EAGAIN) | Err(_) => break, - Ok(n) => { - out.write_all(&buf[..n]).ok(); - } - } - } - out.flush().ok(); - let _ = nix::fcntl::fcntl( - fd.as_fd(), - nix::fcntl::FcntlArg::F_SETFL(nix::fcntl::OFlag::from_bits_truncate(flags)), - ); -} - -/// Start a Unix socket listener that accepts FD handoffs from `exec` processes. -/// Received FDs are drained to stderr (which flows to container-runtime's journald). -fn start_drain_listener(chroot: &Path) { - use std::io::{Read, Write}; - use std::os::unix::net::UnixListener; - - let sock_path = log_drain_sock_path(chroot); - let _ = std::fs::remove_file(&sock_path); - let listener = match UnixListener::bind(&sock_path) { - Ok(l) => l, - Err(e) => { - tracing::warn!("failed to bind log drain socket: {e}"); - return; - } - }; - - std::thread::spawn(move || { - for stream in listener.incoming() { - let stream = match stream { - Ok(s) => s, - Err(_) => break, - }; - match recv_drain_fds(&stream) { - Ok(fds) => { - for fd in fds { - std::thread::spawn(move || { - let mut reader = File::from(fd); - let mut buf = [0u8; CAP_1_KiB]; - loop { - match reader.read(&mut buf) { - Ok(0) | Err(_) => break, - Ok(n) => { - let stderr = std::io::stderr(); - let mut out = stderr.lock(); - out.write_all(&buf[..n]).ok(); - } - } - } - }); - } - } - Err(e) => { - tracing::warn!("failed to receive drain fds: {e}"); - } - } - } - }); -} - #[derive(Debug, Clone, Serialize, Deserialize, Parser)] #[group(skip)] pub struct ExecParams { @@ -396,13 +269,6 @@ impl ExecParams { std::os::unix::fs::chroot(chroot) .with_ctx(|_| (ErrorKind::Filesystem, lazy_format!("chroot {chroot:?}")))?; - if let Ok(uid) = uid { - if uid != 0 { - std::os::unix::fs::chown("/proc/self/fd/0", Some(uid), gid.ok()).ok(); - std::os::unix::fs::chown("/proc/self/fd/1", Some(uid), gid.ok()).ok(); - std::os::unix::fs::chown("/proc/self/fd/2", Some(uid), gid.ok()).ok(); - } - } // Handle credential changes in pre_exec to control the order: // setgroups must happen before setgid/setuid (requires CAP_SETGID) { @@ -426,6 +292,10 @@ impl ExecParams { nix::unistd::setuid(nix::unistd::Uid::from_raw(uid)) .map_err(|e| std::io::Error::from_raw_os_error(e as i32))?; } + // Restore dumpable flag cleared by setuid so that + // /proc/self/fd/* is owned by the current uid and + // /dev/stderr works for the target user. + libc::prctl(libc::PR_SET_DUMPABLE, 1, 0, 0, 0); Ok(()) }); } @@ -458,7 +328,6 @@ pub fn launch( use std::io::Write; kill_init(Path::new("/proc"), &chroot)?; - start_drain_listener(&chroot); let mut sig = signal_hook::iterator::Signals::new(FWD_SIGNALS)?; let (send_pid, recv_pid) = oneshot::channel(); std::thread::spawn(move || { @@ -754,7 +623,7 @@ pub fn exec( } }); let (stdout_send, stdout_recv) = oneshot::channel::>(); - let _stdout_thread = std::thread::spawn(move || { + let stdout_thread = std::thread::spawn(move || { if let Ok(mut cstdout) = stdout_recv.blocking_recv() { if tty { let mut stdout = stdout.lock(); @@ -772,7 +641,7 @@ pub fn exec( } }); let (stderr_send, stderr_recv) = oneshot::channel::>(); - let _stderr_thread = if !stderr_tty { + let stderr_thread = if !stderr_tty { Some(std::thread::spawn(move || { if let Ok(mut cstderr) = stderr_recv.blocking_recv() { std::io::copy(&mut cstderr, &mut stderr.lock()).unwrap(); @@ -831,18 +700,6 @@ pub fn exec( }; pty.resize(size).with_kind(ErrorKind::Filesystem)?; } - - // Dup the PTY master (and separate stderr if piped) for drain handoff - let pty_drain = nix::unistd::dup(&pty) - .with_ctx(|_| (ErrorKind::Filesystem, "dup pty for drain"))?; - let mut drain_fds = vec![pty_drain]; - if let Some(stderr) = child.stderr.take() { - let stderr_drain = nix::unistd::dup(&stderr) - .with_ctx(|_| (ErrorKind::Filesystem, "dup stderr for drain"))?; - drain_fds.push(stderr_drain); - stderr_send.send(Box::new(stderr)).unwrap_or_default(); - } - let shared = ArcPty(Arc::new(pty)); stdin_send .send(Box::new(shared.clone())) @@ -850,23 +707,14 @@ pub fn exec( stdout_send .send(Box::new(shared.clone())) .unwrap_or_default(); - + if let Some(stderr) = child.stderr.take() { + stderr_send.send(Box::new(stderr)).unwrap_or_default(); + } let exit = child .wait() .with_ctx(|_| (ErrorKind::Filesystem, "waiting on child process"))?; - - // Flush any remaining pipe buffer data to the caller, then hand off - // the dup'd FDs to launch for grandchild log capture. - // We cannot join the I/O threads because grandchildren may hold the - // pipe write ends open indefinitely. - drain_buffer(&drain_fds[0], &mut std::io::stdout().lock()); - if drain_fds.len() > 1 { - drain_buffer(&drain_fds[1], &mut std::io::stderr().lock()); - } - let raw_fds: Vec = drain_fds.iter().map(|fd| fd.as_raw_fd()).collect(); - send_drain_fds(&chroot, &raw_fds); - drop(drain_fds); - + stdout_thread.join().unwrap(); + stderr_thread.map(|t| t.join().unwrap()); if let Some(code) = exit.code() { drop(raw); std::process::exit(code); @@ -895,48 +743,14 @@ pub fn exec( } cmd.arg(&chroot); cmd.args(&command); - cmd.stdin(Stdio::piped()); - cmd.stdout(Stdio::piped()); - cmd.stderr(Stdio::piped()); let mut child = cmd .spawn() .map_err(color_eyre::eyre::Report::msg) .with_ctx(|_| (ErrorKind::Filesystem, "spawning child process"))?; send_pid.send(child.id() as i32).unwrap_or_default(); - - // Take pipe handles and dup them for later handoff to the drain listener. - // We dup before sending the originals to I/O threads. - let child_stdout = child.stdout.take().unwrap(); - let child_stderr = child.stderr.take().unwrap(); - let stdout_drain = nix::unistd::dup(&child_stdout) - .with_ctx(|_| (ErrorKind::Filesystem, "dup stdout for drain"))?; - let stderr_drain = nix::unistd::dup(&child_stderr) - .with_ctx(|_| (ErrorKind::Filesystem, "dup stderr for drain"))?; - - stdin_send - .send(Box::new(child.stdin.take().unwrap())) - .unwrap_or_default(); - stdout_send - .send(Box::new(child_stdout)) - .unwrap_or_default(); - stderr_send - .send(Box::new(child_stderr)) - .unwrap_or_default(); - let exit = child .wait() .with_ctx(|_| (ErrorKind::Filesystem, "waiting on child process"))?; - - // Flush any remaining pipe buffer data to the caller, then hand off - // the dup'd FDs to launch for grandchild log capture. - // We cannot join the I/O threads because grandchildren may hold the - // pipe write ends open indefinitely. - drain_buffer(&stdout_drain, &mut std::io::stdout().lock()); - drain_buffer(&stderr_drain, &mut std::io::stderr().lock()); - send_drain_fds(&chroot, &[stdout_drain.as_raw_fd(), stderr_drain.as_raw_fd()]); - drop(stdout_drain); - drop(stderr_drain); - if let Some(code) = exit.code() { std::process::exit(code); } else if exit.success() || exit.signal() == Some(15) { @@ -953,165 +767,3 @@ pub fn exec( pub fn exec_command(_: ContainerCliContext, params: ExecParams) -> Result<(), Error> { params.exec() } - -#[cfg(test)] -mod tests { - use super::*; - use std::io::Write; - use std::os::fd::AsFd; - use std::os::unix::net::UnixListener; - use std::sync::atomic::{AtomicU32, Ordering}; - - static TEST_COUNTER: AtomicU32 = AtomicU32::new(0); - - fn test_dir() -> PathBuf { - let n = TEST_COUNTER.fetch_add(1, Ordering::Relaxed); - let dir = std::env::temp_dir().join(format!( - "startos-test-drain-{}-{}", - std::process::id(), - n, - )); - std::fs::create_dir_all(&dir).unwrap(); - dir - } - - /// Create an anonymous pipe, returning (read_end, write_end) as OwnedFds. - fn pipe() -> (OwnedFd, OwnedFd) { - let (r, w) = nix::unistd::pipe().unwrap(); - (r, w) - } - - #[test] - fn test_send_recv_drain_fds() { - let dir = test_dir(); - let sock_path = dir.join(LOG_DRAIN_SOCK); - let listener = UnixListener::bind(&sock_path).unwrap(); - - let (pipe_r, pipe_w) = pipe(); - let raw_fd = pipe_r.as_raw_fd(); - - // Send the read end of the pipe - let send_thread = std::thread::spawn(move || { - let stream = std::os::unix::net::UnixStream::connect(&sock_path).unwrap(); - let cmsg = [ControlMessage::ScmRights(&[raw_fd])]; - let iov = [std::io::IoSlice::new(&[0u8])]; - sendmsg::<()>(stream.as_raw_fd(), &iov, &cmsg, MsgFlags::empty(), None).unwrap(); - drop(pipe_r); // close our copy after sending - }); - - // Receive on the listener side - let (stream, _) = listener.accept().unwrap(); - let fds = recv_drain_fds(&stream).unwrap(); - send_thread.join().unwrap(); - - assert_eq!(fds.len(), 1, "should receive exactly one FD"); - - // Write through the pipe and read from the received FD - let mut writer = File::from(pipe_w); - writer.write_all(b"hello from pipe").unwrap(); - drop(writer); - - let mut reader = File::from(fds.into_iter().next().unwrap()); - let mut buf = String::new(); - std::io::Read::read_to_string(&mut reader, &mut buf).unwrap(); - assert_eq!(buf, "hello from pipe"); - } - - #[test] - fn test_drain_buffer_reads_available_data() { - let (pipe_r, pipe_w) = pipe(); - - let mut writer = File::from(pipe_w); - writer.write_all(b"buffered data").unwrap(); - drop(writer); - - let mut output = Vec::new(); - drain_buffer(&pipe_r, &mut output); - - assert_eq!(output, b"buffered data"); - } - - #[test] - fn test_drain_buffer_returns_immediately_on_empty_pipe() { - let (pipe_r, _pipe_w) = pipe(); - - // Pipe is open but empty — should return immediately (EAGAIN) - let mut output = Vec::new(); - drain_buffer(&pipe_r, &mut output); - - assert!(output.is_empty(), "empty pipe should yield no data"); - } - - #[test] - fn test_drain_buffer_restores_blocking_mode() { - let (pipe_r, _pipe_w) = pipe(); - - let flags_before = - nix::fcntl::fcntl(pipe_r.as_fd(), nix::fcntl::FcntlArg::F_GETFL).unwrap(); - drain_buffer(&pipe_r, &mut Vec::new()); - let flags_after = - nix::fcntl::fcntl(pipe_r.as_fd(), nix::fcntl::FcntlArg::F_GETFL).unwrap(); - - assert_eq!(flags_before, flags_after, "flags should be restored"); - } - - #[test] - fn test_dup_preserves_pipe_after_original_closed() { - // Simulates the real flow: dup the pipe, close the original, - // verify data is still readable from the dup'd FD. - let (pipe_r, pipe_w) = pipe(); - - let drain_fd = nix::unistd::dup(pipe_r.as_fd()).unwrap(); - - let mut writer = File::from(pipe_w); - writer.write_all(b"child output").unwrap(); - drop(writer); - - // Close original — pipe stays alive through drain_fd - drop(pipe_r); - - let mut reader = File::from(drain_fd); - let mut buf = String::new(); - std::io::Read::read_to_string(&mut reader, &mut buf).unwrap(); - assert_eq!(buf, "child output"); - } - - #[test] - fn test_start_drain_listener_end_to_end() { - // Full integration: start the listener, send a pipe FD, write data, - // verify the listener drains it. - let chroot = test_dir(); - - start_drain_listener(&chroot); - - // Give the listener thread a moment to bind - std::thread::sleep(std::time::Duration::from_millis(50)); - - let (pipe_r, pipe_w) = pipe(); - - // Send the pipe read end to the drain listener - send_drain_fds(&chroot, &[pipe_r.as_raw_fd()]); - drop(pipe_r); // close our copy; listener has its own - - // Write data — the listener should drain it (to stderr in production, - // but we just verify the pipe doesn't block/hang) - let mut writer = File::from(pipe_w); - for _ in 0..100 { - writer.write_all(b"log line\n").unwrap(); - } - drop(writer); - - // Give the drain thread time to process - std::thread::sleep(std::time::Duration::from_millis(100)); - // If we get here without hanging, the test passes. - } - - #[test] - fn test_send_drain_fds_no_socket_does_not_panic() { - // When no drain socket exists, send_drain_fds should silently return - let dir = test_dir(); - let (pipe_r, _pipe_w) = pipe(); - send_drain_fds(&dir, &[pipe_r.as_raw_fd()]); - // No panic, no error — best-effort - } -} diff --git a/sdk/package/lib/backup/Backups.ts b/sdk/package/lib/backup/Backups.ts index 335c8482f..158ca1037 100644 --- a/sdk/package/lib/backup/Backups.ts +++ b/sdk/package/lib/backup/Backups.ts @@ -220,6 +220,10 @@ export class Backups implements InitScript { async (sub) => { console.log('[pg-dump] mounting backup target') await mountBackupTarget(sub.rootfs) + await sub.exec(['touch', dumpFile], { user: 'root' }) + await sub.exec(['chown', 'postgres:postgres', dumpFile], { + user: 'root', + }) await startPg(sub, 'pg-dump') console.log('[pg-dump] dumping database') await sub.execFail( @@ -244,6 +248,10 @@ export class Backups implements InitScript { 'pg-restore', async (sub) => { await mountBackupTarget(sub.rootfs) + await sub.execFail( + ['chown', '-R', 'postgres:postgres', pgMountpoint], + { user: 'root' }, + ) await sub.execFail( ['initdb', '-D', pgdata, '-U', user, ...initdbArgs], { user: 'postgres' }, @@ -260,6 +268,7 @@ export class Backups implements InitScript { '-d', database, '--no-owner', + '--no-privileges', dumpFile, ], { user: 'postgres' },