Feature/subcontainers (#2720)

* wip: subcontainers

* wip: subcontainer infra

* rename NonDestroyableOverlay to SubContainerHandle

* chore: Changes to the container and other things

* wip:

* wip: fixes

* fix launch & exec

Co-authored-by: Jade <Blu-J@users.noreply.github.com>

* tweak apis

* misc fixes

* don't treat sigterm as error

* handle health check set during starting

---------

Co-authored-by: J H <dragondef@gmail.com>
Co-authored-by: Jade <Blu-J@users.noreply.github.com>
This commit is contained in:
Aiden McClelland
2024-08-22 21:45:54 -06:00
committed by GitHub
parent 72898d897c
commit 4defec194f
37 changed files with 1212 additions and 566 deletions

View File

@@ -268,9 +268,10 @@ impl LxcContainer {
.invoke(ErrorKind::Docker)
.await?,
)?;
let out_str = output.trim();
if !out_str.is_empty() {
return Ok(out_str.parse()?);
for line in output.lines() {
if let Ok(ip) = line.trim().parse() {
return Ok(ip);
}
}
if start.elapsed() > CONTAINER_DHCP_TIMEOUT {
return Err(Error::new(

View File

@@ -32,8 +32,8 @@ pub async fn set_health(
.as_main_mut()
.mutate(|main| {
match main {
&mut MainStatus::Running { ref mut health, .. }
| &mut MainStatus::BackingUp { ref mut health, .. } => {
MainStatus::Running { ref mut health, .. }
| MainStatus::Starting { ref mut health } => {
health.insert(id, result);
}
_ => (),

View File

@@ -1,4 +1,4 @@
use rpc_toolkit::{from_fn, from_fn_async, Context, HandlerExt, ParentHandler};
use rpc_toolkit::{from_fn, from_fn_async, from_fn_blocking, Context, HandlerExt, ParentHandler};
use crate::echo;
use crate::prelude::*;
@@ -12,44 +12,44 @@ pub mod context;
mod control;
mod dependency;
mod health;
mod image;
mod net;
mod prelude;
mod store;
mod subcontainer;
mod system;
pub fn handler<C: Context>() -> ParentHandler<C> {
ParentHandler::new()
.subcommand("gitInfo", from_fn(|_: C| crate::version::git_info()))
.subcommand("git-info", from_fn(|_: C| crate::version::git_info()))
.subcommand(
"echo",
from_fn(echo::<EffectContext>).with_call_remote::<ContainerCliContext>(),
)
// action
.subcommand(
"executeAction",
"execute-action",
from_fn_async(action::execute_action).no_cli(),
)
.subcommand(
"exportAction",
"export-action",
from_fn_async(action::export_action).no_cli(),
)
.subcommand(
"clearActions",
"clear-actions",
from_fn_async(action::clear_actions).no_cli(),
)
// callbacks
.subcommand(
"clearCallbacks",
"clear-callbacks",
from_fn(callbacks::clear_callbacks).no_cli(),
)
// config
.subcommand(
"getConfigured",
"get-configured",
from_fn_async(config::get_configured).no_cli(),
)
.subcommand(
"setConfigured",
"set-configured",
from_fn_async(config::set_configured)
.no_display()
.with_call_remote::<ContainerCliContext>(),
@@ -68,110 +68,133 @@ pub fn handler<C: Context>() -> ParentHandler<C> {
.with_call_remote::<ContainerCliContext>(),
)
.subcommand(
"setMainStatus",
"set-main-status",
from_fn_async(control::set_main_status)
.no_display()
.with_call_remote::<ContainerCliContext>(),
)
// dependency
.subcommand(
"setDependencies",
"set-dependencies",
from_fn_async(dependency::set_dependencies)
.no_display()
.with_call_remote::<ContainerCliContext>(),
)
.subcommand(
"getDependencies",
"get-dependencies",
from_fn_async(dependency::get_dependencies)
.no_display()
.with_call_remote::<ContainerCliContext>(),
)
.subcommand(
"checkDependencies",
"check-dependencies",
from_fn_async(dependency::check_dependencies)
.no_display()
.with_call_remote::<ContainerCliContext>(),
)
.subcommand("mount", from_fn_async(dependency::mount).no_cli())
.subcommand(
"getInstalledPackages",
"get-installed-packages",
from_fn_async(dependency::get_installed_packages).no_cli(),
)
.subcommand(
"exposeForDependents",
"expose-for-dependents",
from_fn_async(dependency::expose_for_dependents).no_cli(),
)
// health
.subcommand("setHealth", from_fn_async(health::set_health).no_cli())
// image
.subcommand("set-health", from_fn_async(health::set_health).no_cli())
// subcontainer
.subcommand(
"chroot",
from_fn(image::chroot::<ContainerCliContext>).no_display(),
)
.subcommand(
"createOverlayedImage",
from_fn_async(image::create_overlayed_image)
.with_custom_display_fn(|_, (path, _)| Ok(println!("{}", path.display())))
.with_call_remote::<ContainerCliContext>(),
)
.subcommand(
"destroyOverlayedImage",
from_fn_async(image::destroy_overlayed_image).no_cli(),
"subcontainer",
ParentHandler::<C>::new()
.subcommand(
"launch",
from_fn_blocking(subcontainer::launch::<ContainerCliContext>).no_display(),
)
.subcommand(
"launch-init",
from_fn_blocking(subcontainer::launch_init::<ContainerCliContext>).no_display(),
)
.subcommand(
"exec",
from_fn_blocking(subcontainer::exec::<ContainerCliContext>).no_display(),
)
.subcommand(
"exec-command",
from_fn_blocking(subcontainer::exec_command::<ContainerCliContext>)
.no_display(),
)
.subcommand(
"create-fs",
from_fn_async(subcontainer::create_subcontainer_fs)
.with_custom_display_fn(|_, (path, _)| Ok(println!("{}", path.display())))
.with_call_remote::<ContainerCliContext>(),
)
.subcommand(
"destroy-fs",
from_fn_async(subcontainer::destroy_subcontainer_fs)
.no_display()
.with_call_remote::<ContainerCliContext>(),
),
)
// net
.subcommand("bind", from_fn_async(net::bind::bind).no_cli())
.subcommand(
"getServicePortForward",
"get-service-port-forward",
from_fn_async(net::bind::get_service_port_forward).no_cli(),
)
.subcommand(
"clearBindings",
"clear-bindings",
from_fn_async(net::bind::clear_bindings).no_cli(),
)
.subcommand(
"getHostInfo",
"get-host-info",
from_fn_async(net::host::get_host_info).no_cli(),
)
.subcommand(
"getPrimaryUrl",
"get-primary-url",
from_fn_async(net::host::get_primary_url).no_cli(),
)
.subcommand(
"getContainerIp",
"get-container-ip",
from_fn_async(net::info::get_container_ip).no_cli(),
)
.subcommand(
"exportServiceInterface",
"export-service-interface",
from_fn_async(net::interface::export_service_interface).no_cli(),
)
.subcommand(
"getServiceInterface",
"get-service-interface",
from_fn_async(net::interface::get_service_interface).no_cli(),
)
.subcommand(
"listServiceInterfaces",
"list-service-interfaces",
from_fn_async(net::interface::list_service_interfaces).no_cli(),
)
.subcommand(
"clearServiceInterfaces",
"clear-service-interfaces",
from_fn_async(net::interface::clear_service_interfaces).no_cli(),
)
.subcommand(
"getSslCertificate",
"get-ssl-certificate",
from_fn_async(net::ssl::get_ssl_certificate).no_cli(),
)
.subcommand("getSslKey", from_fn_async(net::ssl::get_ssl_key).no_cli())
.subcommand("get-ssl-key", from_fn_async(net::ssl::get_ssl_key).no_cli())
// store
.subcommand("getStore", from_fn_async(store::get_store).no_cli())
.subcommand("setStore", from_fn_async(store::set_store).no_cli())
.subcommand(
"setDataVersion",
"store",
ParentHandler::<C>::new()
.subcommand("get", from_fn_async(store::get_store).no_cli())
.subcommand("set", from_fn_async(store::set_store).no_cli()),
)
.subcommand(
"set-data-version",
from_fn_async(store::set_data_version)
.no_display()
.with_call_remote::<ContainerCliContext>(),
)
.subcommand(
"getDataVersion",
"get-data-version",
from_fn_async(store::get_data_version)
.with_custom_display_fn(|_, v| {
if let Some(v) = v {
@@ -185,7 +208,7 @@ pub fn handler<C: Context>() -> ParentHandler<C> {
)
// system
.subcommand(
"getSystemSmtp",
"get-system-smtp",
from_fn_async(system::get_system_smtp).no_cli(),
)

View File

@@ -1,9 +1,6 @@
use std::ffi::OsString;
use std::os::unix::process::CommandExt;
use std::path::{Path, PathBuf};
use models::ImageId;
use rpc_toolkit::Context;
use tokio::process::Command;
use crate::disk::mount::filesystem::overlayfs::OverlayGuard;
@@ -11,89 +8,33 @@ use crate::rpc_continuations::Guid;
use crate::service::effects::prelude::*;
use crate::util::Invoke;
#[derive(Debug, Clone, Serialize, Deserialize, Parser)]
pub struct ChrootParams {
#[arg(short = 'e', long = "env")]
env: Option<PathBuf>,
#[arg(short = 'w', long = "workdir")]
workdir: Option<PathBuf>,
#[arg(short = 'u', long = "user")]
user: Option<String>,
path: PathBuf,
command: OsString,
args: Vec<OsString>,
}
pub fn chroot<C: Context>(
_: C,
ChrootParams {
env,
workdir,
user,
path,
command,
args,
}: ChrootParams,
) -> Result<(), Error> {
let mut cmd: std::process::Command = std::process::Command::new(command);
if let Some(env) = env {
for (k, v) in std::fs::read_to_string(env)?
.lines()
.map(|l| l.trim())
.filter_map(|l| l.split_once("="))
{
cmd.env(k, v);
}
}
nix::unistd::setsid().ok(); // https://stackoverflow.com/questions/25701333/os-setsid-operation-not-permitted
std::os::unix::fs::chroot(path)?;
if let Some(uid) = user.as_deref().and_then(|u| u.parse::<u32>().ok()) {
cmd.uid(uid);
} else if let Some(user) = user {
let (uid, gid) = std::fs::read_to_string("/etc/passwd")?
.lines()
.find_map(|l| {
let mut split = l.trim().split(":");
if user != split.next()? {
return None;
}
split.next(); // throw away x
Some((split.next()?.parse().ok()?, split.next()?.parse().ok()?))
// uid gid
})
.or_not_found(lazy_format!("{user} in /etc/passwd"))?;
cmd.uid(uid);
cmd.gid(gid);
};
if let Some(workdir) = workdir {
cmd.current_dir(workdir);
}
cmd.args(args);
Err(cmd.exec().into())
}
mod sync;
pub use sync::*;
#[derive(Debug, Deserialize, Serialize, Parser, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export)]
pub struct DestroyOverlayedImageParams {
pub struct DestroySubcontainerFsParams {
guid: Guid,
}
#[instrument(skip_all)]
pub async fn destroy_overlayed_image(
pub async fn destroy_subcontainer_fs(
context: EffectContext,
DestroyOverlayedImageParams { guid }: DestroyOverlayedImageParams,
DestroySubcontainerFsParams { guid }: DestroySubcontainerFsParams,
) -> Result<(), Error> {
let context = context.deref()?;
if let Some(overlay) = context
.seed
.persistent_container
.overlays
.subcontainers
.lock()
.await
.remove(&guid)
{
overlay.unmount(true).await?;
} else {
tracing::warn!("Could not find a guard to remove on the destroy overlayed image; assumming that it already is removed and will be skipping");
tracing::warn!("Could not find a subcontainer fs to destroy; assumming that it already is destroyed and will be skipping");
}
Ok(())
}
@@ -101,13 +42,13 @@ pub async fn destroy_overlayed_image(
#[derive(Debug, Deserialize, Serialize, Parser, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export)]
pub struct CreateOverlayedImageParams {
pub struct CreateSubcontainerFsParams {
image_id: ImageId,
}
#[instrument(skip_all)]
pub async fn create_overlayed_image(
pub async fn create_subcontainer_fs(
context: EffectContext,
CreateOverlayedImageParams { image_id }: CreateOverlayedImageParams,
CreateSubcontainerFsParams { image_id }: CreateSubcontainerFsParams,
) -> Result<(PathBuf, Guid), Error> {
let context = context.deref()?;
if let Some(image) = context
@@ -131,7 +72,7 @@ pub async fn create_overlayed_image(
})?
.rootfs_dir();
let mountpoint = rootfs_dir
.join("media/startos/overlays")
.join("media/startos/subcontainers")
.join(guid.as_ref());
tokio::fs::create_dir_all(&mountpoint).await?;
let container_mountpoint = Path::new("/").join(
@@ -150,7 +91,7 @@ pub async fn create_overlayed_image(
context
.seed
.persistent_container
.overlays
.subcontainers
.lock()
.await
.insert(guid.clone(), guard);

View File

@@ -0,0 +1,389 @@
use std::borrow::Cow;
use std::collections::BTreeMap;
use std::ffi::{c_int, OsStr, OsString};
use std::fs::File;
use std::os::unix::process::CommandExt;
use std::path::{Path, PathBuf};
use std::process::{Command as StdCommand, Stdio};
use nix::sched::CloneFlags;
use nix::unistd::Pid;
use rpc_toolkit::Context;
use signal_hook::consts::signal::*;
use tokio::sync::oneshot;
use unshare::Command as NSCommand;
use crate::service::effects::prelude::*;
const FWD_SIGNALS: &[c_int] = &[
SIGABRT, SIGALRM, SIGCONT, SIGHUP, SIGINT, SIGIO, SIGPIPE, SIGPROF, SIGQUIT, SIGTERM, SIGTRAP,
SIGTSTP, SIGTTIN, SIGTTOU, SIGURG, SIGUSR1, SIGUSR2, SIGVTALRM,
];
struct NSPid(Vec<i32>);
impl procfs::FromBufRead for NSPid {
fn from_buf_read<R: std::io::BufRead>(r: R) -> procfs::ProcResult<Self> {
for line in r.lines() {
let line = line?;
if let Some(row) = line.trim().strip_prefix("NSpid") {
return Ok(Self(
row.split_ascii_whitespace()
.map(|pid| pid.parse::<i32>())
.collect::<Result<Vec<_>, _>>()?,
));
}
}
Err(procfs::ProcError::Incomplete(None))
}
}
fn open_file_read(path: impl AsRef<Path>) -> Result<File, Error> {
File::open(&path).with_ctx(|_| {
(
ErrorKind::Filesystem,
lazy_format!("open r {}", path.as_ref().display()),
)
})
}
#[derive(Debug, Clone, Serialize, Deserialize, Parser)]
pub struct ExecParams {
#[arg(short = 'e', long = "env")]
env: Option<PathBuf>,
#[arg(short = 'w', long = "workdir")]
workdir: Option<PathBuf>,
#[arg(short = 'u', long = "user")]
user: Option<String>,
chroot: PathBuf,
#[arg(trailing_var_arg = true)]
command: Vec<OsString>,
}
impl ExecParams {
fn exec(&self) -> Result<(), Error> {
let ExecParams {
env,
workdir,
user,
chroot,
command,
} = self;
let Some(([command], args)) = command.split_at_checked(1) else {
return Err(Error::new(
eyre!("command cannot be empty"),
ErrorKind::InvalidRequest,
));
};
let env_string = if let Some(env) = &env {
std::fs::read_to_string(env)
.with_ctx(|_| (ErrorKind::Filesystem, lazy_format!("read {env:?}")))?
} else {
Default::default()
};
let env = env_string
.lines()
.map(|l| l.trim())
.filter_map(|l| l.split_once("="))
.collect::<BTreeMap<_, _>>();
std::os::unix::fs::chroot(chroot)
.with_ctx(|_| (ErrorKind::Filesystem, lazy_format!("chroot {chroot:?}")))?;
let command = which::which_in(
command,
env.get("PATH")
.copied()
.map(Cow::Borrowed)
.or_else(|| std::env::var("PATH").ok().map(Cow::Owned))
.as_deref(),
workdir.as_deref().unwrap_or(Path::new("/")),
)
.with_kind(ErrorKind::Filesystem)?;
let mut cmd = StdCommand::new(command);
cmd.args(args);
for (k, v) in env {
cmd.env(k, v);
}
if let Some(uid) = user.as_deref().and_then(|u| u.parse::<u32>().ok()) {
cmd.uid(uid);
} else if let Some(user) = user {
let (uid, gid) = std::fs::read_to_string("/etc/passwd")
.with_ctx(|_| (ErrorKind::Filesystem, "read /etc/passwd"))?
.lines()
.find_map(|l| {
let mut split = l.trim().split(":");
if user != split.next()? {
return None;
}
split.next(); // throw away x
Some((split.next()?.parse().ok()?, split.next()?.parse().ok()?))
// uid gid
})
.or_not_found(lazy_format!("{user} in /etc/passwd"))?;
cmd.uid(uid);
cmd.gid(gid);
};
if let Some(workdir) = workdir {
cmd.current_dir(workdir);
} else {
cmd.current_dir("/");
}
Err(cmd.exec().into())
}
}
pub fn launch<C: Context>(
_: C,
ExecParams {
env,
workdir,
user,
chroot,
command,
}: ExecParams,
) -> Result<(), Error> {
use unshare::{Namespace, Stdio};
let mut sig = signal_hook::iterator::Signals::new(FWD_SIGNALS)?;
let mut cmd = NSCommand::new("/usr/bin/start-cli");
cmd.arg("subcontainer").arg("launch-init");
if let Some(env) = env {
cmd.arg("--env").arg(env);
}
if let Some(workdir) = workdir {
cmd.arg("--workdir").arg(workdir);
}
if let Some(user) = user {
cmd.arg("--user").arg(user);
}
cmd.arg(&chroot);
cmd.args(&command);
cmd.unshare(&[Namespace::Pid, Namespace::Cgroup, Namespace::Ipc]);
cmd.stdin(Stdio::piped());
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
let (stdin_send, stdin_recv) = oneshot::channel();
std::thread::spawn(move || {
if let Ok(mut stdin) = stdin_recv.blocking_recv() {
std::io::copy(&mut std::io::stdin(), &mut stdin).unwrap();
}
});
let (stdout_send, stdout_recv) = oneshot::channel();
std::thread::spawn(move || {
if let Ok(mut stdout) = stdout_recv.blocking_recv() {
std::io::copy(&mut stdout, &mut std::io::stdout()).unwrap();
}
});
let (stderr_send, stderr_recv) = oneshot::channel();
std::thread::spawn(move || {
if let Ok(mut stderr) = stderr_recv.blocking_recv() {
std::io::copy(&mut stderr, &mut std::io::stderr()).unwrap();
}
});
if chroot.join("proc/1").exists() {
let ns_id = procfs::process::Process::new_with_root(chroot.join("proc"))
.with_ctx(|_| (ErrorKind::Filesystem, "open subcontainer procfs"))?
.namespaces()
.with_ctx(|_| (ErrorKind::Filesystem, "read subcontainer pid 1 ns"))?
.0
.get(OsStr::new("pid"))
.or_not_found("pid namespace")?
.identifier;
for proc in
procfs::process::all_processes().with_ctx(|_| (ErrorKind::Filesystem, "open procfs"))?
{
let proc = proc.with_ctx(|_| (ErrorKind::Filesystem, "read single process details"))?;
let pid = proc.pid();
if proc
.namespaces()
.with_ctx(|_| (ErrorKind::Filesystem, lazy_format!("read pid {} ns", pid)))?
.0
.get(OsStr::new("pid"))
.map_or(false, |ns| ns.identifier == ns_id)
{
let pids = proc.read::<NSPid>("status").with_ctx(|_| {
(
ErrorKind::Filesystem,
lazy_format!("read pid {} NSpid", pid),
)
})?;
if pids.0.len() == 2 && pids.0[1] == 1 {
nix::sys::signal::kill(Pid::from_raw(pid), nix::sys::signal::SIGKILL)
.with_ctx(|_| {
(
ErrorKind::Filesystem,
lazy_format!(
"kill pid {} (determined to be pid 1 in subcontainer)",
pid
),
)
})?;
}
}
}
nix::mount::umount(&chroot.join("proc"))
.with_ctx(|_| (ErrorKind::Filesystem, "unmounting subcontainer procfs"))?;
}
let mut child = cmd
.spawn()
.map_err(color_eyre::eyre::Report::msg)
.with_ctx(|_| (ErrorKind::Filesystem, "spawning child process"))?;
let pid = child.pid();
std::thread::spawn(move || {
for sig in sig.forever() {
nix::sys::signal::kill(
Pid::from_raw(pid),
Some(nix::sys::signal::Signal::try_from(sig).unwrap()),
)
.unwrap();
}
});
stdin_send
.send(child.stdin.take().unwrap())
.unwrap_or_default();
stdout_send
.send(child.stdout.take().unwrap())
.unwrap_or_default();
stderr_send
.send(child.stderr.take().unwrap())
.unwrap_or_default();
// TODO: subreaping, signal handling
let exit = child
.wait()
.with_ctx(|_| (ErrorKind::Filesystem, "waiting on child process"))?;
if let Some(code) = exit.code() {
std::process::exit(code);
} else {
if exit.success() {
Ok(())
} else {
Err(Error::new(
color_eyre::eyre::Report::msg(exit),
ErrorKind::Unknown,
))
}
}
}
pub fn launch_init<C: Context>(_: C, params: ExecParams) -> Result<(), Error> {
nix::mount::mount(
Some("proc"),
&params.chroot.join("proc"),
Some("proc"),
nix::mount::MsFlags::empty(),
None::<&str>,
)
.with_ctx(|_| (ErrorKind::Filesystem, "mount procfs"))?;
if params.command.is_empty() {
signal_hook::iterator::Signals::new(signal_hook::consts::TERM_SIGNALS)?
.forever()
.next();
std::process::exit(0)
} else {
params.exec()
}
}
pub fn exec<C: Context>(
_: C,
ExecParams {
env,
workdir,
user,
chroot,
command,
}: ExecParams,
) -> Result<(), Error> {
let mut sig = signal_hook::iterator::Signals::new(FWD_SIGNALS)?;
let (send_pid, recv_pid) = oneshot::channel();
std::thread::spawn(move || {
if let Ok(pid) = recv_pid.blocking_recv() {
for sig in sig.forever() {
nix::sys::signal::kill(
Pid::from_raw(pid),
Some(nix::sys::signal::Signal::try_from(sig).unwrap()),
)
.unwrap();
}
}
});
let mut cmd = StdCommand::new("/usr/bin/start-cli");
cmd.arg("subcontainer").arg("exec-command");
if let Some(env) = env {
cmd.arg("--env").arg(env);
}
if let Some(workdir) = workdir {
cmd.arg("--workdir").arg(workdir);
}
if let Some(user) = user {
cmd.arg("--user").arg(user);
}
cmd.arg(&chroot);
cmd.args(&command);
cmd.stdin(Stdio::piped());
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
let (stdin_send, stdin_recv) = oneshot::channel();
std::thread::spawn(move || {
if let Ok(mut stdin) = stdin_recv.blocking_recv() {
std::io::copy(&mut std::io::stdin(), &mut stdin).unwrap();
}
});
let (stdout_send, stdout_recv) = oneshot::channel();
std::thread::spawn(move || {
if let Ok(mut stdout) = stdout_recv.blocking_recv() {
std::io::copy(&mut stdout, &mut std::io::stdout()).unwrap();
}
});
let (stderr_send, stderr_recv) = oneshot::channel();
std::thread::spawn(move || {
if let Ok(mut stderr) = stderr_recv.blocking_recv() {
std::io::copy(&mut stderr, &mut std::io::stderr()).unwrap();
}
});
nix::sched::setns(
open_file_read(chroot.join("proc/1/ns/pid"))?,
CloneFlags::CLONE_NEWPID,
)
.with_ctx(|_| (ErrorKind::Filesystem, "set pid ns"))?;
nix::sched::setns(
open_file_read(chroot.join("proc/1/ns/cgroup"))?,
CloneFlags::CLONE_NEWCGROUP,
)
.with_ctx(|_| (ErrorKind::Filesystem, "set cgroup ns"))?;
nix::sched::setns(
open_file_read(chroot.join("proc/1/ns/ipc"))?,
CloneFlags::CLONE_NEWIPC,
)
.with_ctx(|_| (ErrorKind::Filesystem, "set ipc ns"))?;
let mut child = cmd
.spawn()
.map_err(color_eyre::eyre::Report::msg)
.with_ctx(|_| (ErrorKind::Filesystem, "spawning child process"))?;
send_pid.send(child.id() as i32).unwrap_or_default();
stdin_send
.send(child.stdin.take().unwrap())
.unwrap_or_default();
stdout_send
.send(child.stdout.take().unwrap())
.unwrap_or_default();
stderr_send
.send(child.stderr.take().unwrap())
.unwrap_or_default();
let exit = child
.wait()
.with_ctx(|_| (ErrorKind::Filesystem, "waiting on child process"))?;
if let Some(code) = exit.code() {
std::process::exit(code);
} else {
if exit.success() {
Ok(())
} else {
Err(Error::new(
color_eyre::eyre::Report::msg(exit),
ErrorKind::Unknown,
))
}
}
}
pub fn exec_command<C: Context>(_: C, params: ExecParams) -> Result<(), Error> {
params.exec()
}

View File

@@ -45,7 +45,7 @@ mod properties;
mod rpc;
mod service_actor;
pub mod service_map;
mod start_stop;
pub mod start_stop;
mod transition;
mod util;
@@ -493,7 +493,6 @@ impl Service {
#[derive(Debug, Clone)]
pub struct RunningStatus {
health: OrdMap<HealthCheckId, NamedHealthCheckResult>,
started: DateTime<Utc>,
}
@@ -516,7 +515,6 @@ impl ServiceActorSeed {
.running_status
.take()
.unwrap_or_else(|| RunningStatus {
health: Default::default(),
started: Utc::now(),
}),
);

View File

@@ -98,7 +98,7 @@ pub struct PersistentContainer {
volumes: BTreeMap<VolumeId, MountGuard>,
assets: BTreeMap<VolumeId, MountGuard>,
pub(super) images: BTreeMap<ImageId, Arc<MountGuard>>,
pub(super) overlays: Arc<Mutex<BTreeMap<Guid, OverlayGuard<Arc<MountGuard>>>>>,
pub(super) subcontainers: Arc<Mutex<BTreeMap<Guid, OverlayGuard<Arc<MountGuard>>>>>,
pub(super) state: Arc<watch::Sender<ServiceState>>,
pub(super) net_service: Mutex<NetService>,
destroyed: bool,
@@ -273,7 +273,7 @@ impl PersistentContainer {
volumes,
assets,
images,
overlays: Arc::new(Mutex::new(BTreeMap::new())),
subcontainers: Arc::new(Mutex::new(BTreeMap::new())),
state: Arc::new(watch::channel(ServiceState::new(start)).0),
net_service: Mutex::new(net_service),
destroyed: false,
@@ -388,7 +388,7 @@ impl PersistentContainer {
let volumes = std::mem::take(&mut self.volumes);
let assets = std::mem::take(&mut self.assets);
let images = std::mem::take(&mut self.images);
let overlays = self.overlays.clone();
let subcontainers = self.subcontainers.clone();
let lxc_container = self.lxc_container.take();
self.destroyed = true;
Some(async move {
@@ -404,7 +404,7 @@ impl PersistentContainer {
for (_, assets) in assets {
errs.handle(assets.unmount(true).await);
}
for (_, overlay) in std::mem::take(&mut *overlays.lock().await) {
for (_, overlay) in std::mem::take(&mut *subcontainers.lock().await) {
errs.handle(overlay.unmount(true).await);
}
for (_, images) in images {

View File

@@ -1,11 +1,10 @@
use std::sync::Arc;
use std::time::Duration;
use imbl::OrdMap;
use super::start_stop::StartStop;
use super::ServiceActorSeed;
use crate::prelude::*;
use crate::service::persistent_container::ServiceStateKinds;
use crate::service::transition::TransitionKind;
use crate::service::SYNC_RETRY_COOLDOWN_SECONDS;
use crate::status::MainStatus;
@@ -46,96 +45,77 @@ async fn service_actor_loop(
let id = &seed.id;
let kinds = current.borrow().kinds();
if let Err(e) = async {
let main_status = match (
kinds.transition_state,
kinds.desired_state,
kinds.running_status,
) {
(Some(TransitionKind::Restarting), StartStop::Stop, Some(_)) => {
seed.persistent_container.stop().await?;
MainStatus::Restarting
}
(Some(TransitionKind::Restarting), StartStop::Start, _) => {
seed.persistent_container.start().await?;
MainStatus::Restarting
}
(Some(TransitionKind::Restarting), _, _) => MainStatus::Restarting,
(Some(TransitionKind::Restoring), _, _) => MainStatus::Restoring,
(Some(TransitionKind::BackingUp), StartStop::Stop, Some(status)) => {
seed.persistent_container.stop().await?;
MainStatus::BackingUp {
started: Some(status.started),
health: status.health.clone(),
}
}
(Some(TransitionKind::BackingUp), StartStop::Start, _) => {
seed.persistent_container.start().await?;
MainStatus::BackingUp {
started: None,
health: OrdMap::new(),
}
}
(Some(TransitionKind::BackingUp), _, _) => MainStatus::BackingUp {
started: None,
health: OrdMap::new(),
},
(None, StartStop::Stop, None) => MainStatus::Stopped,
(None, StartStop::Stop, Some(_)) => {
let task_seed = seed.clone();
seed.ctx
.db
.mutate(|d| {
if let Some(i) = d.as_public_mut().as_package_data_mut().as_idx_mut(&id) {
i.as_status_mut().as_main_mut().ser(&MainStatus::Stopping)?;
}
Ok(())
})
.await?;
task_seed.persistent_container.stop().await?;
MainStatus::Stopped
}
(None, StartStop::Start, Some(status)) => MainStatus::Running {
started: status.started,
health: status.health.clone(),
},
(None, StartStop::Start, None) => {
seed.persistent_container.start().await?;
MainStatus::Starting
}
};
seed.ctx
.db
.mutate(|d| {
if let Some(i) = d.as_public_mut().as_package_data_mut().as_idx_mut(&id) {
let previous = i.as_status().as_main().de()?;
let previous_health = previous.health();
let previous_started = previous.started();
let mut main_status = main_status;
match &mut main_status {
&mut MainStatus::Running { ref mut health, .. }
| &mut MainStatus::BackingUp { ref mut health, .. } => {
*health = previous_health.unwrap_or(health).clone();
}
_ => (),
};
match &mut main_status {
MainStatus::Running {
ref mut started, ..
} => {
*started = previous_started.unwrap_or(*started);
}
MainStatus::BackingUp {
ref mut started, ..
} => {
*started = previous_started.map(Some).unwrap_or(*started);
}
_ => (),
let main_status = match &kinds {
ServiceStateKinds {
transition_state: Some(TransitionKind::Restarting),
..
} => MainStatus::Restarting,
ServiceStateKinds {
transition_state: Some(TransitionKind::Restoring),
..
} => MainStatus::Restoring,
ServiceStateKinds {
transition_state: Some(TransitionKind::BackingUp),
..
} => previous.backing_up(),
ServiceStateKinds {
running_status: Some(status),
desired_state: StartStop::Start,
..
} => MainStatus::Running {
started: status.started,
health: previous.health().cloned().unwrap_or_default(),
},
ServiceStateKinds {
running_status: None,
desired_state: StartStop::Start,
..
} => MainStatus::Starting {
health: previous.health().cloned().unwrap_or_default(),
},
ServiceStateKinds {
running_status: Some(_),
desired_state: StartStop::Stop,
..
} => MainStatus::Stopping,
ServiceStateKinds {
running_status: None,
desired_state: StartStop::Stop,
..
} => MainStatus::Stopped,
};
i.as_status_mut().as_main_mut().ser(&main_status)?;
}
Ok(())
})
.await?;
seed.synchronized.notify_waiters();
match kinds {
ServiceStateKinds {
running_status: None,
desired_state: StartStop::Start,
..
} => {
seed.persistent_container.start().await?;
}
ServiceStateKinds {
running_status: Some(_),
desired_state: StartStop::Stop,
..
} => {
seed.persistent_container.stop().await?;
seed.persistent_container
.state
.send_if_modified(|s| s.running_status.take().is_some());
}
_ => (),
};
Ok::<_, Error>(())
}

View File

@@ -1,6 +1,10 @@
use serde::{Deserialize, Serialize};
use ts_rs::TS;
use crate::status::MainStatus;
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize, TS)]
#[serde(rename_all = "camelCase")]
pub enum StartStop {
Start,
Stop,
@@ -11,23 +15,19 @@ impl StartStop {
matches!(self, StartStop::Start)
}
}
impl From<MainStatus> for StartStop {
fn from(value: MainStatus) -> Self {
match value {
MainStatus::Stopped => StartStop::Stop,
MainStatus::Restoring => StartStop::Stop,
MainStatus::Restarting => StartStop::Start,
MainStatus::Stopping { .. } => StartStop::Stop,
MainStatus::Starting => StartStop::Start,
MainStatus::Running {
started: _,
health: _,
} => StartStop::Start,
MainStatus::BackingUp { started, health: _ } if started.is_some() => StartStop::Start,
MainStatus::BackingUp {
started: _,
health: _,
} => StartStop::Stop,
}
}
}
// impl From<MainStatus> for StartStop {
// fn from(value: MainStatus) -> Self {
// match value {
// MainStatus::Stopped => StartStop::Stop,
// MainStatus::Restoring => StartStop::Stop,
// MainStatus::Restarting => StartStop::Start,
// MainStatus::Stopping { .. } => StartStop::Stop,
// MainStatus::Starting => StartStop::Start,
// MainStatus::Running {
// started: _,
// health: _,
// } => StartStop::Start,
// MainStatus::BackingUp { on_complete } => on_complete,
// }
// }
// }

View File

@@ -1,5 +1,4 @@
use std::collections::BTreeMap;
use std::sync::Arc;
use chrono::{DateTime, Utc};
use imbl::OrdMap;
@@ -8,8 +7,8 @@ use ts_rs::TS;
use self::health_check::HealthCheckId;
use crate::prelude::*;
use crate::service::start_stop::StartStop;
use crate::status::health_check::NamedHealthCheckResult;
use crate::util::GeneralGuard;
pub mod health_check;
#[derive(Clone, Debug, Deserialize, Serialize, HasModel, TS)]
@@ -24,25 +23,24 @@ pub struct Status {
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, TS)]
#[serde(tag = "status")]
#[serde(rename_all = "camelCase")]
#[serde(rename_all_fields = "camelCase")]
pub enum MainStatus {
Stopped,
Restarting,
Restoring,
Stopping,
Starting,
#[serde(rename_all = "camelCase")]
Starting {
#[ts(as = "BTreeMap<HealthCheckId, NamedHealthCheckResult>")]
health: OrdMap<HealthCheckId, NamedHealthCheckResult>,
},
Running {
#[ts(type = "string")]
started: DateTime<Utc>,
#[ts(as = "BTreeMap<HealthCheckId, NamedHealthCheckResult>")]
health: OrdMap<HealthCheckId, NamedHealthCheckResult>,
},
#[serde(rename_all = "camelCase")]
BackingUp {
#[ts(type = "string | null")]
started: Option<DateTime<Utc>>,
#[ts(as = "BTreeMap<HealthCheckId, NamedHealthCheckResult>")]
health: OrdMap<HealthCheckId, NamedHealthCheckResult>,
on_complete: StartStop,
},
}
impl MainStatus {
@@ -50,60 +48,37 @@ impl MainStatus {
match self {
MainStatus::Starting { .. }
| MainStatus::Running { .. }
| MainStatus::Restarting
| MainStatus::BackingUp {
started: Some(_), ..
on_complete: StartStop::Start,
} => true,
MainStatus::Stopped
| MainStatus::Restoring
| MainStatus::Stopping { .. }
| MainStatus::Restarting
| MainStatus::BackingUp { started: None, .. } => false,
| MainStatus::BackingUp {
on_complete: StartStop::Stop,
} => false,
}
}
// pub fn stop(&mut self) {
// match self {
// MainStatus::Starting { .. } | MainStatus::Running { .. } => {
// *self = MainStatus::Stopping;
// }
// MainStatus::BackingUp { started, .. } => {
// *started = None;
// }
// MainStatus::Stopped | MainStatus::Stopping | MainStatus::Restarting => (),
// }
// }
pub fn started(&self) -> Option<DateTime<Utc>> {
match self {
MainStatus::Running { started, .. } => Some(*started),
MainStatus::BackingUp { started, .. } => *started,
MainStatus::Stopped => None,
MainStatus::Restoring => None,
MainStatus::Restarting => None,
MainStatus::Stopping { .. } => None,
MainStatus::Starting { .. } => None,
pub fn backing_up(self) -> Self {
MainStatus::BackingUp {
on_complete: if self.running() {
StartStop::Start
} else {
StartStop::Stop
},
}
}
pub fn backing_up(&self) -> Self {
let (started, health) = match self {
MainStatus::Starting { .. } => (Some(Utc::now()), Default::default()),
MainStatus::Running { started, health } => (Some(started.clone()), health.clone()),
MainStatus::Stopped
| MainStatus::Stopping { .. }
| MainStatus::Restoring
| MainStatus::Restarting => (None, Default::default()),
MainStatus::BackingUp { .. } => return self.clone(),
};
MainStatus::BackingUp { started, health }
}
pub fn health(&self) -> Option<&OrdMap<HealthCheckId, NamedHealthCheckResult>> {
match self {
MainStatus::Running { health, .. } => Some(health),
MainStatus::BackingUp { health, .. } => Some(health),
MainStatus::Stopped
MainStatus::Running { health, .. } | MainStatus::Starting { health } => Some(health),
MainStatus::BackingUp { .. }
| MainStatus::Stopped
| MainStatus::Restoring
| MainStatus::Stopping { .. }
| MainStatus::Restarting => None,
MainStatus::Starting { .. } => None,
}
}
}