fix: pg_dump/pg_restore permission errors in backup subcontainer

- Pre-create and chown dump file for postgres user before pg_dump
- Chown volume mountpoint to postgres before initdb on restore
- Add --no-privileges to pg_restore to skip GRANT/REVOKE for missing roles
This commit is contained in:
Aiden McClelland
2026-03-23 01:13:20 -06:00
parent b7e4df44bf
commit 8d1e11e158
3 changed files with 20 additions and 360 deletions

View File

@@ -161,7 +161,6 @@ nix = { version = "0.30.1", features = [
"process",
"sched",
"signal",
"socket",
"user",
] }
nom = "8.0.0"

View File

@@ -1,7 +1,6 @@
use std::ffi::{OsStr, OsString, c_int};
use std::fs::File;
use std::io::{BufRead, BufReader, IsTerminal, Read};
use std::os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd};
use std::os::unix::process::{CommandExt, ExitStatusExt};
use std::path::{Path, PathBuf};
use std::process::{Command as StdCommand, Stdio};
@@ -9,9 +8,6 @@ use std::sync::Arc;
use nix::errno::Errno;
use nix::sched::CloneFlags;
use nix::sys::socket::{
recvmsg, sendmsg, ControlMessage, ControlMessageOwned, MsgFlags,
};
use nix::unistd::Pid;
use signal_hook::consts::signal::*;
use termion::raw::IntoRawMode;
@@ -108,129 +104,6 @@ fn open_file_read(path: impl AsRef<Path>) -> Result<File, Error> {
})
}
const LOG_DRAIN_SOCK: &str = ".log-drain.sock";
fn log_drain_sock_path(chroot: &Path) -> PathBuf {
chroot.join(LOG_DRAIN_SOCK)
}
/// Send file descriptors to the log drain socket via SCM_RIGHTS.
/// Best-effort: silently returns on any failure.
fn send_drain_fds(chroot: &Path, fds: &[RawFd]) {
let sock_path = log_drain_sock_path(chroot);
let stream = match std::os::unix::net::UnixStream::connect(&sock_path) {
Ok(s) => s,
Err(e) => {
tracing::debug!("no log drain socket: {e}");
return;
}
};
let cmsg = [ControlMessage::ScmRights(fds)];
let iov = [std::io::IoSlice::new(&[0u8])];
if let Err(e) = sendmsg::<()>(stream.as_raw_fd(), &iov, &cmsg, MsgFlags::empty(), None) {
tracing::warn!("failed to send drain fds: {e}");
}
}
/// Receive file descriptors from a Unix stream via SCM_RIGHTS.
fn recv_drain_fds(stream: &std::os::unix::net::UnixStream) -> Result<Vec<OwnedFd>, nix::Error> {
let mut buf = [0u8; 1];
let mut iov = [std::io::IoSliceMut::new(&mut buf)];
let mut cmsg_buf = nix::cmsg_space!([RawFd; 4]);
let msg = recvmsg::<()>(
stream.as_raw_fd(),
&mut iov,
Some(&mut cmsg_buf),
MsgFlags::empty(),
)?;
let mut fds = Vec::new();
for cmsg in msg.cmsgs()? {
if let ControlMessageOwned::ScmRights(received) = cmsg {
for fd in received {
fds.push(unsafe { OwnedFd::from_raw_fd(fd) });
}
}
}
Ok(fds)
}
/// Non-blocking drain of the pipe/pty buffer into `out`. The dup'd FD shares
/// the same pipe as the I/O thread, so both consumers write to the caller's
/// stdout/stderr — no data is lost.
fn drain_buffer(fd: &OwnedFd, out: &mut impl std::io::Write) {
use std::os::fd::AsFd;
let flags = nix::fcntl::fcntl(fd.as_fd(), nix::fcntl::FcntlArg::F_GETFL).unwrap_or(0);
let _ = nix::fcntl::fcntl(
fd.as_fd(),
nix::fcntl::FcntlArg::F_SETFL(
nix::fcntl::OFlag::from_bits_truncate(flags) | nix::fcntl::OFlag::O_NONBLOCK,
),
);
let mut buf = [0u8; CAP_1_KiB];
loop {
match nix::unistd::read(fd.as_fd(), &mut buf) {
Ok(0) | Err(Errno::EAGAIN) | Err(_) => break,
Ok(n) => {
out.write_all(&buf[..n]).ok();
}
}
}
out.flush().ok();
let _ = nix::fcntl::fcntl(
fd.as_fd(),
nix::fcntl::FcntlArg::F_SETFL(nix::fcntl::OFlag::from_bits_truncate(flags)),
);
}
/// Start a Unix socket listener that accepts FD handoffs from `exec` processes.
/// Received FDs are drained to stderr (which flows to container-runtime's journald).
fn start_drain_listener(chroot: &Path) {
use std::io::{Read, Write};
use std::os::unix::net::UnixListener;
let sock_path = log_drain_sock_path(chroot);
let _ = std::fs::remove_file(&sock_path);
let listener = match UnixListener::bind(&sock_path) {
Ok(l) => l,
Err(e) => {
tracing::warn!("failed to bind log drain socket: {e}");
return;
}
};
std::thread::spawn(move || {
for stream in listener.incoming() {
let stream = match stream {
Ok(s) => s,
Err(_) => break,
};
match recv_drain_fds(&stream) {
Ok(fds) => {
for fd in fds {
std::thread::spawn(move || {
let mut reader = File::from(fd);
let mut buf = [0u8; CAP_1_KiB];
loop {
match reader.read(&mut buf) {
Ok(0) | Err(_) => break,
Ok(n) => {
let stderr = std::io::stderr();
let mut out = stderr.lock();
out.write_all(&buf[..n]).ok();
}
}
}
});
}
}
Err(e) => {
tracing::warn!("failed to receive drain fds: {e}");
}
}
}
});
}
#[derive(Debug, Clone, Serialize, Deserialize, Parser)]
#[group(skip)]
pub struct ExecParams {
@@ -396,13 +269,6 @@ impl ExecParams {
std::os::unix::fs::chroot(chroot)
.with_ctx(|_| (ErrorKind::Filesystem, lazy_format!("chroot {chroot:?}")))?;
if let Ok(uid) = uid {
if uid != 0 {
std::os::unix::fs::chown("/proc/self/fd/0", Some(uid), gid.ok()).ok();
std::os::unix::fs::chown("/proc/self/fd/1", Some(uid), gid.ok()).ok();
std::os::unix::fs::chown("/proc/self/fd/2", Some(uid), gid.ok()).ok();
}
}
// Handle credential changes in pre_exec to control the order:
// setgroups must happen before setgid/setuid (requires CAP_SETGID)
{
@@ -426,6 +292,10 @@ impl ExecParams {
nix::unistd::setuid(nix::unistd::Uid::from_raw(uid))
.map_err(|e| std::io::Error::from_raw_os_error(e as i32))?;
}
// Restore dumpable flag cleared by setuid so that
// /proc/self/fd/* is owned by the current uid and
// /dev/stderr works for the target user.
libc::prctl(libc::PR_SET_DUMPABLE, 1, 0, 0, 0);
Ok(())
});
}
@@ -458,7 +328,6 @@ pub fn launch(
use std::io::Write;
kill_init(Path::new("/proc"), &chroot)?;
start_drain_listener(&chroot);
let mut sig = signal_hook::iterator::Signals::new(FWD_SIGNALS)?;
let (send_pid, recv_pid) = oneshot::channel();
std::thread::spawn(move || {
@@ -754,7 +623,7 @@ pub fn exec(
}
});
let (stdout_send, stdout_recv) = oneshot::channel::<Box<dyn std::io::Read + Send>>();
let _stdout_thread = std::thread::spawn(move || {
let stdout_thread = std::thread::spawn(move || {
if let Ok(mut cstdout) = stdout_recv.blocking_recv() {
if tty {
let mut stdout = stdout.lock();
@@ -772,7 +641,7 @@ pub fn exec(
}
});
let (stderr_send, stderr_recv) = oneshot::channel::<Box<dyn std::io::Read + Send>>();
let _stderr_thread = if !stderr_tty {
let stderr_thread = if !stderr_tty {
Some(std::thread::spawn(move || {
if let Ok(mut cstderr) = stderr_recv.blocking_recv() {
std::io::copy(&mut cstderr, &mut stderr.lock()).unwrap();
@@ -831,18 +700,6 @@ pub fn exec(
};
pty.resize(size).with_kind(ErrorKind::Filesystem)?;
}
// Dup the PTY master (and separate stderr if piped) for drain handoff
let pty_drain = nix::unistd::dup(&pty)
.with_ctx(|_| (ErrorKind::Filesystem, "dup pty for drain"))?;
let mut drain_fds = vec![pty_drain];
if let Some(stderr) = child.stderr.take() {
let stderr_drain = nix::unistd::dup(&stderr)
.with_ctx(|_| (ErrorKind::Filesystem, "dup stderr for drain"))?;
drain_fds.push(stderr_drain);
stderr_send.send(Box::new(stderr)).unwrap_or_default();
}
let shared = ArcPty(Arc::new(pty));
stdin_send
.send(Box::new(shared.clone()))
@@ -850,23 +707,14 @@ pub fn exec(
stdout_send
.send(Box::new(shared.clone()))
.unwrap_or_default();
if let Some(stderr) = child.stderr.take() {
stderr_send.send(Box::new(stderr)).unwrap_or_default();
}
let exit = child
.wait()
.with_ctx(|_| (ErrorKind::Filesystem, "waiting on child process"))?;
// Flush any remaining pipe buffer data to the caller, then hand off
// the dup'd FDs to launch for grandchild log capture.
// We cannot join the I/O threads because grandchildren may hold the
// pipe write ends open indefinitely.
drain_buffer(&drain_fds[0], &mut std::io::stdout().lock());
if drain_fds.len() > 1 {
drain_buffer(&drain_fds[1], &mut std::io::stderr().lock());
}
let raw_fds: Vec<RawFd> = drain_fds.iter().map(|fd| fd.as_raw_fd()).collect();
send_drain_fds(&chroot, &raw_fds);
drop(drain_fds);
stdout_thread.join().unwrap();
stderr_thread.map(|t| t.join().unwrap());
if let Some(code) = exit.code() {
drop(raw);
std::process::exit(code);
@@ -895,48 +743,14 @@ pub fn exec(
}
cmd.arg(&chroot);
cmd.args(&command);
cmd.stdin(Stdio::piped());
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
let mut child = cmd
.spawn()
.map_err(color_eyre::eyre::Report::msg)
.with_ctx(|_| (ErrorKind::Filesystem, "spawning child process"))?;
send_pid.send(child.id() as i32).unwrap_or_default();
// Take pipe handles and dup them for later handoff to the drain listener.
// We dup before sending the originals to I/O threads.
let child_stdout = child.stdout.take().unwrap();
let child_stderr = child.stderr.take().unwrap();
let stdout_drain = nix::unistd::dup(&child_stdout)
.with_ctx(|_| (ErrorKind::Filesystem, "dup stdout for drain"))?;
let stderr_drain = nix::unistd::dup(&child_stderr)
.with_ctx(|_| (ErrorKind::Filesystem, "dup stderr for drain"))?;
stdin_send
.send(Box::new(child.stdin.take().unwrap()))
.unwrap_or_default();
stdout_send
.send(Box::new(child_stdout))
.unwrap_or_default();
stderr_send
.send(Box::new(child_stderr))
.unwrap_or_default();
let exit = child
.wait()
.with_ctx(|_| (ErrorKind::Filesystem, "waiting on child process"))?;
// Flush any remaining pipe buffer data to the caller, then hand off
// the dup'd FDs to launch for grandchild log capture.
// We cannot join the I/O threads because grandchildren may hold the
// pipe write ends open indefinitely.
drain_buffer(&stdout_drain, &mut std::io::stdout().lock());
drain_buffer(&stderr_drain, &mut std::io::stderr().lock());
send_drain_fds(&chroot, &[stdout_drain.as_raw_fd(), stderr_drain.as_raw_fd()]);
drop(stdout_drain);
drop(stderr_drain);
if let Some(code) = exit.code() {
std::process::exit(code);
} else if exit.success() || exit.signal() == Some(15) {
@@ -953,165 +767,3 @@ pub fn exec(
pub fn exec_command(_: ContainerCliContext, params: ExecParams) -> Result<(), Error> {
params.exec()
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Write;
use std::os::fd::AsFd;
use std::os::unix::net::UnixListener;
use std::sync::atomic::{AtomicU32, Ordering};
static TEST_COUNTER: AtomicU32 = AtomicU32::new(0);
fn test_dir() -> PathBuf {
let n = TEST_COUNTER.fetch_add(1, Ordering::Relaxed);
let dir = std::env::temp_dir().join(format!(
"startos-test-drain-{}-{}",
std::process::id(),
n,
));
std::fs::create_dir_all(&dir).unwrap();
dir
}
/// Create an anonymous pipe, returning (read_end, write_end) as OwnedFds.
fn pipe() -> (OwnedFd, OwnedFd) {
let (r, w) = nix::unistd::pipe().unwrap();
(r, w)
}
#[test]
fn test_send_recv_drain_fds() {
let dir = test_dir();
let sock_path = dir.join(LOG_DRAIN_SOCK);
let listener = UnixListener::bind(&sock_path).unwrap();
let (pipe_r, pipe_w) = pipe();
let raw_fd = pipe_r.as_raw_fd();
// Send the read end of the pipe
let send_thread = std::thread::spawn(move || {
let stream = std::os::unix::net::UnixStream::connect(&sock_path).unwrap();
let cmsg = [ControlMessage::ScmRights(&[raw_fd])];
let iov = [std::io::IoSlice::new(&[0u8])];
sendmsg::<()>(stream.as_raw_fd(), &iov, &cmsg, MsgFlags::empty(), None).unwrap();
drop(pipe_r); // close our copy after sending
});
// Receive on the listener side
let (stream, _) = listener.accept().unwrap();
let fds = recv_drain_fds(&stream).unwrap();
send_thread.join().unwrap();
assert_eq!(fds.len(), 1, "should receive exactly one FD");
// Write through the pipe and read from the received FD
let mut writer = File::from(pipe_w);
writer.write_all(b"hello from pipe").unwrap();
drop(writer);
let mut reader = File::from(fds.into_iter().next().unwrap());
let mut buf = String::new();
std::io::Read::read_to_string(&mut reader, &mut buf).unwrap();
assert_eq!(buf, "hello from pipe");
}
#[test]
fn test_drain_buffer_reads_available_data() {
let (pipe_r, pipe_w) = pipe();
let mut writer = File::from(pipe_w);
writer.write_all(b"buffered data").unwrap();
drop(writer);
let mut output = Vec::new();
drain_buffer(&pipe_r, &mut output);
assert_eq!(output, b"buffered data");
}
#[test]
fn test_drain_buffer_returns_immediately_on_empty_pipe() {
let (pipe_r, _pipe_w) = pipe();
// Pipe is open but empty — should return immediately (EAGAIN)
let mut output = Vec::new();
drain_buffer(&pipe_r, &mut output);
assert!(output.is_empty(), "empty pipe should yield no data");
}
#[test]
fn test_drain_buffer_restores_blocking_mode() {
let (pipe_r, _pipe_w) = pipe();
let flags_before =
nix::fcntl::fcntl(pipe_r.as_fd(), nix::fcntl::FcntlArg::F_GETFL).unwrap();
drain_buffer(&pipe_r, &mut Vec::new());
let flags_after =
nix::fcntl::fcntl(pipe_r.as_fd(), nix::fcntl::FcntlArg::F_GETFL).unwrap();
assert_eq!(flags_before, flags_after, "flags should be restored");
}
#[test]
fn test_dup_preserves_pipe_after_original_closed() {
// Simulates the real flow: dup the pipe, close the original,
// verify data is still readable from the dup'd FD.
let (pipe_r, pipe_w) = pipe();
let drain_fd = nix::unistd::dup(pipe_r.as_fd()).unwrap();
let mut writer = File::from(pipe_w);
writer.write_all(b"child output").unwrap();
drop(writer);
// Close original — pipe stays alive through drain_fd
drop(pipe_r);
let mut reader = File::from(drain_fd);
let mut buf = String::new();
std::io::Read::read_to_string(&mut reader, &mut buf).unwrap();
assert_eq!(buf, "child output");
}
#[test]
fn test_start_drain_listener_end_to_end() {
// Full integration: start the listener, send a pipe FD, write data,
// verify the listener drains it.
let chroot = test_dir();
start_drain_listener(&chroot);
// Give the listener thread a moment to bind
std::thread::sleep(std::time::Duration::from_millis(50));
let (pipe_r, pipe_w) = pipe();
// Send the pipe read end to the drain listener
send_drain_fds(&chroot, &[pipe_r.as_raw_fd()]);
drop(pipe_r); // close our copy; listener has its own
// Write data — the listener should drain it (to stderr in production,
// but we just verify the pipe doesn't block/hang)
let mut writer = File::from(pipe_w);
for _ in 0..100 {
writer.write_all(b"log line\n").unwrap();
}
drop(writer);
// Give the drain thread time to process
std::thread::sleep(std::time::Duration::from_millis(100));
// If we get here without hanging, the test passes.
}
#[test]
fn test_send_drain_fds_no_socket_does_not_panic() {
// When no drain socket exists, send_drain_fds should silently return
let dir = test_dir();
let (pipe_r, _pipe_w) = pipe();
send_drain_fds(&dir, &[pipe_r.as_raw_fd()]);
// No panic, no error — best-effort
}
}