fix shell support in package attach (#2929)

* fix package attach

* final fixes

* apply changes to launch

* Update core/startos/src/service/effects/subcontainer/sync.rs
This commit is contained in:
Aiden McClelland
2025-05-07 09:19:38 -06:00
committed by GitHub
parent 68955c29cb
commit f6b4dfffb6
6 changed files with 573 additions and 217 deletions

View File

@@ -1,19 +1,22 @@
use std::collections::BTreeMap;
use std::ffi::{c_int, OsStr, OsString};
use std::fs::File;
use std::io::IsTerminal;
use std::io::{IsTerminal, Read};
use std::os::unix::process::CommandExt;
use std::path::{Path, PathBuf};
use std::process::{Command as StdCommand, Stdio};
use std::sync::Arc;
use nix::sched::CloneFlags;
use nix::unistd::Pid;
use signal_hook::consts::signal::*;
use termion::raw::IntoRawMode;
use tokio::sync::oneshot;
use tty_spawn::TtySpawn;
use crate::service::effects::prelude::*;
use crate::service::effects::ContainerCliContext;
use crate::util::io::TermSize;
use crate::CAP_1_KiB;
const FWD_SIGNALS: &[c_int] = &[
SIGABRT, SIGALRM, SIGCONT, SIGHUP, SIGINT, SIGIO, SIGPIPE, SIGPROF, SIGQUIT, SIGTERM, SIGTRAP,
@@ -98,6 +101,10 @@ fn open_file_read(path: impl AsRef<Path>) -> Result<File, Error> {
pub struct ExecParams {
#[arg(long)]
force_tty: bool,
#[arg(long)]
force_stderr_tty: bool,
#[arg(long)]
pty_size: Option<TermSize>,
#[arg(short, long)]
env: Option<PathBuf>,
#[arg(short, long)]
@@ -181,6 +188,8 @@ pub fn launch(
_: ContainerCliContext,
ExecParams {
force_tty,
force_stderr_tty,
pty_size,
env,
workdir,
user,
@@ -188,34 +197,9 @@ pub fn launch(
command,
}: ExecParams,
) -> Result<(), Error> {
kill_init(Path::new("/proc"), &chroot)?;
if (std::io::stdin().is_terminal()
&& std::io::stdout().is_terminal()
&& std::io::stderr().is_terminal())
|| force_tty
{
let mut cmd = TtySpawn::new("/usr/bin/start-cli");
cmd.arg("subcontainer").arg("launch-init");
if let Some(env) = env {
cmd.arg("--env").arg(env);
}
if let Some(workdir) = workdir {
cmd.arg("--workdir").arg(workdir);
}
if let Some(user) = user {
cmd.arg("--user").arg(user);
}
cmd.arg(&chroot);
cmd.args(command.iter());
nix::sched::unshare(CloneFlags::CLONE_NEWPID)
.with_ctx(|_| (ErrorKind::Filesystem, "unshare pid ns"))?;
nix::sched::unshare(CloneFlags::CLONE_NEWCGROUP)
.with_ctx(|_| (ErrorKind::Filesystem, "unshare cgroup ns"))?;
nix::sched::unshare(CloneFlags::CLONE_NEWIPC)
.with_ctx(|_| (ErrorKind::Filesystem, "unshare ipc ns"))?;
std::process::exit(cmd.spawn().with_kind(ErrorKind::Filesystem)?);
}
use std::io::Write;
kill_init(Path::new("/proc"), &chroot)?;
let mut sig = signal_hook::iterator::Signals::new(FWD_SIGNALS)?;
let (send_pid, recv_pid) = oneshot::channel();
std::thread::spawn(move || {
@@ -229,75 +213,181 @@ pub fn launch(
}
}
});
let mut cmd = StdCommand::new("/usr/bin/start-cli");
cmd.arg("subcontainer").arg("launch-init");
if let Some(env) = env {
cmd.arg("--env").arg(env);
}
if let Some(workdir) = workdir {
cmd.arg("--workdir").arg(workdir);
}
if let Some(user) = user {
cmd.arg("--user").arg(user);
}
cmd.arg(&chroot);
cmd.args(&command);
cmd.stdin(Stdio::piped());
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
let (stdin_send, stdin_recv) = oneshot::channel();
let mut stdin = std::io::stdin();
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let stderr_tty = force_stderr_tty || stderr.is_terminal();
let tty = force_tty || (stdin.is_terminal() && stdout.is_terminal());
let raw = if stdin.is_terminal() && stdout.is_terminal() {
Some(termion::get_tty()?.into_raw_mode()?)
} else {
None
};
let pty_size = pty_size.or_else(|| TermSize::get_current());
let (stdin_send, stdin_recv) = oneshot::channel::<Box<dyn Write + Send>>();
std::thread::spawn(move || {
if let Ok(mut stdin) = stdin_recv.blocking_recv() {
std::io::copy(&mut std::io::stdin(), &mut stdin).unwrap();
if let Ok(mut cstdin) = stdin_recv.blocking_recv() {
if tty {
let mut buf = [0_u8; CAP_1_KiB];
while let Ok(n) = stdin.read(&mut buf) {
if n == 0 {
break;
}
cstdin.write_all(&buf[..n]).ok();
cstdin.flush().ok();
}
} else {
std::io::copy(&mut stdin, &mut cstdin).unwrap();
}
}
});
let (stdout_send, stdout_recv) = oneshot::channel();
std::thread::spawn(move || {
if let Ok(mut stdout) = stdout_recv.blocking_recv() {
std::io::copy(&mut stdout, &mut std::io::stdout()).unwrap();
}
});
let (stderr_send, stderr_recv) = oneshot::channel();
std::thread::spawn(move || {
if let Ok(mut stderr) = stderr_recv.blocking_recv() {
std::io::copy(&mut stderr, &mut std::io::stderr()).unwrap();
let (stdout_send, stdout_recv) = oneshot::channel::<Box<dyn std::io::Read + Send>>();
let stdout_thread = std::thread::spawn(move || {
if let Ok(mut cstdout) = stdout_recv.blocking_recv() {
if tty {
let mut stdout = stdout.lock();
let mut buf = [0_u8; CAP_1_KiB];
while let Ok(n) = cstdout.read(&mut buf) {
if n == 0 {
break;
}
stdout.write_all(&buf[..n]).ok();
stdout.flush().ok();
}
} else {
std::io::copy(&mut cstdout, &mut stdout.lock()).unwrap();
}
}
});
let (stderr_send, stderr_recv) = oneshot::channel::<Box<dyn std::io::Read + Send>>();
let stderr_thread = if !stderr_tty {
Some(std::thread::spawn(move || {
if let Ok(mut cstderr) = stderr_recv.blocking_recv() {
std::io::copy(&mut cstderr, &mut stderr.lock()).unwrap();
}
}))
} else {
None
};
nix::sched::unshare(CloneFlags::CLONE_NEWPID)
.with_ctx(|_| (ErrorKind::Filesystem, "unshare pid ns"))?;
nix::sched::unshare(CloneFlags::CLONE_NEWCGROUP)
.with_ctx(|_| (ErrorKind::Filesystem, "unshare cgroup ns"))?;
nix::sched::unshare(CloneFlags::CLONE_NEWIPC)
.with_ctx(|_| (ErrorKind::Filesystem, "unshare ipc ns"))?;
let mut child = cmd
.spawn()
.map_err(color_eyre::eyre::Report::msg)
.with_ctx(|_| (ErrorKind::Filesystem, "spawning child process"))?;
send_pid.send(child.id() as i32).unwrap_or_default();
stdin_send
.send(child.stdin.take().unwrap())
.unwrap_or_default();
stdout_send
.send(child.stdout.take().unwrap())
.unwrap_or_default();
stderr_send
.send(child.stderr.take().unwrap())
.unwrap_or_default();
// TODO: subreaping, signal handling
let exit = child
.wait()
.with_ctx(|_| (ErrorKind::Filesystem, "waiting on child process"))?;
if let Some(code) = exit.code() {
nix::mount::umount(&chroot.join("proc"))
.with_ctx(|_| (ErrorKind::Filesystem, "umount procfs"))?;
std::process::exit(code);
} else if exit.success() {
Ok(())
if tty {
use pty_process::blocking as pty_process;
let (pty, pts) = pty_process::open().with_kind(ErrorKind::Filesystem)?;
let mut cmd = pty_process::Command::new("/usr/bin/start-cli");
cmd = cmd.arg("subcontainer").arg("launch-init");
if let Some(env) = env {
cmd = cmd.arg("--env").arg(env);
}
if let Some(workdir) = workdir {
cmd = cmd.arg("--workdir").arg(workdir);
}
if let Some(user) = user {
cmd = cmd.arg("--user").arg(user);
}
cmd = cmd.arg(&chroot).args(&command);
if !stderr_tty {
cmd = cmd.stderr(Stdio::piped());
}
let mut child = cmd
.spawn(pts)
.map_err(color_eyre::eyre::Report::msg)
.with_ctx(|_| (ErrorKind::Filesystem, "spawning child process"))?;
send_pid.send(child.id() as i32).unwrap_or_default();
if let Some(pty_size) = pty_size {
let size = if let Some((x, y)) = pty_size.pixels {
::pty_process::Size::new_with_pixel(pty_size.size.0, pty_size.size.1, x, y)
} else {
::pty_process::Size::new(pty_size.size.0, pty_size.size.1)
};
pty.resize(size).with_kind(ErrorKind::Filesystem)?;
}
let shared = ArcPty(Arc::new(pty));
stdin_send
.send(Box::new(shared.clone()))
.unwrap_or_default();
stdout_send
.send(Box::new(shared.clone()))
.unwrap_or_default();
if let Some(stderr) = child.stderr.take() {
stderr_send.send(Box::new(stderr)).unwrap_or_default();
}
let exit = child
.wait()
.with_ctx(|_| (ErrorKind::Filesystem, "waiting on child process"))?;
stdout_thread.join().unwrap();
stderr_thread.map(|t| t.join().unwrap());
if let Some(code) = exit.code() {
drop(raw);
std::process::exit(code);
} else if exit.success() {
Ok(())
} else {
Err(Error::new(
color_eyre::eyre::Report::msg(exit),
ErrorKind::Unknown,
))
}
} else {
Err(Error::new(
color_eyre::eyre::Report::msg(exit),
ErrorKind::Unknown,
))
let mut cmd = StdCommand::new("/usr/bin/start-cli");
cmd.arg("subcontainer").arg("launch-init");
if let Some(env) = env {
cmd.arg("--env").arg(env);
}
if let Some(workdir) = workdir {
cmd.arg("--workdir").arg(workdir);
}
if let Some(user) = user {
cmd.arg("--user").arg(user);
}
cmd.arg(&chroot);
cmd.args(&command);
cmd.stdin(Stdio::piped());
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
let mut child = cmd
.spawn()
.map_err(color_eyre::eyre::Report::msg)
.with_ctx(|_| (ErrorKind::Filesystem, "spawning child process"))?;
send_pid.send(child.id() as i32).unwrap_or_default();
stdin_send
.send(Box::new(child.stdin.take().unwrap()))
.unwrap_or_default();
stdout_send
.send(Box::new(child.stdout.take().unwrap()))
.unwrap_or_default();
stderr_send
.send(Box::new(child.stderr.take().unwrap()))
.unwrap_or_default();
// TODO: subreaping, signal handling
let exit = child
.wait()
.with_ctx(|_| (ErrorKind::Filesystem, "waiting on child process"))?;
stdout_thread.join().unwrap();
stderr_thread.map(|t| t.join().unwrap());
if let Some(code) = exit.code() {
nix::mount::umount(&chroot.join("proc"))
.with_ctx(|_| (ErrorKind::Filesystem, "umount procfs"))?;
std::process::exit(code);
} else if exit.success() {
Ok(())
} else {
Err(Error::new(
color_eyre::eyre::Report::msg(exit),
ErrorKind::Unknown,
))
}
}
}
@@ -320,10 +410,28 @@ pub fn launch_init(_: ContainerCliContext, params: ExecParams) -> Result<(), Err
}
}
#[derive(Clone)]
struct ArcPty(Arc<pty_process::blocking::Pty>);
impl std::io::Write for ArcPty {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
(&*self.0).write(buf)
}
fn flush(&mut self) -> std::io::Result<()> {
(&*self.0).flush()
}
}
impl std::io::Read for ArcPty {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
(&*self.0).read(buf)
}
}
pub fn exec(
_: ContainerCliContext,
ExecParams {
force_tty,
force_stderr_tty,
pty_size,
env,
workdir,
user,
@@ -331,41 +439,8 @@ pub fn exec(
command,
}: ExecParams,
) -> Result<(), Error> {
if (std::io::stdin().is_terminal()
&& std::io::stdout().is_terminal()
&& std::io::stderr().is_terminal())
|| force_tty
{
let mut cmd = TtySpawn::new("/usr/bin/start-cli");
cmd.arg("subcontainer").arg("exec-command");
if let Some(env) = env {
cmd.arg("--env").arg(env);
}
if let Some(workdir) = workdir {
cmd.arg("--workdir").arg(workdir);
}
if let Some(user) = user {
cmd.arg("--user").arg(user);
}
cmd.arg(&chroot);
cmd.args(command.iter());
nix::sched::setns(
open_file_read(chroot.join("proc/1/ns/pid"))?,
CloneFlags::CLONE_NEWPID,
)
.with_ctx(|_| (ErrorKind::Filesystem, "set pid ns"))?;
nix::sched::setns(
open_file_read(chroot.join("proc/1/ns/cgroup"))?,
CloneFlags::CLONE_NEWCGROUP,
)
.with_ctx(|_| (ErrorKind::Filesystem, "set cgroup ns"))?;
nix::sched::setns(
open_file_read(chroot.join("proc/1/ns/ipc"))?,
CloneFlags::CLONE_NEWIPC,
)
.with_ctx(|_| (ErrorKind::Filesystem, "set ipc ns"))?;
std::process::exit(cmd.spawn().with_kind(ErrorKind::Filesystem)?);
}
use std::io::Write;
let mut sig = signal_hook::iterator::Signals::new(FWD_SIGNALS)?;
let (send_pid, recv_pid) = oneshot::channel();
std::thread::spawn(move || {
@@ -379,40 +454,67 @@ pub fn exec(
}
}
});
let mut cmd = StdCommand::new("/usr/bin/start-cli");
cmd.arg("subcontainer").arg("exec-command");
if let Some(env) = env {
cmd.arg("--env").arg(env);
}
if let Some(workdir) = workdir {
cmd.arg("--workdir").arg(workdir);
}
if let Some(user) = user {
cmd.arg("--user").arg(user);
}
cmd.arg(&chroot);
cmd.args(&command);
cmd.stdin(Stdio::piped());
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
let (stdin_send, stdin_recv) = oneshot::channel();
let mut stdin = std::io::stdin();
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let stderr_tty = force_stderr_tty || stderr.is_terminal();
let tty = force_tty || (stdin.is_terminal() && stdout.is_terminal());
let raw = if stdin.is_terminal() && stdout.is_terminal() {
Some(termion::get_tty()?.into_raw_mode()?)
} else {
None
};
let pty_size = pty_size.or_else(|| TermSize::get_current());
let (stdin_send, stdin_recv) = oneshot::channel::<Box<dyn Write + Send>>();
std::thread::spawn(move || {
if let Ok(mut stdin) = stdin_recv.blocking_recv() {
std::io::copy(&mut std::io::stdin(), &mut stdin).unwrap();
if let Ok(mut cstdin) = stdin_recv.blocking_recv() {
if tty {
let mut buf = [0_u8; CAP_1_KiB];
while let Ok(n) = stdin.read(&mut buf) {
if n == 0 {
break;
}
cstdin.write_all(&buf[..n]).ok();
cstdin.flush().ok();
}
} else {
std::io::copy(&mut stdin, &mut cstdin).unwrap();
}
}
});
let (stdout_send, stdout_recv) = oneshot::channel();
std::thread::spawn(move || {
if let Ok(mut stdout) = stdout_recv.blocking_recv() {
std::io::copy(&mut stdout, &mut std::io::stdout()).unwrap();
}
});
let (stderr_send, stderr_recv) = oneshot::channel();
std::thread::spawn(move || {
if let Ok(mut stderr) = stderr_recv.blocking_recv() {
std::io::copy(&mut stderr, &mut std::io::stderr()).unwrap();
let (stdout_send, stdout_recv) = oneshot::channel::<Box<dyn std::io::Read + Send>>();
let stdout_thread = std::thread::spawn(move || {
if let Ok(mut cstdout) = stdout_recv.blocking_recv() {
if tty {
let mut stdout = stdout.lock();
let mut buf = [0_u8; CAP_1_KiB];
while let Ok(n) = cstdout.read(&mut buf) {
if n == 0 {
break;
}
stdout.write_all(&buf[..n]).ok();
stdout.flush().ok();
}
} else {
std::io::copy(&mut cstdout, &mut stdout.lock()).unwrap();
}
}
});
let (stderr_send, stderr_recv) = oneshot::channel::<Box<dyn std::io::Read + Send>>();
let stderr_thread = if !stderr_tty {
Some(std::thread::spawn(move || {
if let Ok(mut cstderr) = stderr_recv.blocking_recv() {
std::io::copy(&mut cstderr, &mut stderr.lock()).unwrap();
}
}))
} else {
None
};
nix::sched::setns(
open_file_read(chroot.join("proc/1/ns/pid"))?,
CloneFlags::CLONE_NEWPID,
@@ -428,32 +530,110 @@ pub fn exec(
CloneFlags::CLONE_NEWIPC,
)
.with_ctx(|_| (ErrorKind::Filesystem, "set ipc ns"))?;
let mut child = cmd
.spawn()
.map_err(color_eyre::eyre::Report::msg)
.with_ctx(|_| (ErrorKind::Filesystem, "spawning child process"))?;
send_pid.send(child.id() as i32).unwrap_or_default();
stdin_send
.send(child.stdin.take().unwrap())
.unwrap_or_default();
stdout_send
.send(child.stdout.take().unwrap())
.unwrap_or_default();
stderr_send
.send(child.stderr.take().unwrap())
.unwrap_or_default();
let exit = child
.wait()
.with_ctx(|_| (ErrorKind::Filesystem, "waiting on child process"))?;
if let Some(code) = exit.code() {
std::process::exit(code);
} else if exit.success() {
Ok(())
if tty {
use pty_process::blocking as pty_process;
let (pty, pts) = pty_process::open().with_kind(ErrorKind::Filesystem)?;
let mut cmd = pty_process::Command::new("/usr/bin/start-cli");
cmd = cmd.arg("subcontainer").arg("exec-command");
if let Some(env) = env {
cmd = cmd.arg("--env").arg(env);
}
if let Some(workdir) = workdir {
cmd = cmd.arg("--workdir").arg(workdir);
}
if let Some(user) = user {
cmd = cmd.arg("--user").arg(user);
}
cmd = cmd.arg(&chroot).args(&command);
if !stderr_tty {
cmd = cmd.stderr(Stdio::piped());
}
let mut child = cmd
.spawn(pts)
.map_err(color_eyre::eyre::Report::msg)
.with_ctx(|_| (ErrorKind::Filesystem, "spawning child process"))?;
send_pid.send(child.id() as i32).unwrap_or_default();
if let Some(pty_size) = pty_size {
let size = if let Some((x, y)) = pty_size.pixels {
::pty_process::Size::new_with_pixel(pty_size.size.0, pty_size.size.1, x, y)
} else {
::pty_process::Size::new(pty_size.size.0, pty_size.size.1)
};
pty.resize(size).with_kind(ErrorKind::Filesystem)?;
}
let shared = ArcPty(Arc::new(pty));
stdin_send
.send(Box::new(shared.clone()))
.unwrap_or_default();
stdout_send
.send(Box::new(shared.clone()))
.unwrap_or_default();
if let Some(stderr) = child.stderr.take() {
stderr_send.send(Box::new(stderr)).unwrap_or_default();
}
let exit = child
.wait()
.with_ctx(|_| (ErrorKind::Filesystem, "waiting on child process"))?;
stdout_thread.join().unwrap();
stderr_thread.map(|t| t.join().unwrap());
if let Some(code) = exit.code() {
drop(raw);
std::process::exit(code);
} else if exit.success() {
Ok(())
} else {
Err(Error::new(
color_eyre::eyre::Report::msg(exit),
ErrorKind::Unknown,
))
}
} else {
Err(Error::new(
color_eyre::eyre::Report::msg(exit),
ErrorKind::Unknown,
))
let mut cmd = StdCommand::new("/usr/bin/start-cli");
cmd.arg("subcontainer").arg("exec-command");
if let Some(env) = env {
cmd.arg("--env").arg(env);
}
if let Some(workdir) = workdir {
cmd.arg("--workdir").arg(workdir);
}
if let Some(user) = user {
cmd.arg("--user").arg(user);
}
cmd.arg(&chroot);
cmd.args(&command);
cmd.stdin(Stdio::piped());
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
let mut child = cmd
.spawn()
.map_err(color_eyre::eyre::Report::msg)
.with_ctx(|_| (ErrorKind::Filesystem, "spawning child process"))?;
send_pid.send(child.id() as i32).unwrap_or_default();
stdin_send
.send(Box::new(child.stdin.take().unwrap()))
.unwrap_or_default();
stdout_send
.send(Box::new(child.stdout.take().unwrap()))
.unwrap_or_default();
stderr_send
.send(Box::new(child.stderr.take().unwrap()))
.unwrap_or_default();
let exit = child
.wait()
.with_ctx(|_| (ErrorKind::Filesystem, "waiting on child process"))?;
stdout_thread.join().unwrap();
stderr_thread.map(|t| t.join().unwrap());
if let Some(code) = exit.code() {
std::process::exit(code);
} else if exit.success() {
Ok(())
} else {
Err(Error::new(
color_eyre::eyre::Report::msg(exit),
ErrorKind::Unknown,
))
}
}
}

View File

@@ -13,7 +13,8 @@ use chrono::{DateTime, Utc};
use clap::Parser;
use futures::future::BoxFuture;
use futures::stream::FusedStream;
use futures::{SinkExt, StreamExt, TryStreamExt};
use futures::{FutureExt, SinkExt, StreamExt, TryStreamExt};
use helpers::NonDetachingJoinHandle;
use imbl_value::{json, InternedString};
use itertools::Itertools;
use models::{ActionId, HostId, ImageId, PackageId, ProcedureName};
@@ -23,6 +24,7 @@ use rpc_toolkit::{from_fn_async, CallRemoteHandler, Empty, HandlerArgs, HandlerF
use serde::{Deserialize, Serialize};
use service_actor::ServiceActor;
use start_stop::StartStop;
use termion::raw::IntoRawMode;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::process::Command;
use tokio::sync::Notify;
@@ -43,7 +45,7 @@ use crate::s9pk::S9pk;
use crate::service::action::update_requested_actions;
use crate::service::service_map::InstallProgressHandles;
use crate::util::actor::concurrent::ConcurrentActor;
use crate::util::io::{create_file, AsyncReadStream};
use crate::util::io::{create_file, AsyncReadStream, TermSize};
use crate::util::net::WebSocketExt;
use crate::util::serde::{NoOutput, Pem};
use crate::util::Never;
@@ -739,6 +741,7 @@ pub struct AttachParams {
#[ts(type = "string[]")]
pub command: Vec<OsString>,
pub tty: bool,
pub stderr_tty: bool,
#[ts(skip)]
#[serde(rename = "__auth_session")]
session: Option<InternedString>,
@@ -755,6 +758,7 @@ pub async fn attach(
id,
command,
tty,
stderr_tty,
session,
subcontainer,
image_id,
@@ -864,6 +868,7 @@ pub async fn attach(
subcontainer_id: Guid,
command: Vec<OsString>,
tty: bool,
stderr_tty: bool,
image_id: ImageId,
workdir: Option<String>,
root_command: &RootCommand,
@@ -896,6 +901,10 @@ pub async fn attach(
cmd.arg("--force-tty");
}
if stderr_tty {
cmd.arg("--force-stderr-tty");
}
cmd.arg(&root_path).arg("--");
if command.is_empty() {
@@ -912,7 +921,7 @@ pub async fn attach(
let pid = nix::unistd::Pid::from_raw(child.id().or_not_found("child pid")? as i32);
let mut stdin = child.stdin.take().or_not_found("child stdin")?;
let mut stdin = Some(child.stdin.take().or_not_found("child stdin")?);
let mut current_in = "stdin".to_owned();
let mut current_out = "stdout";
@@ -943,6 +952,10 @@ pub async fn attach(
ws.send(Message::Binary(out))
.await
.with_kind(ErrorKind::Network)?;
} else {
ws.send(Message::Text("close-stdout".into()))
.await
.with_kind(ErrorKind::Network)?;
}
}
err = stderr.try_next() => {
@@ -956,6 +969,10 @@ pub async fn attach(
ws.send(Message::Binary(err))
.await
.with_kind(ErrorKind::Network)?;
} else {
ws.send(Message::Text("close-stderr".into()))
.await
.with_kind(ErrorKind::Network)?;
}
}
msg = ws.try_next() => {
@@ -967,7 +984,12 @@ pub async fn attach(
Message::Binary(data) => {
match &*current_in {
"stdin" => {
stdin.write_all(&data).await?;
if let Some(stdin) = &mut stdin {
stdin.write_all(&data).await?;
}
}
"close-stdin" => {
stdin.take();
}
"signal" => {
if data.len() != 4 {
@@ -1022,6 +1044,7 @@ pub async fn attach(
subcontainer_id,
command,
tty,
stderr_tty,
image_id,
workdir,
&root_command,
@@ -1100,6 +1123,7 @@ pub struct CliAttachParams {
#[arg(long, short)]
image_id: Option<ImageId>,
}
#[instrument[skip_all]]
pub async fn cli_attach(
HandlerArgs {
context,
@@ -1109,8 +1133,39 @@ pub async fn cli_attach(
..
}: HandlerArgs<CliContext, CliAttachParams>,
) -> Result<(), Error> {
use std::io::Write;
use tokio_tungstenite::tungstenite::Message;
let stdin = std::io::stdin();
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let tty = params.force_tty || (stdin.is_terminal() && stdout.is_terminal());
let raw = if stdin.is_terminal() && stdout.is_terminal() {
Some(termion::get_tty()?.into_raw_mode()?)
} else {
None
};
let (kill, thread_kill) = tokio::sync::oneshot::channel();
let (thread_send, recv) = tokio::sync::mpsc::channel(4 * CAP_1_KiB);
let stdin_thread: NonDetachingJoinHandle<()> = tokio::task::spawn_blocking(move || {
use std::io::Read;
let mut stdin = stdin.lock().bytes();
while thread_kill.is_empty() {
if let Some(b) = stdin.next() {
thread_send.blocking_send(b).unwrap();
} else {
break;
}
}
})
.into();
let mut stdin = Some(recv);
let guid: Guid = from_value(
context
.call_remote::<RpcContext>(
@@ -1118,10 +1173,9 @@ pub async fn cli_attach(
json!({
"id": params.id,
"command": params.command,
"tty": (std::io::stdin().is_terminal()
&& std::io::stdout().is_terminal()
&& std::io::stderr().is_terminal())
|| params.force_tty,
"tty": tty,
"stderrTty": stderr.is_terminal(),
"ptySize": if tty { TermSize::get_current() } else { None },
"subcontainer": params.subcontainer,
"imageId": params.image_id,
"name": params.name,
@@ -1136,9 +1190,8 @@ pub async fn cli_attach(
ws.send(Message::Text(current_in.into()))
.await
.with_kind(ErrorKind::Network)?;
let mut stdin = AsyncReadStream::new(tokio::io::stdin(), 4 * CAP_1_KiB).fuse();
let mut stdout = tokio::io::stdout();
let mut stderr = tokio::io::stderr();
let mut stdout = Some(stdout);
let mut stderr = Some(stderr);
loop {
futures::select_biased! {
// signal = tokio:: => {
@@ -1153,8 +1206,15 @@ pub async fn cli_attach(
// i32::to_be_bytes(exit.into_raw()).to_vec()
// )).await.with_kind(ErrorKind::Network)?;
// }
input = stdin.try_next() => {
if let Some(input) = input? {
input = stdin.as_mut().map_or(
futures::future::Either::Left(futures::future::pending()),
|s| futures::future::Either::Right(s.recv())
).fuse() => {
if let (Some(input), Some(stdin)) = (input.transpose()?, &mut stdin) {
let mut input = vec![input];
while let Ok(b) = stdin.try_recv() {
input.push(b?);
}
if current_in != "stdin" {
ws.send(Message::Text("stdin".into()))
.await
@@ -1164,6 +1224,11 @@ pub async fn cli_attach(
ws.send(Message::Binary(input))
.await
.with_kind(ErrorKind::Network)?;
} else {
ws.send(Message::Text("close-stdin".into()))
.await
.with_kind(ErrorKind::Network)?;
stdin.take();
}
}
msg = ws.try_next() => {
@@ -1175,12 +1240,22 @@ pub async fn cli_attach(
Message::Binary(data) => {
match &*current_out {
"stdout" => {
stdout.write_all(&data).await?;
stdout.flush().await?;
if let Some(stdout) = &mut stdout {
stdout.write_all(&data)?;
stdout.flush()?;
}
}
"stderr" => {
stderr.write_all(&data).await?;
stderr.flush().await?;
if let Some(stderr) = &mut stderr {
stderr.write_all(&data)?;
stderr.flush()?;
}
}
"close-stdout" => {
stdout.take();
}
"close-stderr" => {
stderr.take();
}
"exit" => {
if data.len() != 4 {
@@ -1192,6 +1267,7 @@ pub async fn cli_attach(
let mut exit_buf = [0u8; 4];
exit_buf.clone_from_slice(&data);
let code = i32::from_be_bytes(exit_buf);
drop(raw);
std::process::exit(code);
}
_ => (),
@@ -1208,6 +1284,8 @@ pub async fn cli_attach(
_ => ()
}
} else {
kill.send(()).ok();
stdin_thread.wait_for_abort().await.log_err();
return Ok(())
}
}

View File

@@ -5,16 +5,20 @@ use std::mem::MaybeUninit;
use std::os::unix::prelude::MetadataExt;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::str::FromStr;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::task::{Poll, Waker};
use std::time::Duration;
use bytes::{Buf, BytesMut};
use clap::builder::ValueParserFactory;
use futures::future::{BoxFuture, Fuse};
use futures::{AsyncSeek, FutureExt, Stream, TryStreamExt};
use helpers::NonDetachingJoinHandle;
use models::FromStrParser;
use nix::unistd::{Gid, Uid};
use serde::{Deserialize, Serialize};
use tokio::fs::{File, OpenOptions};
use tokio::io::{
duplex, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, DuplexStream, ReadBuf, WriteHalf,
@@ -24,6 +28,7 @@ use tokio::sync::{Notify, OwnedMutexGuard};
use tokio::time::{Instant, Sleep};
use crate::prelude::*;
use crate::util::sync::SyncMutex;
use crate::{CAP_1_KiB, CAP_1_MiB};
pub trait AsyncReadSeek: AsyncRead + AsyncSeek {}
@@ -1395,3 +1400,73 @@ impl<T: AsyncRead> Stream for AsyncReadStream<T> {
}
}
}
pub struct SharedIO<T>(pub Arc<SyncMutex<T>>);
impl<T> SharedIO<T> {
pub fn new(t: T) -> Self {
Self(Arc::new(SyncMutex::new(t)))
}
}
impl<T> Clone for SharedIO<T> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<T: std::io::Write> std::io::Write for SharedIO<T> {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.0.mutate(|w| w.write(buf))
}
fn flush(&mut self) -> std::io::Result<()> {
self.0.mutate(|w| w.flush())
}
}
impl<T: std::io::Read> std::io::Read for SharedIO<T> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
self.0.mutate(|r| r.read(buf))
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TermSize {
pub size: (u16, u16),
pub pixels: Option<(u16, u16)>,
}
impl TermSize {
pub fn get_current() -> Option<Self> {
if let Some(size) = termion::terminal_size().ok() {
Some(Self {
size,
pixels: termion::terminal_size_pixels().ok(),
})
} else {
None
}
}
}
impl FromStr for TermSize {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
(|| {
let mut split = s.split(":");
let row: u16 = split.next()?.parse().ok()?;
let col: u16 = split.next()?.parse().ok()?;
let size = (row, col);
let pixels = if let Some(x) = split.next() {
let x: u16 = x.parse().ok()?;
let y: u16 = split.next()?.parse().ok()?;
Some((x, y))
} else {
None
};
Some(Self { size, pixels }).filter(|_| split.next().is_none())
})()
.ok_or_else(|| Error::new(eyre!("invalid pty size"), ErrorKind::ParseNumber))
}
}
impl ValueParserFactory for TermSize {
type Parser = FromStrParser<Self>;
fn value_parser() -> Self::Parser {
FromStrParser::new()
}
}