wip: Making Injectable exec (#1897)

* wip: Making Injectable exec

* chore: Cleanup dup code.

* chore: Fixes for consistancy.

* Fix: BUild and the join handle in the join_main
This commit is contained in:
J M
2022-11-02 13:20:22 -06:00
committed by Aiden McClelland
parent 21cf4cd2ce
commit 55b1c021ec
4 changed files with 32 additions and 18 deletions

View File

@@ -16,10 +16,10 @@ use nix::sys::signal::Signal;
use num_enum::TryFromPrimitive; use num_enum::TryFromPrimitive;
use patch_db::DbHandle; use patch_db::DbHandle;
use sqlx::{Executor, Postgres}; use sqlx::{Executor, Postgres};
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::watch::error::RecvError; use tokio::sync::watch::error::RecvError;
use tokio::sync::watch::{channel, Receiver, Sender}; use tokio::sync::watch::{channel, Receiver, Sender};
use tokio::sync::{Mutex, Notify, RwLock}; use tokio::sync::{Mutex, Notify, RwLock};
use tokio::{sync::mpsc::UnboundedSender, task::JoinHandle};
use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_stream::wrappers::UnboundedReceiverStream;
use torut::onion::TorSecretKeyV3; use torut::onion::TorSecretKeyV3;
use tracing::instrument; use tracing::instrument;
@@ -196,15 +196,10 @@ async fn run_main(
persistant.wait_for_persistant().await; persistant.wait_for_persistant().await;
let is_injectable_main = check_is_injectable_main(state); let is_injectable_main = check_is_injectable_main(state);
let mut runtime = match injectable_main(state) { let mut runtime = NonDetachingJoinHandle::from(tokio::spawn(start_up_image(
InjectableMain::None => { rt_state,
tokio::spawn(async move { start_up_image(rt_state, generated_certificate).await }) generated_certificate,
} )));
#[cfg(feature = "js_engine")]
InjectableMain::Script(_) => {
tokio::spawn(async move { start_up_image(rt_state, generated_certificate).await })
}
};
let ip = match is_injectable_main { let ip = match is_injectable_main {
false => Some(match get_running_ip(state, &mut runtime).await { false => Some(match get_running_ip(state, &mut runtime).await {
GetRunninIp::Ip(x) => x, GetRunninIp::Ip(x) => x,
@@ -276,16 +271,16 @@ impl Manager {
let thread_shared = shared.clone(); let thread_shared = shared.clone();
let persistant_container = PersistantContainer::new(&thread_shared); let persistant_container = PersistantContainer::new(&thread_shared);
let managers_persistant = persistant_container.clone(); let managers_persistant = persistant_container.clone();
let thread = tokio::spawn(async move { let thread = NonDetachingJoinHandle::from(tokio::spawn(async move {
tokio::select! { tokio::select! {
_ = manager_thread_loop(recv, &thread_shared, managers_persistant.clone()) => (), _ = manager_thread_loop(recv, &thread_shared, managers_persistant.clone()) => (),
_ = synchronizer(&*thread_shared, managers_persistant) => (), _ = synchronizer(&*thread_shared, managers_persistant) => (),
} }
}); }));
Ok(Manager { Ok(Manager {
shared, shared,
persistant_container, persistant_container,
thread: Container::new(Some(thread.into())), thread: Container::new(Some(thread)),
}) })
} }
@@ -736,7 +731,7 @@ impl PersistantContainer {
} }
} }
None => { None => {
return Err("Couldn't get a command inserter in current service".to_string()) return Err("Expecting containers.main in the package manifest".to_string())
} }
}; };
Ok::<RpcId, String>(id) Ok::<RpcId, String>(id)
@@ -937,7 +932,7 @@ enum GetRunninIp {
EarlyExit(Result<NoOutput, (i32, String)>), EarlyExit(Result<NoOutput, (i32, String)>),
} }
type RuntimeOfCommand = JoinHandle<Result<Result<NoOutput, (i32, String)>, Error>>; type RuntimeOfCommand = NonDetachingJoinHandle<Result<Result<NoOutput, (i32, String)>, Error>>;
async fn get_running_ip( async fn get_running_ip(
state: &Arc<ManagerSharedState>, state: &Arc<ManagerSharedState>,

View File

@@ -81,6 +81,8 @@ pub struct DockerProcedure {
#[serde(default)] #[serde(default)]
pub args: Vec<String>, pub args: Vec<String>,
#[serde(default)] #[serde(default)]
pub inject: bool,
#[serde(default)]
pub mounts: BTreeMap<VolumeId, PathBuf>, pub mounts: BTreeMap<VolumeId, PathBuf>,
#[serde(default)] #[serde(default)]
pub io_format: Option<IoFormat>, pub io_format: Option<IoFormat>,
@@ -113,6 +115,7 @@ impl DockerProcedure {
system: injectable.system, system: injectable.system,
entrypoint: injectable.entrypoint.clone(), entrypoint: injectable.entrypoint.clone(),
args: injectable.args.clone(), args: injectable.args.clone(),
inject: false,
mounts: container.mounts.clone(), mounts: container.mounts.clone(),
io_format: injectable.io_format, io_format: injectable.io_format,
sigterm_timeout: injectable.sigterm_timeout, sigterm_timeout: injectable.sigterm_timeout,
@@ -129,6 +132,7 @@ impl DockerProcedure {
system: container.system, system: container.system,
entrypoint: "sleep".to_string(), entrypoint: "sleep".to_string(),
args: Vec::new(), args: Vec::new(),
inject: false,
mounts: container.mounts.clone(), mounts: container.mounts.clone(),
io_format: None, io_format: None,
sigterm_timeout: container.sigterm_timeout, sigterm_timeout: container.sigterm_timeout,
@@ -184,7 +188,8 @@ impl DockerProcedure {
.arg("--name") .arg("--name")
.arg(&container_name) .arg(&container_name)
.arg(format!("--hostname={}", &container_name)) .arg(format!("--hostname={}", &container_name))
.arg("--no-healthcheck"); .arg("--no-healthcheck")
.kill_on_drop(true);
match ctx match ctx
.docker .docker
.remove_container( .remove_container(
@@ -796,7 +801,6 @@ impl LongRunning {
pkg_id: &PackageId, pkg_id: &PackageId,
pkg_version: &Version, pkg_version: &Version,
) -> Result<tokio::process::Command, Error> { ) -> Result<tokio::process::Command, Error> {
tracing::error!("BLUJ setup_long_running_docker_cmd {container_name}");
const INIT_EXEC: &str = "/start9/embassy_container_init"; const INIT_EXEC: &str = "/start9/embassy_container_init";
const BIND_LOCATION: &str = "/usr/lib/embassy/container"; const BIND_LOCATION: &str = "/usr/lib/embassy/container";
tracing::trace!("setup_long_running_docker_cmd"); tracing::trace!("setup_long_running_docker_cmd");
@@ -831,7 +835,8 @@ impl LongRunning {
.arg("--entrypoint") .arg("--entrypoint")
.arg(format!("{INIT_EXEC}.{image_architecture}")) .arg(format!("{INIT_EXEC}.{image_architecture}"))
.arg("-i") .arg("-i")
.arg("--rm"); .arg("--rm")
.kill_on_drop(true);
for (volume_id, dst) in &docker.mounts { for (volume_id, dst) in &docker.mounts {
let volume = if let Some(v) = volumes.get(volume_id) { let volume = if let Some(v) = volumes.get(volume_id) {

View File

@@ -70,6 +70,11 @@ impl PackageProcedure {
) -> Result<Result<O, (i32, String)>, Error> { ) -> Result<Result<O, (i32, String)>, Error> {
tracing::trace!("Procedure execute {} {} - {:?}", self, pkg_id, name); tracing::trace!("Procedure execute {} {} - {:?}", self, pkg_id, name);
match self { match self {
PackageProcedure::Docker(procedure) if procedure.inject == true => {
procedure
.inject(ctx, pkg_id, pkg_version, name, volumes, input, timeout)
.await
}
PackageProcedure::Docker(procedure) => { PackageProcedure::Docker(procedure) => {
procedure procedure
.execute(ctx, pkg_id, pkg_version, name, volumes, input, timeout) .execute(ctx, pkg_id, pkg_version, name, volumes, input, timeout)

View File

@@ -230,6 +230,15 @@ impl<R: AsyncRead + AsyncSeek + Unpin + Send + Sync> S9pkReader<R> {
&man.volumes, &man.volumes,
&validated_image_ids, &validated_image_ids,
)?; )?;
if man.containers.is_some()
&& matches!(man.main, crate::procedure::PackageProcedure::Docker(_))
{
return Err(Error::new(
eyre!("Cannot have a main docker and a main in containers"),
crate::ErrorKind::ValidateS9pk,
));
}
if let Some(props) = &man.properties { if let Some(props) = &man.properties {
props props
.validate( .validate(