add support for remote attaching to container (#2732)

* add support for remote attaching to container

* feature: Add in the subcontainer searching

* feat: Add in the name/ imageId filtering

* Feat: Fix the env and the workdir

* chore: Make the sigkill first?

* add some extra guard on term

* fix: Health during error doesnt return what we need

* chore: Cleanup for pr

* fix build

* fix build

* Update startos-iso.yaml

* Update startos-iso.yaml

* Update startos-iso.yaml

* Update startos-iso.yaml

* Update startos-iso.yaml

* Update startos-iso.yaml

* Update startos-iso.yaml

* check status during build

---------

Co-authored-by: J H <dragondef@gmail.com>
This commit is contained in:
Aiden McClelland
2024-09-20 15:38:16 -06:00
committed by GitHub
parent 24c6cd235b
commit eec5cf6b65
31 changed files with 852 additions and 269 deletions

View File

@@ -275,8 +275,8 @@ pub struct Session {
#[serde(rename_all = "camelCase")]
#[ts(export)]
pub struct SessionList {
#[ts(type = "string")]
current: InternedString,
#[ts(type = "string | null")]
current: Option<InternedString>,
sessions: Sessions,
}
@@ -323,7 +323,7 @@ fn display_sessions(params: WithIoFormat<ListParams>, arg: SessionList) {
session.user_agent.as_deref().unwrap_or("N/A"),
&format!("{}", session.metadata),
];
if id == arg.current {
if Some(id) == arg.current {
row.iter_mut()
.map(|c| c.style(Attr::ForegroundColor(color::GREEN)))
.collect::<()>()
@@ -340,7 +340,7 @@ pub struct ListParams {
#[arg(skip)]
#[ts(skip)]
#[serde(rename = "__auth_session")] // from Auth middleware
session: InternedString,
session: Option<InternedString>,
}
// #[command(display(display_sessions))]

View File

@@ -58,7 +58,7 @@ pub struct RpcContextSeed {
pub shutdown: broadcast::Sender<Option<Shutdown>>,
pub tor_socks: SocketAddr,
pub lxc_manager: Arc<LxcManager>,
pub open_authed_continuations: OpenAuthedContinuations<InternedString>,
pub open_authed_continuations: OpenAuthedContinuations<Option<InternedString>>,
pub rpc_continuations: RpcContinuations,
pub callbacks: ServiceCallbacks,
pub wifi_manager: Option<Arc<RwLock<WpaCli>>>,
@@ -431,8 +431,8 @@ impl AsRef<RpcContinuations> for RpcContext {
&self.rpc_continuations
}
}
impl AsRef<OpenAuthedContinuations<InternedString>> for RpcContext {
fn as_ref(&self) -> &OpenAuthedContinuations<InternedString> {
impl AsRef<OpenAuthedContinuations<Option<InternedString>>> for RpcContext {
fn as_ref(&self) -> &OpenAuthedContinuations<Option<InternedString>> {
&self.open_authed_continuations
}
}

View File

@@ -115,7 +115,7 @@ pub struct SubscribeParams {
pointer: Option<JsonPointer>,
#[ts(skip)]
#[serde(rename = "__auth_session")]
session: InternedString,
session: Option<InternedString>,
}
#[derive(Deserialize, Serialize, TS)]

View File

@@ -173,7 +173,7 @@ pub async fn install(
pub struct SideloadParams {
#[ts(skip)]
#[serde(rename = "__auth_session")]
session: InternedString,
session: Option<InternedString>,
}
#[derive(Deserialize, Serialize, TS)]

View File

@@ -323,6 +323,13 @@ pub fn package<C: Context>() -> ParentHandler<C> {
"connect",
from_fn_async(service::connect_rpc_cli).no_display(),
)
.subcommand(
"attach",
from_fn_async(service::attach)
.with_metadata("get_session", Value::Bool(true))
.no_cli(),
)
.subcommand("attach", from_fn_async(service::cli_attach).no_display())
}
pub fn diagnostic_api() -> ParentHandler<DiagnosticContext> {

View File

@@ -49,7 +49,7 @@ impl HasLoggedOutSessions {
.map(|s| s.as_logout_session_id())
.collect();
for sid in &to_log_out {
ctx.open_authed_continuations.kill(sid)
ctx.open_authed_continuations.kill(&Some(sid.clone()))
}
ctx.ephemeral_sessions.mutate(|s| {
for sid in &to_log_out {

View File

@@ -1,12 +1,15 @@
use std::path::{Path, PathBuf};
use imbl_value::InternedString;
use models::ImageId;
use tokio::process::Command;
use crate::disk::mount::filesystem::overlayfs::OverlayGuard;
use crate::rpc_continuations::Guid;
use crate::service::effects::prelude::*;
use crate::util::Invoke;
use crate::{
disk::mount::filesystem::overlayfs::OverlayGuard, service::persistent_container::Subcontainer,
};
#[cfg(feature = "container-runtime")]
mod sync;
@@ -38,7 +41,7 @@ pub async fn destroy_subcontainer_fs(
.await
.remove(&guid)
{
overlay.unmount(true).await?;
overlay.overlay.unmount(true).await?;
} else {
tracing::warn!("Could not find a subcontainer fs to destroy; assumming that it already is destroyed and will be skipping");
}
@@ -50,11 +53,13 @@ pub async fn destroy_subcontainer_fs(
#[ts(export)]
pub struct CreateSubcontainerFsParams {
image_id: ImageId,
#[ts(type = "string | null")]
name: Option<InternedString>,
}
#[instrument(skip_all)]
pub async fn create_subcontainer_fs(
context: EffectContext,
CreateSubcontainerFsParams { image_id }: CreateSubcontainerFsParams,
CreateSubcontainerFsParams { image_id, name }: CreateSubcontainerFsParams,
) -> Result<(PathBuf, Guid), Error> {
let context = context.deref()?;
if let Some(image) = context
@@ -87,7 +92,13 @@ pub async fn create_subcontainer_fs(
.with_kind(ErrorKind::Incoherent)?,
);
tracing::info!("Mounting overlay {guid} for {image_id}");
let guard = OverlayGuard::mount(image, &mountpoint).await?;
let subcontainer_wrapper = Subcontainer {
overlay: OverlayGuard::mount(image, &mountpoint).await?,
name: name
.unwrap_or_else(|| InternedString::intern(format!("subcontainer-{}", image_id))),
image_id: image_id.clone(),
};
Command::new("chown")
.arg("100000:100000")
.arg(&mountpoint)
@@ -100,7 +111,7 @@ pub async fn create_subcontainer_fs(
.subcontainers
.lock()
.await
.insert(guid.clone(), guard);
.insert(guid.clone(), subcontainer_wrapper);
Ok((container_mountpoint, guid))
} else {
Err(Error::new(

View File

@@ -1,4 +1,3 @@
use std::borrow::Cow;
use std::collections::BTreeMap;
use std::ffi::{c_int, OsStr, OsString};
use std::fs::File;
@@ -12,7 +11,6 @@ use nix::unistd::Pid;
use signal_hook::consts::signal::*;
use tokio::sync::oneshot;
use tty_spawn::TtySpawn;
use unshare::Command as NSCommand;
use crate::service::effects::prelude::*;
use crate::service::effects::ContainerCliContext;
@@ -50,11 +48,13 @@ fn open_file_read(path: impl AsRef<Path>) -> Result<File, Error> {
#[derive(Debug, Clone, Serialize, Deserialize, Parser)]
pub struct ExecParams {
#[arg(short = 'e', long = "env")]
#[arg(long)]
force_tty: bool,
#[arg(short, long)]
env: Option<PathBuf>,
#[arg(short = 'w', long = "workdir")]
#[arg(short, long)]
workdir: Option<PathBuf>,
#[arg(short = 'u', long = "user")]
#[arg(short, long)]
user: Option<String>,
chroot: PathBuf,
#[arg(trailing_var_arg = true)]
@@ -68,6 +68,7 @@ impl ExecParams {
user,
chroot,
command,
..
} = self;
let Some(([command], args)) = command.split_at_checked(1) else {
return Err(Error::new(
@@ -88,16 +89,6 @@ impl ExecParams {
.collect::<BTreeMap<_, _>>();
std::os::unix::fs::chroot(chroot)
.with_ctx(|_| (ErrorKind::Filesystem, lazy_format!("chroot {chroot:?}")))?;
let command = which::which_in(
command,
env.get("PATH")
.copied()
.map(Cow::Borrowed)
.or_else(|| std::env::var("PATH").ok().map(Cow::Owned))
.as_deref(),
workdir.as_deref().unwrap_or(Path::new("/")),
)
.with_kind(ErrorKind::Filesystem)?;
let mut cmd = StdCommand::new(command);
cmd.args(args);
for (k, v) in env {
@@ -135,6 +126,7 @@ impl ExecParams {
pub fn launch(
_: ContainerCliContext,
ExecParams {
force_tty,
env,
workdir,
user,
@@ -142,47 +134,8 @@ pub fn launch(
command,
}: ExecParams,
) -> Result<(), Error> {
use unshare::{Namespace, Stdio};
use crate::service::cli::ContainerCliContext;
let mut sig = signal_hook::iterator::Signals::new(FWD_SIGNALS)?;
let mut cmd = NSCommand::new("/usr/bin/start-cli");
cmd.arg("subcontainer").arg("launch-init");
if let Some(env) = env {
cmd.arg("--env").arg(env);
}
if let Some(workdir) = workdir {
cmd.arg("--workdir").arg(workdir);
}
if let Some(user) = user {
cmd.arg("--user").arg(user);
}
cmd.arg(&chroot);
cmd.args(&command);
cmd.unshare(&[Namespace::Pid, Namespace::Cgroup, Namespace::Ipc]);
cmd.stdin(Stdio::piped());
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
let (stdin_send, stdin_recv) = oneshot::channel();
std::thread::spawn(move || {
if let Ok(mut stdin) = stdin_recv.blocking_recv() {
std::io::copy(&mut std::io::stdin(), &mut stdin).unwrap();
}
});
let (stdout_send, stdout_recv) = oneshot::channel();
std::thread::spawn(move || {
if let Ok(mut stdout) = stdout_recv.blocking_recv() {
std::io::copy(&mut stdout, &mut std::io::stdout()).unwrap();
}
});
let (stderr_send, stderr_recv) = oneshot::channel();
std::thread::spawn(move || {
if let Ok(mut stderr) = stderr_recv.blocking_recv() {
std::io::copy(&mut stderr, &mut std::io::stderr()).unwrap();
}
});
if chroot.join("proc/1").exists() {
let ns_id = procfs::process::Process::new_with_root(chroot.join("proc"))
let ns_id = procfs::process::Process::new_with_root(chroot.join("proc/1"))
.with_ctx(|_| (ErrorKind::Filesystem, "open subcontainer procfs"))?
.namespaces()
.with_ctx(|_| (ErrorKind::Filesystem, "read subcontainer pid 1 ns"))?
@@ -225,20 +178,92 @@ pub fn launch(
nix::mount::umount(&chroot.join("proc"))
.with_ctx(|_| (ErrorKind::Filesystem, "unmounting subcontainer procfs"))?;
}
if (std::io::stdin().is_terminal()
&& std::io::stdout().is_terminal()
&& std::io::stderr().is_terminal())
|| force_tty
{
let mut cmd = TtySpawn::new("/usr/bin/start-cli");
cmd.arg("subcontainer").arg("launch-init");
if let Some(env) = env {
cmd.arg("--env").arg(env);
}
if let Some(workdir) = workdir {
cmd.arg("--workdir").arg(workdir);
}
if let Some(user) = user {
cmd.arg("--user").arg(user);
}
cmd.arg(&chroot);
cmd.args(command.iter());
nix::sched::unshare(CloneFlags::CLONE_NEWPID)
.with_ctx(|_| (ErrorKind::Filesystem, "unshare pid ns"))?;
nix::sched::unshare(CloneFlags::CLONE_NEWCGROUP)
.with_ctx(|_| (ErrorKind::Filesystem, "unshare cgroup ns"))?;
nix::sched::unshare(CloneFlags::CLONE_NEWIPC)
.with_ctx(|_| (ErrorKind::Filesystem, "unshare ipc ns"))?;
std::process::exit(cmd.spawn().with_kind(ErrorKind::Filesystem)?);
}
let mut sig = signal_hook::iterator::Signals::new(FWD_SIGNALS)?;
let (send_pid, recv_pid) = oneshot::channel();
std::thread::spawn(move || {
if let Ok(pid) = recv_pid.blocking_recv() {
for sig in sig.forever() {
nix::sys::signal::kill(
Pid::from_raw(pid),
Some(nix::sys::signal::Signal::try_from(sig).unwrap()),
)
.unwrap();
}
}
});
let mut cmd = StdCommand::new("/usr/bin/start-cli");
cmd.arg("subcontainer").arg("launch-init");
if let Some(env) = env {
cmd.arg("--env").arg(env);
}
if let Some(workdir) = workdir {
cmd.arg("--workdir").arg(workdir);
}
if let Some(user) = user {
cmd.arg("--user").arg(user);
}
cmd.arg(&chroot);
cmd.args(&command);
cmd.stdin(Stdio::piped());
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
let (stdin_send, stdin_recv) = oneshot::channel();
std::thread::spawn(move || {
if let Ok(mut stdin) = stdin_recv.blocking_recv() {
std::io::copy(&mut std::io::stdin(), &mut stdin).unwrap();
}
});
let (stdout_send, stdout_recv) = oneshot::channel();
std::thread::spawn(move || {
if let Ok(mut stdout) = stdout_recv.blocking_recv() {
std::io::copy(&mut stdout, &mut std::io::stdout()).unwrap();
}
});
let (stderr_send, stderr_recv) = oneshot::channel();
std::thread::spawn(move || {
if let Ok(mut stderr) = stderr_recv.blocking_recv() {
std::io::copy(&mut stderr, &mut std::io::stderr()).unwrap();
}
});
nix::sched::unshare(CloneFlags::CLONE_NEWPID)
.with_ctx(|_| (ErrorKind::Filesystem, "unshare pid ns"))?;
nix::sched::unshare(CloneFlags::CLONE_NEWCGROUP)
.with_ctx(|_| (ErrorKind::Filesystem, "unshare cgroup ns"))?;
nix::sched::unshare(CloneFlags::CLONE_NEWIPC)
.with_ctx(|_| (ErrorKind::Filesystem, "unshare ipc ns"))?;
let mut child = cmd
.spawn()
.map_err(color_eyre::eyre::Report::msg)
.with_ctx(|_| (ErrorKind::Filesystem, "spawning child process"))?;
let pid = child.pid();
std::thread::spawn(move || {
for sig in sig.forever() {
nix::sys::signal::kill(
Pid::from_raw(pid),
Some(nix::sys::signal::Signal::try_from(sig).unwrap()),
)
.unwrap();
}
});
send_pid.send(child.id() as i32).unwrap_or_default();
stdin_send
.send(child.stdin.take().unwrap())
.unwrap_or_default();
@@ -253,16 +278,16 @@ pub fn launch(
.wait()
.with_ctx(|_| (ErrorKind::Filesystem, "waiting on child process"))?;
if let Some(code) = exit.code() {
nix::mount::umount(&chroot.join("proc"))
.with_ctx(|_| (ErrorKind::Filesystem, "umount procfs"))?;
std::process::exit(code);
} else if exit.success() {
Ok(())
} else {
if exit.success() {
Ok(())
} else {
Err(Error::new(
color_eyre::eyre::Report::msg(exit),
ErrorKind::Unknown,
))
}
Err(Error::new(
color_eyre::eyre::Report::msg(exit),
ErrorKind::Unknown,
))
}
}
@@ -288,6 +313,7 @@ pub fn launch_init(_: ContainerCliContext, params: ExecParams) -> Result<(), Err
pub fn exec(
_: ContainerCliContext,
ExecParams {
force_tty,
env,
workdir,
user,
@@ -295,7 +321,11 @@ pub fn exec(
command,
}: ExecParams,
) -> Result<(), Error> {
if std::io::stdin().is_terminal() {
if (std::io::stdin().is_terminal()
&& std::io::stdout().is_terminal()
&& std::io::stderr().is_terminal())
|| force_tty
{
let mut cmd = TtySpawn::new("/usr/bin/start-cli");
cmd.arg("subcontainer").arg("exec-command");
if let Some(env) = env {
@@ -407,15 +437,13 @@ pub fn exec(
.with_ctx(|_| (ErrorKind::Filesystem, "waiting on child process"))?;
if let Some(code) = exit.code() {
std::process::exit(code);
} else if exit.success() {
Ok(())
} else {
if exit.success() {
Ok(())
} else {
Err(Error::new(
color_eyre::eyre::Report::msg(exit),
ErrorKind::Unknown,
))
}
Err(Error::new(
color_eyre::eyre::Report::msg(exit),
ErrorKind::Unknown,
))
}
}

View File

@@ -1,18 +1,31 @@
use std::io::IsTerminal;
use std::ops::Deref;
use std::os::unix::process::ExitStatusExt;
use std::path::Path;
use std::process::Stdio;
use std::sync::{Arc, Weak};
use std::time::Duration;
use std::{ffi::OsString, path::PathBuf};
use axum::extract::ws::WebSocket;
use chrono::{DateTime, Utc};
use clap::Parser;
use futures::future::BoxFuture;
use imbl::OrdMap;
use models::{HealthCheckId, PackageId, ProcedureName};
use persistent_container::PersistentContainer;
use futures::stream::FusedStream;
use futures::{SinkExt, StreamExt, TryStreamExt};
use imbl_value::{json, InternedString};
use itertools::Itertools;
use models::{ImageId, PackageId, ProcedureName};
use nix::sys::signal::Signal;
use persistent_container::{PersistentContainer, Subcontainer};
use rpc_toolkit::{from_fn_async, CallRemoteHandler, Empty, HandlerArgs, HandlerFor};
use serde::{Deserialize, Serialize};
use service_actor::ServiceActor;
use start_stop::StartStop;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::process::Command;
use tokio::sync::Notify;
use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode;
use ts_rs::TS;
use crate::context::{CliContext, RpcContext};
@@ -24,15 +37,16 @@ use crate::install::PKG_ARCHIVE_DIR;
use crate::lxc::ContainerId;
use crate::prelude::*;
use crate::progress::{NamedProgress, Progress};
use crate::rpc_continuations::Guid;
use crate::rpc_continuations::{Guid, RpcContinuation};
use crate::s9pk::S9pk;
use crate::service::service_map::InstallProgressHandles;
use crate::status::health_check::NamedHealthCheckResult;
use crate::util::actor::concurrent::ConcurrentActor;
use crate::util::io::create_file;
use crate::util::io::{create_file, AsyncReadStream};
use crate::util::net::WebSocketExt;
use crate::util::serde::{NoOutput, Pem};
use crate::util::Never;
use crate::volume::data_dir;
use crate::CAP_1_KiB;
mod action;
pub mod cli;
@@ -68,6 +82,8 @@ pub enum LoadDisposition {
Undo,
}
struct RootCommand(pub String);
pub struct ServiceRef(Arc<Service>);
impl ServiceRef {
pub fn weak(&self) -> Weak<Service> {
@@ -183,7 +199,7 @@ impl ServiceRef {
impl Deref for ServiceRef {
type Target = Service;
fn deref(&self) -> &Self::Target {
&*self.0
&self.0
}
}
impl From<Service> for ServiceRef {
@@ -354,7 +370,7 @@ impl Service {
tracing::debug!("{e:?}")
})
{
match ServiceRef::from(service).uninstall(None).await {
match service.uninstall(None).await {
Err(e) => {
tracing::error!("Error uninstalling service: {e}");
tracing::debug!("{e:?}")
@@ -578,3 +594,488 @@ pub async fn connect_rpc_cli(
crate::lxc::connect_cli(&ctx, guid).await
}
#[derive(Deserialize, Serialize, TS)]
#[serde(rename_all = "camelCase")]
pub struct AttachParams {
pub id: PackageId,
#[ts(type = "string[]")]
pub command: Vec<OsString>,
pub tty: bool,
#[ts(skip)]
#[serde(rename = "__auth_session")]
session: Option<InternedString>,
#[ts(type = "string | null")]
subcontainer: Option<InternedString>,
#[ts(type = "string | null")]
name: Option<InternedString>,
#[ts(type = "string | null")]
image_id: Option<ImageId>,
}
pub async fn attach(
ctx: RpcContext,
AttachParams {
id,
command,
tty,
session,
subcontainer,
image_id,
name,
}: AttachParams,
) -> Result<Guid, Error> {
let (container_id, subcontainer_id, image_id, workdir, root_command) = {
let id = &id;
let service = ctx.services.get(id).await;
let service_ref = service.as_ref().or_not_found(id)?;
let container = &service_ref.seed.persistent_container;
let root_dir = container
.lxc_container
.get()
.map(|x| x.rootfs_dir().to_owned())
.or_not_found(format!("container for {id}"))?;
let subcontainer = subcontainer.map(|x| AsRef::<str>::as_ref(&x).to_uppercase());
let name = name.map(|x| AsRef::<str>::as_ref(&x).to_uppercase());
let image_id = image_id.map(|x| AsRef::<Path>::as_ref(&x).to_string_lossy().to_uppercase());
let subcontainers = container.subcontainers.lock().await;
let subcontainer_ids: Vec<_> = subcontainers
.iter()
.filter(|(x, wrapper)| {
if let Some(subcontainer) = subcontainer.as_ref() {
AsRef::<str>::as_ref(x).contains(AsRef::<str>::as_ref(subcontainer))
} else if let Some(name) = name.as_ref() {
AsRef::<str>::as_ref(&wrapper.name)
.to_uppercase()
.contains(AsRef::<str>::as_ref(name))
} else if let Some(image_id) = image_id.as_ref() {
let Some(wrapper_image_id) = AsRef::<Path>::as_ref(&wrapper.image_id).to_str()
else {
return false;
};
wrapper_image_id
.to_uppercase()
.contains(AsRef::<str>::as_ref(&image_id))
} else {
true
}
})
.collect();
let format_subcontainer_pair = |(guid, wrapper): (&Guid, &Subcontainer)| {
format!(
"{guid} imageId: {image_id} name: \"{name}\"",
name = &wrapper.name,
image_id = &wrapper.image_id
)
};
let Some((subcontainer_id, image_id)) = subcontainer_ids
.first()
.map::<(Guid, ImageId), _>(|&x| (x.0.clone(), x.1.image_id.clone()))
else {
drop(subcontainers);
let subcontainers = container
.subcontainers
.lock()
.await
.iter()
.map(format_subcontainer_pair)
.join("\n");
return Err(Error::new(
eyre!("no matching subcontainers are running for {id}; some possible choices are:\n{subcontainers}"),
ErrorKind::NotFound,
));
};
let passwd = root_dir
.join("media/startos/subcontainers")
.join(subcontainer_id.as_ref())
.join("etc")
.join("passwd");
let root_command = get_passwd_root_command(passwd).await;
let workdir = attach_workdir(&image_id, &root_dir).await?;
if subcontainer_ids.len() > 1 {
let subcontainer_ids = subcontainer_ids
.into_iter()
.map(format_subcontainer_pair)
.join("\n");
return Err(Error::new(
eyre!("multiple subcontainers found for {id}: \n{subcontainer_ids}"),
ErrorKind::InvalidRequest,
));
}
(
service_ref.container_id()?,
subcontainer_id,
image_id,
workdir,
root_command,
)
};
let guid = Guid::new();
async fn handler(
ws: &mut WebSocket,
container_id: ContainerId,
subcontainer_id: Guid,
command: Vec<OsString>,
tty: bool,
image_id: ImageId,
workdir: Option<String>,
root_command: &RootCommand,
) -> Result<(), Error> {
use axum::extract::ws::Message;
let mut ws = ws.fuse();
let mut cmd = Command::new("lxc-attach");
let root_path = Path::new("/media/startos/subcontainers").join(subcontainer_id.as_ref());
cmd.kill_on_drop(true);
cmd.arg(&*container_id)
.arg("--")
.arg("start-cli")
.arg("subcontainer")
.arg("exec")
.arg("--env")
.arg(
Path::new("/media/startos/images")
.join(image_id)
.with_extension("env"),
);
if let Some(workdir) = workdir {
cmd.arg("--workdir").arg(workdir);
}
if tty {
cmd.arg("--force-tty");
}
cmd.arg(&root_path).arg("--");
if command.is_empty() {
cmd.arg(&root_command.0);
} else {
cmd.args(&command);
}
let mut child = cmd
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;
let pid = nix::unistd::Pid::from_raw(child.id().or_not_found("child pid")? as i32);
let mut stdin = child.stdin.take().or_not_found("child stdin")?;
let mut current_in = "stdin".to_owned();
let mut current_out = "stdout";
ws.send(Message::Text(current_out.into()))
.await
.with_kind(ErrorKind::Network)?;
let mut stdout = AsyncReadStream::new(
child.stdout.take().or_not_found("child stdout")?,
4 * CAP_1_KiB,
)
.fuse();
let mut stderr = AsyncReadStream::new(
child.stderr.take().or_not_found("child stderr")?,
4 * CAP_1_KiB,
)
.fuse();
loop {
futures::select_biased! {
out = stdout.try_next() => {
if let Some(out) = out? {
if current_out != "stdout" {
ws.send(Message::Text("stdout".into()))
.await
.with_kind(ErrorKind::Network)?;
current_out = "stdout";
}
dbg!(&current_out);
ws.send(Message::Binary(out))
.await
.with_kind(ErrorKind::Network)?;
}
}
err = stderr.try_next() => {
if let Some(err) = err? {
if current_out != "stderr" {
ws.send(Message::Text("stderr".into()))
.await
.with_kind(ErrorKind::Network)?;
current_out = "stderr";
}
dbg!(&current_out);
ws.send(Message::Binary(err))
.await
.with_kind(ErrorKind::Network)?;
}
}
msg = ws.try_next() => {
if let Some(msg) = msg.with_kind(ErrorKind::Network)? {
match msg {
Message::Text(in_ty) => {
current_in = in_ty;
}
Message::Binary(data) => {
match &*current_in {
"stdin" => {
stdin.write_all(&data).await?;
}
"signal" => {
if data.len() != 4 {
return Err(Error::new(
eyre!("invalid byte length for signal: {}", data.len()),
ErrorKind::InvalidRequest
));
}
let mut sig_buf = [0u8; 4];
sig_buf.clone_from_slice(&data);
nix::sys::signal::kill(
pid,
Signal::try_from(i32::from_be_bytes(sig_buf))
.with_kind(ErrorKind::InvalidRequest)?
).with_kind(ErrorKind::Filesystem)?;
}
_ => (),
}
}
_ => ()
}
} else {
return Ok(())
}
}
}
if stdout.is_terminated() && stderr.is_terminated() {
break;
}
}
let exit = child.wait().await?;
ws.send(Message::Text("exit".into()))
.await
.with_kind(ErrorKind::Network)?;
ws.send(Message::Binary(i32::to_be_bytes(exit.into_raw()).to_vec()))
.await
.with_kind(ErrorKind::Network)?;
Ok(())
}
ctx.rpc_continuations
.add(
guid.clone(),
RpcContinuation::ws_authed(
&ctx,
session,
move |mut ws| async move {
if let Err(e) = handler(
&mut ws,
container_id,
subcontainer_id,
command,
tty,
image_id,
workdir,
&root_command,
)
.await
{
tracing::error!("Error in attach websocket: {e}");
tracing::debug!("{e:?}");
ws.close_result(Err::<&str, _>(e)).await.log_err();
} else {
ws.normal_close("exit").await.log_err();
}
},
Duration::from_secs(30),
),
)
.await;
Ok(guid)
}
async fn attach_workdir(image_id: &ImageId, root_dir: &Path) -> Result<Option<String>, Error> {
let path_str = root_dir.join("media/startos/images/");
let mut subcontainer_json =
tokio::fs::File::open(path_str.join(image_id).with_extension("json")).await?;
let mut contents = vec![];
subcontainer_json.read_to_end(&mut contents).await?;
let subcontainer_json: serde_json::Value =
serde_json::from_slice(&contents).with_kind(ErrorKind::Filesystem)?;
Ok(subcontainer_json["workdir"].as_str().map(|x| x.to_string()))
}
async fn get_passwd_root_command(etc_passwd_path: PathBuf) -> RootCommand {
async {
let mut file = tokio::fs::File::open(etc_passwd_path).await?;
let mut contents = vec![];
file.read_to_end(&mut contents).await?;
let contents = String::from_utf8_lossy(&contents);
for line in contents.split('\n') {
let line_information = line.split(':').collect::<Vec<_>>();
if let (Some(&"root"), Some(shell)) =
(line_information.first(), line_information.last())
{
return Ok(shell.to_string());
}
}
Err(Error::new(
eyre!("Could not parse /etc/passwd for shell: {}", contents),
ErrorKind::Filesystem,
))
}
.await
.map(RootCommand)
.unwrap_or_else(|e| {
tracing::error!("Could not get the /etc/passwd: {e}");
tracing::debug!("{e:?}");
RootCommand("/bin/sh".to_string())
})
}
#[derive(Deserialize, Serialize, Parser)]
pub struct CliAttachParams {
pub id: PackageId,
#[arg(long)]
pub force_tty: bool,
#[arg(trailing_var_arg = true)]
pub command: Vec<OsString>,
#[arg(long, short)]
subcontainer: Option<InternedString>,
#[arg(long, short)]
name: Option<InternedString>,
#[arg(long, short)]
image_id: Option<ImageId>,
}
pub async fn cli_attach(
HandlerArgs {
context,
parent_method,
method,
params,
..
}: HandlerArgs<CliContext, CliAttachParams>,
) -> Result<(), Error> {
use tokio_tungstenite::tungstenite::Message;
let guid: Guid = from_value(
context
.call_remote::<RpcContext>(
&parent_method.into_iter().chain(method).join("."),
json!({
"id": params.id,
"command": params.command,
"tty": (std::io::stdin().is_terminal()
&& std::io::stdout().is_terminal()
&& std::io::stderr().is_terminal())
|| params.force_tty,
"subcontainer": params.subcontainer,
"imageId": params.image_id,
"name": params.name,
}),
)
.await?,
)?;
let mut ws = context.ws_continuation(guid).await?;
let mut current_in = "stdin";
let mut current_out = "stdout".to_owned();
ws.send(Message::Text(current_in.into()))
.await
.with_kind(ErrorKind::Network)?;
let mut stdin = AsyncReadStream::new(tokio::io::stdin(), 4 * CAP_1_KiB).fuse();
let mut stdout = tokio::io::stdout();
let mut stderr = tokio::io::stderr();
loop {
futures::select_biased! {
// signal = tokio:: => {
// let exit = exit?;
// if current_out != "exit" {
// ws.send(Message::Text("exit".into()))
// .await
// .with_kind(ErrorKind::Network)?;
// current_out = "exit";
// }
// ws.send(Message::Binary(
// i32::to_be_bytes(exit.into_raw()).to_vec()
// )).await.with_kind(ErrorKind::Network)?;
// }
input = stdin.try_next() => {
if let Some(input) = input? {
if current_in != "stdin" {
ws.send(Message::Text("stdin".into()))
.await
.with_kind(ErrorKind::Network)?;
current_in = "stdin";
}
ws.send(Message::Binary(input))
.await
.with_kind(ErrorKind::Network)?;
}
}
msg = ws.try_next() => {
if let Some(msg) = msg.with_kind(ErrorKind::Network)? {
match msg {
Message::Text(out_ty) => {
current_out = out_ty;
}
Message::Binary(data) => {
match &*current_out {
"stdout" => {
stdout.write_all(&data).await?;
stdout.flush().await?;
}
"stderr" => {
stderr.write_all(&data).await?;
stderr.flush().await?;
}
"exit" => {
if data.len() != 4 {
return Err(Error::new(
eyre!("invalid byte length for exit code: {}", data.len()),
ErrorKind::InvalidRequest
));
}
let mut exit_buf = [0u8; 4];
exit_buf.clone_from_slice(&data);
let code = i32::from_be_bytes(exit_buf);
std::process::exit(code);
}
_ => (),
}
}
Message::Close(Some(close)) => {
if close.code != CloseCode::Normal {
return Err(Error::new(
color_eyre::eyre::Report::msg(close.reason),
ErrorKind::Network
));
}
}
_ => ()
}
} else {
return Ok(())
}
}
}
}
}

View File

@@ -4,9 +4,10 @@ use std::sync::{Arc, Weak};
use std::time::Duration;
use futures::future::ready;
use futures::{Future, FutureExt};
use futures::Future;
use helpers::NonDetachingJoinHandle;
use imbl::Vector;
use imbl_value::InternedString;
use models::{ImageId, ProcedureName, VolumeId};
use rpc_toolkit::{Empty, Server, ShutdownHandle};
use serde::de::DeserializeOwned;
@@ -36,7 +37,7 @@ use crate::service::{rpc, RunningStatus, Service};
use crate::util::io::create_file;
use crate::util::rpc_client::UnixRpcClient;
use crate::util::Invoke;
use crate::volume::{asset_dir, data_dir};
use crate::volume::data_dir;
use crate::ARCH;
const RPC_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
@@ -84,6 +85,14 @@ impl ServiceState {
}
}
/// Want to have a wrapper for uses like the inject where we are going to be finding the subcontainer and doing some filtering on it.
/// As well, the imageName is also used for things like env.
pub struct Subcontainer {
pub(super) name: InternedString,
pub(super) image_id: ImageId,
pub(super) overlay: OverlayGuard<Arc<MountGuard>>,
}
// @DRB On top of this we need to also have the procedures to have the effects and get the results back for them, maybe lock them to the running instance?
/// This contains the LXC container running the javascript init system
/// that can be used via a JSON RPC Client connected to a unix domain
@@ -98,7 +107,7 @@ pub struct PersistentContainer {
volumes: BTreeMap<VolumeId, MountGuard>,
assets: BTreeMap<VolumeId, MountGuard>,
pub(super) images: BTreeMap<ImageId, Arc<MountGuard>>,
pub(super) subcontainers: Arc<Mutex<BTreeMap<Guid, OverlayGuard<Arc<MountGuard>>>>>,
pub(super) subcontainers: Arc<Mutex<BTreeMap<Guid, Subcontainer>>>,
pub(super) state: Arc<watch::Sender<ServiceState>>,
pub(super) net_service: Mutex<NetService>,
destroyed: bool,
@@ -405,7 +414,7 @@ impl PersistentContainer {
errs.handle(assets.unmount(true).await);
}
for (_, overlay) in std::mem::take(&mut *subcontainers.lock().await) {
errs.handle(overlay.unmount(true).await);
errs.handle(overlay.overlay.unmount(true).await);
}
for (_, images) in images {
errs.handle(images.unmount().await);

View File

@@ -25,7 +25,7 @@ use crate::util::io::{create_file, TmpDir};
pub async fn upload(
ctx: &RpcContext,
session: InternedString,
session: Option<InternedString>,
) -> Result<(Guid, UploadingFile), Error> {
let guid = Guid::new();
let (mut handle, file) = UploadingFile::new().await?;

View File

@@ -1,6 +1,7 @@
use std::collections::VecDeque;
use std::future::Future;
use std::io::Cursor;
use std::mem::MaybeUninit;
use std::os::unix::prelude::MetadataExt;
use std::path::{Path, PathBuf};
use std::pin::Pin;
@@ -11,7 +12,7 @@ use std::time::Duration;
use bytes::{Buf, BytesMut};
use futures::future::{BoxFuture, Fuse};
use futures::{AsyncSeek, FutureExt, TryStreamExt};
use futures::{AsyncSeek, FutureExt, Stream, TryStreamExt};
use helpers::NonDetachingJoinHandle;
use nix::unistd::{Gid, Uid};
use tokio::fs::File;
@@ -23,6 +24,7 @@ use tokio::sync::{Notify, OwnedMutexGuard};
use tokio::time::{Instant, Sleep};
use crate::prelude::*;
use crate::{CAP_1_KiB, CAP_1_MiB};
pub trait AsyncReadSeek: AsyncRead + AsyncSeek {}
impl<T: AsyncRead + AsyncSeek> AsyncReadSeek for T {}
@@ -1267,3 +1269,33 @@ impl<W: AsyncWrite + Unpin> AsyncWrite for MutexIO<W> {
Pin::new(&mut *self.get_mut().0).poll_shutdown(cx)
}
}
#[pin_project::pin_project]
pub struct AsyncReadStream<T> {
buffer: Vec<MaybeUninit<u8>>,
#[pin]
pub io: T,
}
impl<T> AsyncReadStream<T> {
pub fn new(io: T, buffer_size: usize) -> Self {
Self {
buffer: vec![MaybeUninit::uninit(); buffer_size],
io,
}
}
}
impl<T: AsyncRead> Stream for AsyncReadStream<T> {
type Item = Result<Vec<u8>, Error>;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
let this = self.project();
let mut buf = ReadBuf::uninit(this.buffer);
match futures::ready!(this.io.poll_read(cx, &mut buf)) {
Ok(()) if buf.filled().is_empty() => Poll::Ready(None),
Ok(()) => Poll::Ready(Some(Ok(buf.filled().to_vec()))),
Err(e) => Poll::Ready(Some(Err(e.into()))),
}
}
}