diff --git a/container-runtime/src/Adapters/Systems/SystemForEmbassy/DockerProcedureContainer.ts b/container-runtime/src/Adapters/Systems/SystemForEmbassy/DockerProcedureContainer.ts index b472862db..e151aa2a9 100644 --- a/container-runtime/src/Adapters/Systems/SystemForEmbassy/DockerProcedureContainer.ts +++ b/container-runtime/src/Adapters/Systems/SystemForEmbassy/DockerProcedureContainer.ts @@ -95,17 +95,15 @@ export class DockerProcedureContainer { key, ) } else if (volumeMount.type === "pointer") { - await effects - .mount({ - location: path, - target: { - packageId: volumeMount["package-id"], - subpath: volumeMount.path, - readonly: volumeMount.readonly, - volumeId: volumeMount["volume-id"], - }, - }) - .catch(console.warn) + await effects.mount({ + location: path, + target: { + packageId: volumeMount["package-id"], + subpath: volumeMount.path, + readonly: volumeMount.readonly, + volumeId: volumeMount["volume-id"], + }, + }) } else if (volumeMount.type === "backup") { await subcontainer.mount(Mounts.of().addBackups(null, mounts[mount])) } diff --git a/core/Cargo.lock b/core/Cargo.lock index 02b74c15b..d894e5703 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -4034,6 +4034,12 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3" +[[package]] +name = "numtoa" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6aa2c4e539b869820a2b82e1aef6ff40aa85e65decdd5185e83fb4b1249cd00f" + [[package]] name = "object" version = "0.32.2" @@ -4630,6 +4636,15 @@ version = "2.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33cb294fe86a74cbcf50d4445b37da762029549ebeea341421c7c70370f86cac" +[[package]] +name = "pty-process" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8277b026e63da5d2cc435f842b52bedb1d050dfd7d633bba009c3c8e1883a21e" +dependencies = [ + "rustix 0.38.44", +] + [[package]] name = "publicsuffix" version = "2.3.0" @@ -4856,6 +4871,12 @@ dependencies = [ "bitflags 2.9.0", ] +[[package]] +name = "redox_termios" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20145670ba436b55d91fc92d25e71160fbfbdd57831631c8d7d36377a476f1cb" + [[package]] name = "redox_users" version = "0.4.6" @@ -5124,6 +5145,7 @@ checksum = "fdb5bc1ae2baa591800df16c9ca78619bf65c0488b41b96ccec5d11220d8c154" dependencies = [ "bitflags 2.9.0", "errno 0.3.11", + "itoa", "libc", "linux-raw-sys 0.4.15", "windows-sys 0.59.0", @@ -6086,6 +6108,7 @@ dependencies = [ "procfs", "proptest", "proptest-derive", + "pty-process", "qrcode", "rand 0.9.0", "regex", @@ -6113,6 +6136,7 @@ dependencies = [ "sscanf", "ssh-key", "tar", + "termion", "textwrap", "thiserror 1.0.69", "tokio", @@ -6132,7 +6156,6 @@ dependencies = [ "tracing-subscriber", "trust-dns-server", "ts-rs", - "tty-spawn", "typed-builder", "unix-named-pipe", "url", @@ -6308,6 +6331,18 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "termion" +version = "4.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3669a69de26799d6321a5aa713f55f7e2cd37bd47be044b50f2acafc42c122bb" +dependencies = [ + "libc", + "libredox", + "numtoa", + "redox_termios", +] + [[package]] name = "textwrap" version = "0.16.2" @@ -6942,17 +6977,6 @@ dependencies = [ "termcolor", ] -[[package]] -name = "tty-spawn" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb91489cf2611235ae8d755d66ab028437980ee573e2230c05af41b136236ad1" -dependencies = [ - "anyhow", - "nix 0.29.0", - "signal-hook", -] - [[package]] name = "tungstenite" version = "0.23.0" diff --git a/core/startos/Cargo.toml b/core/startos/Cargo.toml index bead9781f..7af8d7768 100644 --- a/core/startos/Cargo.toml +++ b/core/startos/Cargo.toml @@ -39,7 +39,7 @@ path = "src/main.rs" [features] cli = [] -container-runtime = ["procfs", "tty-spawn"] +container-runtime = ["procfs", "pty-process"] daemon = ["mail-send"] registry = [] default = ["cli", "daemon", "registry", "container-runtime"] @@ -166,6 +166,7 @@ prettytable-rs = "0.10.0" procfs = { version = "0.16.0", optional = true } proptest = "1.3.1" proptest-derive = "0.5.0" +pty-process = { version = "0.5.1", optional = true } qrcode = "0.14.1" rand = "0.9.0" regex = "1.10.2" @@ -197,6 +198,7 @@ sqlx = { version = "0.7.2", features = [ sscanf = "0.4.1" ssh-key = { version = "0.6.2", features = ["ed25519"] } tar = "0.4.40" +termion = "4.0.5" thiserror = "1.0.49" textwrap = "0.16.1" tokio = { version = "1.38.1", features = ["full"] } @@ -217,7 +219,6 @@ tracing-journald = "0.3.0" tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } trust-dns-server = "0.23.1" ts-rs = { git = "https://github.com/dr-bonez/ts-rs.git", branch = "feature/top-level-as" } # "8.1.0" -tty-spawn = { version = "0.4.0", optional = true } typed-builder = "0.18.0" unix-named-pipe = "0.2.0" url = { version = "2.4.1", features = ["serde"] } diff --git a/core/startos/src/service/effects/subcontainer/sync.rs b/core/startos/src/service/effects/subcontainer/sync.rs index f40c13543..19fee0705 100644 --- a/core/startos/src/service/effects/subcontainer/sync.rs +++ b/core/startos/src/service/effects/subcontainer/sync.rs @@ -1,19 +1,22 @@ use std::collections::BTreeMap; use std::ffi::{c_int, OsStr, OsString}; use std::fs::File; -use std::io::IsTerminal; +use std::io::{IsTerminal, Read}; use std::os::unix::process::CommandExt; use std::path::{Path, PathBuf}; use std::process::{Command as StdCommand, Stdio}; +use std::sync::Arc; use nix::sched::CloneFlags; use nix::unistd::Pid; use signal_hook::consts::signal::*; +use termion::raw::IntoRawMode; use tokio::sync::oneshot; -use tty_spawn::TtySpawn; use crate::service::effects::prelude::*; use crate::service::effects::ContainerCliContext; +use crate::util::io::TermSize; +use crate::CAP_1_KiB; const FWD_SIGNALS: &[c_int] = &[ SIGABRT, SIGALRM, SIGCONT, SIGHUP, SIGINT, SIGIO, SIGPIPE, SIGPROF, SIGQUIT, SIGTERM, SIGTRAP, @@ -98,6 +101,10 @@ fn open_file_read(path: impl AsRef) -> Result { pub struct ExecParams { #[arg(long)] force_tty: bool, + #[arg(long)] + force_stderr_tty: bool, + #[arg(long)] + pty_size: Option, #[arg(short, long)] env: Option, #[arg(short, long)] @@ -181,6 +188,8 @@ pub fn launch( _: ContainerCliContext, ExecParams { force_tty, + force_stderr_tty, + pty_size, env, workdir, user, @@ -188,34 +197,9 @@ pub fn launch( command, }: ExecParams, ) -> Result<(), Error> { - kill_init(Path::new("/proc"), &chroot)?; - if (std::io::stdin().is_terminal() - && std::io::stdout().is_terminal() - && std::io::stderr().is_terminal()) - || force_tty - { - let mut cmd = TtySpawn::new("/usr/bin/start-cli"); - cmd.arg("subcontainer").arg("launch-init"); - if let Some(env) = env { - cmd.arg("--env").arg(env); - } - if let Some(workdir) = workdir { - cmd.arg("--workdir").arg(workdir); - } - if let Some(user) = user { - cmd.arg("--user").arg(user); - } - cmd.arg(&chroot); - cmd.args(command.iter()); - nix::sched::unshare(CloneFlags::CLONE_NEWPID) - .with_ctx(|_| (ErrorKind::Filesystem, "unshare pid ns"))?; - nix::sched::unshare(CloneFlags::CLONE_NEWCGROUP) - .with_ctx(|_| (ErrorKind::Filesystem, "unshare cgroup ns"))?; - nix::sched::unshare(CloneFlags::CLONE_NEWIPC) - .with_ctx(|_| (ErrorKind::Filesystem, "unshare ipc ns"))?; - std::process::exit(cmd.spawn().with_kind(ErrorKind::Filesystem)?); - } + use std::io::Write; + kill_init(Path::new("/proc"), &chroot)?; let mut sig = signal_hook::iterator::Signals::new(FWD_SIGNALS)?; let (send_pid, recv_pid) = oneshot::channel(); std::thread::spawn(move || { @@ -229,75 +213,181 @@ pub fn launch( } } }); - let mut cmd = StdCommand::new("/usr/bin/start-cli"); - cmd.arg("subcontainer").arg("launch-init"); - if let Some(env) = env { - cmd.arg("--env").arg(env); - } - if let Some(workdir) = workdir { - cmd.arg("--workdir").arg(workdir); - } - if let Some(user) = user { - cmd.arg("--user").arg(user); - } - cmd.arg(&chroot); - cmd.args(&command); - cmd.stdin(Stdio::piped()); - cmd.stdout(Stdio::piped()); - cmd.stderr(Stdio::piped()); - let (stdin_send, stdin_recv) = oneshot::channel(); + + let mut stdin = std::io::stdin(); + let stdout = std::io::stdout(); + let stderr = std::io::stderr(); + let stderr_tty = force_stderr_tty || stderr.is_terminal(); + + let tty = force_tty || (stdin.is_terminal() && stdout.is_terminal()); + + let raw = if stdin.is_terminal() && stdout.is_terminal() { + Some(termion::get_tty()?.into_raw_mode()?) + } else { + None + }; + + let pty_size = pty_size.or_else(|| TermSize::get_current()); + + let (stdin_send, stdin_recv) = oneshot::channel::>(); std::thread::spawn(move || { - if let Ok(mut stdin) = stdin_recv.blocking_recv() { - std::io::copy(&mut std::io::stdin(), &mut stdin).unwrap(); + if let Ok(mut cstdin) = stdin_recv.blocking_recv() { + if tty { + let mut buf = [0_u8; CAP_1_KiB]; + while let Ok(n) = stdin.read(&mut buf) { + if n == 0 { + break; + } + cstdin.write_all(&buf[..n]).ok(); + cstdin.flush().ok(); + } + } else { + std::io::copy(&mut stdin, &mut cstdin).unwrap(); + } } }); - let (stdout_send, stdout_recv) = oneshot::channel(); - std::thread::spawn(move || { - if let Ok(mut stdout) = stdout_recv.blocking_recv() { - std::io::copy(&mut stdout, &mut std::io::stdout()).unwrap(); - } - }); - let (stderr_send, stderr_recv) = oneshot::channel(); - std::thread::spawn(move || { - if let Ok(mut stderr) = stderr_recv.blocking_recv() { - std::io::copy(&mut stderr, &mut std::io::stderr()).unwrap(); + let (stdout_send, stdout_recv) = oneshot::channel::>(); + let stdout_thread = std::thread::spawn(move || { + if let Ok(mut cstdout) = stdout_recv.blocking_recv() { + if tty { + let mut stdout = stdout.lock(); + let mut buf = [0_u8; CAP_1_KiB]; + while let Ok(n) = cstdout.read(&mut buf) { + if n == 0 { + break; + } + stdout.write_all(&buf[..n]).ok(); + stdout.flush().ok(); + } + } else { + std::io::copy(&mut cstdout, &mut stdout.lock()).unwrap(); + } } }); + let (stderr_send, stderr_recv) = oneshot::channel::>(); + let stderr_thread = if !stderr_tty { + Some(std::thread::spawn(move || { + if let Ok(mut cstderr) = stderr_recv.blocking_recv() { + std::io::copy(&mut cstderr, &mut stderr.lock()).unwrap(); + } + })) + } else { + None + }; nix::sched::unshare(CloneFlags::CLONE_NEWPID) .with_ctx(|_| (ErrorKind::Filesystem, "unshare pid ns"))?; nix::sched::unshare(CloneFlags::CLONE_NEWCGROUP) .with_ctx(|_| (ErrorKind::Filesystem, "unshare cgroup ns"))?; nix::sched::unshare(CloneFlags::CLONE_NEWIPC) .with_ctx(|_| (ErrorKind::Filesystem, "unshare ipc ns"))?; - let mut child = cmd - .spawn() - .map_err(color_eyre::eyre::Report::msg) - .with_ctx(|_| (ErrorKind::Filesystem, "spawning child process"))?; - send_pid.send(child.id() as i32).unwrap_or_default(); - stdin_send - .send(child.stdin.take().unwrap()) - .unwrap_or_default(); - stdout_send - .send(child.stdout.take().unwrap()) - .unwrap_or_default(); - stderr_send - .send(child.stderr.take().unwrap()) - .unwrap_or_default(); - // TODO: subreaping, signal handling - let exit = child - .wait() - .with_ctx(|_| (ErrorKind::Filesystem, "waiting on child process"))?; - if let Some(code) = exit.code() { - nix::mount::umount(&chroot.join("proc")) - .with_ctx(|_| (ErrorKind::Filesystem, "umount procfs"))?; - std::process::exit(code); - } else if exit.success() { - Ok(()) + + if tty { + use pty_process::blocking as pty_process; + let (pty, pts) = pty_process::open().with_kind(ErrorKind::Filesystem)?; + let mut cmd = pty_process::Command::new("/usr/bin/start-cli"); + cmd = cmd.arg("subcontainer").arg("launch-init"); + if let Some(env) = env { + cmd = cmd.arg("--env").arg(env); + } + if let Some(workdir) = workdir { + cmd = cmd.arg("--workdir").arg(workdir); + } + if let Some(user) = user { + cmd = cmd.arg("--user").arg(user); + } + cmd = cmd.arg(&chroot).args(&command); + if !stderr_tty { + cmd = cmd.stderr(Stdio::piped()); + } + let mut child = cmd + .spawn(pts) + .map_err(color_eyre::eyre::Report::msg) + .with_ctx(|_| (ErrorKind::Filesystem, "spawning child process"))?; + send_pid.send(child.id() as i32).unwrap_or_default(); + if let Some(pty_size) = pty_size { + let size = if let Some((x, y)) = pty_size.pixels { + ::pty_process::Size::new_with_pixel(pty_size.size.0, pty_size.size.1, x, y) + } else { + ::pty_process::Size::new(pty_size.size.0, pty_size.size.1) + }; + pty.resize(size).with_kind(ErrorKind::Filesystem)?; + } + let shared = ArcPty(Arc::new(pty)); + stdin_send + .send(Box::new(shared.clone())) + .unwrap_or_default(); + stdout_send + .send(Box::new(shared.clone())) + .unwrap_or_default(); + if let Some(stderr) = child.stderr.take() { + stderr_send.send(Box::new(stderr)).unwrap_or_default(); + } + let exit = child + .wait() + .with_ctx(|_| (ErrorKind::Filesystem, "waiting on child process"))?; + stdout_thread.join().unwrap(); + stderr_thread.map(|t| t.join().unwrap()); + if let Some(code) = exit.code() { + drop(raw); + std::process::exit(code); + } else if exit.success() { + Ok(()) + } else { + Err(Error::new( + color_eyre::eyre::Report::msg(exit), + ErrorKind::Unknown, + )) + } } else { - Err(Error::new( - color_eyre::eyre::Report::msg(exit), - ErrorKind::Unknown, - )) + let mut cmd = StdCommand::new("/usr/bin/start-cli"); + cmd.arg("subcontainer").arg("launch-init"); + if let Some(env) = env { + cmd.arg("--env").arg(env); + } + if let Some(workdir) = workdir { + cmd.arg("--workdir").arg(workdir); + } + if let Some(user) = user { + cmd.arg("--user").arg(user); + } + cmd.arg(&chroot); + cmd.args(&command); + cmd.stdin(Stdio::piped()); + cmd.stdout(Stdio::piped()); + cmd.stderr(Stdio::piped()); + let mut child = cmd + .spawn() + .map_err(color_eyre::eyre::Report::msg) + .with_ctx(|_| (ErrorKind::Filesystem, "spawning child process"))?; + send_pid.send(child.id() as i32).unwrap_or_default(); + stdin_send + .send(Box::new(child.stdin.take().unwrap())) + .unwrap_or_default(); + stdout_send + .send(Box::new(child.stdout.take().unwrap())) + .unwrap_or_default(); + stderr_send + .send(Box::new(child.stderr.take().unwrap())) + .unwrap_or_default(); + + // TODO: subreaping, signal handling + let exit = child + .wait() + .with_ctx(|_| (ErrorKind::Filesystem, "waiting on child process"))?; + stdout_thread.join().unwrap(); + stderr_thread.map(|t| t.join().unwrap()); + if let Some(code) = exit.code() { + nix::mount::umount(&chroot.join("proc")) + .with_ctx(|_| (ErrorKind::Filesystem, "umount procfs"))?; + std::process::exit(code); + } else if exit.success() { + Ok(()) + } else { + Err(Error::new( + color_eyre::eyre::Report::msg(exit), + ErrorKind::Unknown, + )) + } } } @@ -320,10 +410,28 @@ pub fn launch_init(_: ContainerCliContext, params: ExecParams) -> Result<(), Err } } +#[derive(Clone)] +struct ArcPty(Arc); +impl std::io::Write for ArcPty { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + (&*self.0).write(buf) + } + fn flush(&mut self) -> std::io::Result<()> { + (&*self.0).flush() + } +} +impl std::io::Read for ArcPty { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + (&*self.0).read(buf) + } +} + pub fn exec( _: ContainerCliContext, ExecParams { force_tty, + force_stderr_tty, + pty_size, env, workdir, user, @@ -331,41 +439,8 @@ pub fn exec( command, }: ExecParams, ) -> Result<(), Error> { - if (std::io::stdin().is_terminal() - && std::io::stdout().is_terminal() - && std::io::stderr().is_terminal()) - || force_tty - { - let mut cmd = TtySpawn::new("/usr/bin/start-cli"); - cmd.arg("subcontainer").arg("exec-command"); - if let Some(env) = env { - cmd.arg("--env").arg(env); - } - if let Some(workdir) = workdir { - cmd.arg("--workdir").arg(workdir); - } - if let Some(user) = user { - cmd.arg("--user").arg(user); - } - cmd.arg(&chroot); - cmd.args(command.iter()); - nix::sched::setns( - open_file_read(chroot.join("proc/1/ns/pid"))?, - CloneFlags::CLONE_NEWPID, - ) - .with_ctx(|_| (ErrorKind::Filesystem, "set pid ns"))?; - nix::sched::setns( - open_file_read(chroot.join("proc/1/ns/cgroup"))?, - CloneFlags::CLONE_NEWCGROUP, - ) - .with_ctx(|_| (ErrorKind::Filesystem, "set cgroup ns"))?; - nix::sched::setns( - open_file_read(chroot.join("proc/1/ns/ipc"))?, - CloneFlags::CLONE_NEWIPC, - ) - .with_ctx(|_| (ErrorKind::Filesystem, "set ipc ns"))?; - std::process::exit(cmd.spawn().with_kind(ErrorKind::Filesystem)?); - } + use std::io::Write; + let mut sig = signal_hook::iterator::Signals::new(FWD_SIGNALS)?; let (send_pid, recv_pid) = oneshot::channel(); std::thread::spawn(move || { @@ -379,40 +454,67 @@ pub fn exec( } } }); - let mut cmd = StdCommand::new("/usr/bin/start-cli"); - cmd.arg("subcontainer").arg("exec-command"); - if let Some(env) = env { - cmd.arg("--env").arg(env); - } - if let Some(workdir) = workdir { - cmd.arg("--workdir").arg(workdir); - } - if let Some(user) = user { - cmd.arg("--user").arg(user); - } - cmd.arg(&chroot); - cmd.args(&command); - cmd.stdin(Stdio::piped()); - cmd.stdout(Stdio::piped()); - cmd.stderr(Stdio::piped()); - let (stdin_send, stdin_recv) = oneshot::channel(); + + let mut stdin = std::io::stdin(); + let stdout = std::io::stdout(); + let stderr = std::io::stderr(); + let stderr_tty = force_stderr_tty || stderr.is_terminal(); + + let tty = force_tty || (stdin.is_terminal() && stdout.is_terminal()); + + let raw = if stdin.is_terminal() && stdout.is_terminal() { + Some(termion::get_tty()?.into_raw_mode()?) + } else { + None + }; + + let pty_size = pty_size.or_else(|| TermSize::get_current()); + + let (stdin_send, stdin_recv) = oneshot::channel::>(); std::thread::spawn(move || { - if let Ok(mut stdin) = stdin_recv.blocking_recv() { - std::io::copy(&mut std::io::stdin(), &mut stdin).unwrap(); + if let Ok(mut cstdin) = stdin_recv.blocking_recv() { + if tty { + let mut buf = [0_u8; CAP_1_KiB]; + while let Ok(n) = stdin.read(&mut buf) { + if n == 0 { + break; + } + cstdin.write_all(&buf[..n]).ok(); + cstdin.flush().ok(); + } + } else { + std::io::copy(&mut stdin, &mut cstdin).unwrap(); + } } }); - let (stdout_send, stdout_recv) = oneshot::channel(); - std::thread::spawn(move || { - if let Ok(mut stdout) = stdout_recv.blocking_recv() { - std::io::copy(&mut stdout, &mut std::io::stdout()).unwrap(); - } - }); - let (stderr_send, stderr_recv) = oneshot::channel(); - std::thread::spawn(move || { - if let Ok(mut stderr) = stderr_recv.blocking_recv() { - std::io::copy(&mut stderr, &mut std::io::stderr()).unwrap(); + let (stdout_send, stdout_recv) = oneshot::channel::>(); + let stdout_thread = std::thread::spawn(move || { + if let Ok(mut cstdout) = stdout_recv.blocking_recv() { + if tty { + let mut stdout = stdout.lock(); + let mut buf = [0_u8; CAP_1_KiB]; + while let Ok(n) = cstdout.read(&mut buf) { + if n == 0 { + break; + } + stdout.write_all(&buf[..n]).ok(); + stdout.flush().ok(); + } + } else { + std::io::copy(&mut cstdout, &mut stdout.lock()).unwrap(); + } } }); + let (stderr_send, stderr_recv) = oneshot::channel::>(); + let stderr_thread = if !stderr_tty { + Some(std::thread::spawn(move || { + if let Ok(mut cstderr) = stderr_recv.blocking_recv() { + std::io::copy(&mut cstderr, &mut stderr.lock()).unwrap(); + } + })) + } else { + None + }; nix::sched::setns( open_file_read(chroot.join("proc/1/ns/pid"))?, CloneFlags::CLONE_NEWPID, @@ -428,32 +530,110 @@ pub fn exec( CloneFlags::CLONE_NEWIPC, ) .with_ctx(|_| (ErrorKind::Filesystem, "set ipc ns"))?; - let mut child = cmd - .spawn() - .map_err(color_eyre::eyre::Report::msg) - .with_ctx(|_| (ErrorKind::Filesystem, "spawning child process"))?; - send_pid.send(child.id() as i32).unwrap_or_default(); - stdin_send - .send(child.stdin.take().unwrap()) - .unwrap_or_default(); - stdout_send - .send(child.stdout.take().unwrap()) - .unwrap_or_default(); - stderr_send - .send(child.stderr.take().unwrap()) - .unwrap_or_default(); - let exit = child - .wait() - .with_ctx(|_| (ErrorKind::Filesystem, "waiting on child process"))?; - if let Some(code) = exit.code() { - std::process::exit(code); - } else if exit.success() { - Ok(()) + + if tty { + use pty_process::blocking as pty_process; + let (pty, pts) = pty_process::open().with_kind(ErrorKind::Filesystem)?; + let mut cmd = pty_process::Command::new("/usr/bin/start-cli"); + cmd = cmd.arg("subcontainer").arg("exec-command"); + if let Some(env) = env { + cmd = cmd.arg("--env").arg(env); + } + if let Some(workdir) = workdir { + cmd = cmd.arg("--workdir").arg(workdir); + } + if let Some(user) = user { + cmd = cmd.arg("--user").arg(user); + } + cmd = cmd.arg(&chroot).args(&command); + if !stderr_tty { + cmd = cmd.stderr(Stdio::piped()); + } + let mut child = cmd + .spawn(pts) + .map_err(color_eyre::eyre::Report::msg) + .with_ctx(|_| (ErrorKind::Filesystem, "spawning child process"))?; + send_pid.send(child.id() as i32).unwrap_or_default(); + if let Some(pty_size) = pty_size { + let size = if let Some((x, y)) = pty_size.pixels { + ::pty_process::Size::new_with_pixel(pty_size.size.0, pty_size.size.1, x, y) + } else { + ::pty_process::Size::new(pty_size.size.0, pty_size.size.1) + }; + pty.resize(size).with_kind(ErrorKind::Filesystem)?; + } + let shared = ArcPty(Arc::new(pty)); + stdin_send + .send(Box::new(shared.clone())) + .unwrap_or_default(); + stdout_send + .send(Box::new(shared.clone())) + .unwrap_or_default(); + if let Some(stderr) = child.stderr.take() { + stderr_send.send(Box::new(stderr)).unwrap_or_default(); + } + let exit = child + .wait() + .with_ctx(|_| (ErrorKind::Filesystem, "waiting on child process"))?; + stdout_thread.join().unwrap(); + stderr_thread.map(|t| t.join().unwrap()); + if let Some(code) = exit.code() { + drop(raw); + std::process::exit(code); + } else if exit.success() { + Ok(()) + } else { + Err(Error::new( + color_eyre::eyre::Report::msg(exit), + ErrorKind::Unknown, + )) + } } else { - Err(Error::new( - color_eyre::eyre::Report::msg(exit), - ErrorKind::Unknown, - )) + let mut cmd = StdCommand::new("/usr/bin/start-cli"); + cmd.arg("subcontainer").arg("exec-command"); + if let Some(env) = env { + cmd.arg("--env").arg(env); + } + if let Some(workdir) = workdir { + cmd.arg("--workdir").arg(workdir); + } + if let Some(user) = user { + cmd.arg("--user").arg(user); + } + cmd.arg(&chroot); + cmd.args(&command); + cmd.stdin(Stdio::piped()); + cmd.stdout(Stdio::piped()); + cmd.stderr(Stdio::piped()); + let mut child = cmd + .spawn() + .map_err(color_eyre::eyre::Report::msg) + .with_ctx(|_| (ErrorKind::Filesystem, "spawning child process"))?; + send_pid.send(child.id() as i32).unwrap_or_default(); + stdin_send + .send(Box::new(child.stdin.take().unwrap())) + .unwrap_or_default(); + stdout_send + .send(Box::new(child.stdout.take().unwrap())) + .unwrap_or_default(); + stderr_send + .send(Box::new(child.stderr.take().unwrap())) + .unwrap_or_default(); + let exit = child + .wait() + .with_ctx(|_| (ErrorKind::Filesystem, "waiting on child process"))?; + stdout_thread.join().unwrap(); + stderr_thread.map(|t| t.join().unwrap()); + if let Some(code) = exit.code() { + std::process::exit(code); + } else if exit.success() { + Ok(()) + } else { + Err(Error::new( + color_eyre::eyre::Report::msg(exit), + ErrorKind::Unknown, + )) + } } } diff --git a/core/startos/src/service/mod.rs b/core/startos/src/service/mod.rs index 3e0d5b856..a9e71c913 100644 --- a/core/startos/src/service/mod.rs +++ b/core/startos/src/service/mod.rs @@ -13,7 +13,8 @@ use chrono::{DateTime, Utc}; use clap::Parser; use futures::future::BoxFuture; use futures::stream::FusedStream; -use futures::{SinkExt, StreamExt, TryStreamExt}; +use futures::{FutureExt, SinkExt, StreamExt, TryStreamExt}; +use helpers::NonDetachingJoinHandle; use imbl_value::{json, InternedString}; use itertools::Itertools; use models::{ActionId, HostId, ImageId, PackageId, ProcedureName}; @@ -23,6 +24,7 @@ use rpc_toolkit::{from_fn_async, CallRemoteHandler, Empty, HandlerArgs, HandlerF use serde::{Deserialize, Serialize}; use service_actor::ServiceActor; use start_stop::StartStop; +use termion::raw::IntoRawMode; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::process::Command; use tokio::sync::Notify; @@ -43,7 +45,7 @@ use crate::s9pk::S9pk; use crate::service::action::update_requested_actions; use crate::service::service_map::InstallProgressHandles; use crate::util::actor::concurrent::ConcurrentActor; -use crate::util::io::{create_file, AsyncReadStream}; +use crate::util::io::{create_file, AsyncReadStream, TermSize}; use crate::util::net::WebSocketExt; use crate::util::serde::{NoOutput, Pem}; use crate::util::Never; @@ -739,6 +741,7 @@ pub struct AttachParams { #[ts(type = "string[]")] pub command: Vec, pub tty: bool, + pub stderr_tty: bool, #[ts(skip)] #[serde(rename = "__auth_session")] session: Option, @@ -755,6 +758,7 @@ pub async fn attach( id, command, tty, + stderr_tty, session, subcontainer, image_id, @@ -864,6 +868,7 @@ pub async fn attach( subcontainer_id: Guid, command: Vec, tty: bool, + stderr_tty: bool, image_id: ImageId, workdir: Option, root_command: &RootCommand, @@ -896,6 +901,10 @@ pub async fn attach( cmd.arg("--force-tty"); } + if stderr_tty { + cmd.arg("--force-stderr-tty"); + } + cmd.arg(&root_path).arg("--"); if command.is_empty() { @@ -912,7 +921,7 @@ pub async fn attach( let pid = nix::unistd::Pid::from_raw(child.id().or_not_found("child pid")? as i32); - let mut stdin = child.stdin.take().or_not_found("child stdin")?; + let mut stdin = Some(child.stdin.take().or_not_found("child stdin")?); let mut current_in = "stdin".to_owned(); let mut current_out = "stdout"; @@ -943,6 +952,10 @@ pub async fn attach( ws.send(Message::Binary(out)) .await .with_kind(ErrorKind::Network)?; + } else { + ws.send(Message::Text("close-stdout".into())) + .await + .with_kind(ErrorKind::Network)?; } } err = stderr.try_next() => { @@ -956,6 +969,10 @@ pub async fn attach( ws.send(Message::Binary(err)) .await .with_kind(ErrorKind::Network)?; + } else { + ws.send(Message::Text("close-stderr".into())) + .await + .with_kind(ErrorKind::Network)?; } } msg = ws.try_next() => { @@ -967,7 +984,12 @@ pub async fn attach( Message::Binary(data) => { match &*current_in { "stdin" => { - stdin.write_all(&data).await?; + if let Some(stdin) = &mut stdin { + stdin.write_all(&data).await?; + } + } + "close-stdin" => { + stdin.take(); } "signal" => { if data.len() != 4 { @@ -1022,6 +1044,7 @@ pub async fn attach( subcontainer_id, command, tty, + stderr_tty, image_id, workdir, &root_command, @@ -1100,6 +1123,7 @@ pub struct CliAttachParams { #[arg(long, short)] image_id: Option, } +#[instrument[skip_all]] pub async fn cli_attach( HandlerArgs { context, @@ -1109,8 +1133,39 @@ pub async fn cli_attach( .. }: HandlerArgs, ) -> Result<(), Error> { + use std::io::Write; + use tokio_tungstenite::tungstenite::Message; + let stdin = std::io::stdin(); + let stdout = std::io::stdout(); + let stderr = std::io::stderr(); + + let tty = params.force_tty || (stdin.is_terminal() && stdout.is_terminal()); + + let raw = if stdin.is_terminal() && stdout.is_terminal() { + Some(termion::get_tty()?.into_raw_mode()?) + } else { + None + }; + + let (kill, thread_kill) = tokio::sync::oneshot::channel(); + let (thread_send, recv) = tokio::sync::mpsc::channel(4 * CAP_1_KiB); + let stdin_thread: NonDetachingJoinHandle<()> = tokio::task::spawn_blocking(move || { + use std::io::Read; + let mut stdin = stdin.lock().bytes(); + + while thread_kill.is_empty() { + if let Some(b) = stdin.next() { + thread_send.blocking_send(b).unwrap(); + } else { + break; + } + } + }) + .into(); + let mut stdin = Some(recv); + let guid: Guid = from_value( context .call_remote::( @@ -1118,10 +1173,9 @@ pub async fn cli_attach( json!({ "id": params.id, "command": params.command, - "tty": (std::io::stdin().is_terminal() - && std::io::stdout().is_terminal() - && std::io::stderr().is_terminal()) - || params.force_tty, + "tty": tty, + "stderrTty": stderr.is_terminal(), + "ptySize": if tty { TermSize::get_current() } else { None }, "subcontainer": params.subcontainer, "imageId": params.image_id, "name": params.name, @@ -1136,9 +1190,8 @@ pub async fn cli_attach( ws.send(Message::Text(current_in.into())) .await .with_kind(ErrorKind::Network)?; - let mut stdin = AsyncReadStream::new(tokio::io::stdin(), 4 * CAP_1_KiB).fuse(); - let mut stdout = tokio::io::stdout(); - let mut stderr = tokio::io::stderr(); + let mut stdout = Some(stdout); + let mut stderr = Some(stderr); loop { futures::select_biased! { // signal = tokio:: => { @@ -1153,8 +1206,15 @@ pub async fn cli_attach( // i32::to_be_bytes(exit.into_raw()).to_vec() // )).await.with_kind(ErrorKind::Network)?; // } - input = stdin.try_next() => { - if let Some(input) = input? { + input = stdin.as_mut().map_or( + futures::future::Either::Left(futures::future::pending()), + |s| futures::future::Either::Right(s.recv()) + ).fuse() => { + if let (Some(input), Some(stdin)) = (input.transpose()?, &mut stdin) { + let mut input = vec![input]; + while let Ok(b) = stdin.try_recv() { + input.push(b?); + } if current_in != "stdin" { ws.send(Message::Text("stdin".into())) .await @@ -1164,6 +1224,11 @@ pub async fn cli_attach( ws.send(Message::Binary(input)) .await .with_kind(ErrorKind::Network)?; + } else { + ws.send(Message::Text("close-stdin".into())) + .await + .with_kind(ErrorKind::Network)?; + stdin.take(); } } msg = ws.try_next() => { @@ -1175,12 +1240,22 @@ pub async fn cli_attach( Message::Binary(data) => { match &*current_out { "stdout" => { - stdout.write_all(&data).await?; - stdout.flush().await?; + if let Some(stdout) = &mut stdout { + stdout.write_all(&data)?; + stdout.flush()?; + } } "stderr" => { - stderr.write_all(&data).await?; - stderr.flush().await?; + if let Some(stderr) = &mut stderr { + stderr.write_all(&data)?; + stderr.flush()?; + } + } + "close-stdout" => { + stdout.take(); + } + "close-stderr" => { + stderr.take(); } "exit" => { if data.len() != 4 { @@ -1192,6 +1267,7 @@ pub async fn cli_attach( let mut exit_buf = [0u8; 4]; exit_buf.clone_from_slice(&data); let code = i32::from_be_bytes(exit_buf); + drop(raw); std::process::exit(code); } _ => (), @@ -1208,6 +1284,8 @@ pub async fn cli_attach( _ => () } } else { + kill.send(()).ok(); + stdin_thread.wait_for_abort().await.log_err(); return Ok(()) } } diff --git a/core/startos/src/util/io.rs b/core/startos/src/util/io.rs index 0495e2677..afacde836 100644 --- a/core/startos/src/util/io.rs +++ b/core/startos/src/util/io.rs @@ -5,16 +5,20 @@ use std::mem::MaybeUninit; use std::os::unix::prelude::MetadataExt; use std::path::{Path, PathBuf}; use std::pin::Pin; +use std::str::FromStr; use std::sync::atomic::AtomicU64; use std::sync::Arc; use std::task::{Poll, Waker}; use std::time::Duration; use bytes::{Buf, BytesMut}; +use clap::builder::ValueParserFactory; use futures::future::{BoxFuture, Fuse}; use futures::{AsyncSeek, FutureExt, Stream, TryStreamExt}; use helpers::NonDetachingJoinHandle; +use models::FromStrParser; use nix::unistd::{Gid, Uid}; +use serde::{Deserialize, Serialize}; use tokio::fs::{File, OpenOptions}; use tokio::io::{ duplex, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, DuplexStream, ReadBuf, WriteHalf, @@ -24,6 +28,7 @@ use tokio::sync::{Notify, OwnedMutexGuard}; use tokio::time::{Instant, Sleep}; use crate::prelude::*; +use crate::util::sync::SyncMutex; use crate::{CAP_1_KiB, CAP_1_MiB}; pub trait AsyncReadSeek: AsyncRead + AsyncSeek {} @@ -1395,3 +1400,73 @@ impl Stream for AsyncReadStream { } } } + +pub struct SharedIO(pub Arc>); +impl SharedIO { + pub fn new(t: T) -> Self { + Self(Arc::new(SyncMutex::new(t))) + } +} +impl Clone for SharedIO { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} +impl std::io::Write for SharedIO { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.0.mutate(|w| w.write(buf)) + } + fn flush(&mut self) -> std::io::Result<()> { + self.0.mutate(|w| w.flush()) + } +} +impl std::io::Read for SharedIO { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + self.0.mutate(|r| r.read(buf)) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TermSize { + pub size: (u16, u16), + pub pixels: Option<(u16, u16)>, +} +impl TermSize { + pub fn get_current() -> Option { + if let Some(size) = termion::terminal_size().ok() { + Some(Self { + size, + pixels: termion::terminal_size_pixels().ok(), + }) + } else { + None + } + } +} +impl FromStr for TermSize { + type Err = Error; + fn from_str(s: &str) -> Result { + (|| { + let mut split = s.split(":"); + let row: u16 = split.next()?.parse().ok()?; + let col: u16 = split.next()?.parse().ok()?; + let size = (row, col); + let pixels = if let Some(x) = split.next() { + let x: u16 = x.parse().ok()?; + let y: u16 = split.next()?.parse().ok()?; + Some((x, y)) + } else { + None + }; + + Some(Self { size, pixels }).filter(|_| split.next().is_none()) + })() + .ok_or_else(|| Error::new(eyre!("invalid pty size"), ErrorKind::ParseNumber)) + } +} +impl ValueParserFactory for TermSize { + type Parser = FromStrParser; + fn value_parser() -> Self::Parser { + FromStrParser::new() + } +}