Feat/long running (#1676)

* feat: Start the long running container

* feat: Long running docker, running, stoping, and uninstalling

* feat: Just make the folders that we would like to mount.

* fix: Uninstall not working

* chore: remove some logging

* feat: Smarter cleanup

* feat: Wait for start

* wip: Need to kill

* chore: Remove the bad tracing

* feat: Stopping the long running processes without killing the long
running

* Mino Feat: Change the Manifest To have a new type (#1736)

* Add build-essential to README.md (#1716)

Update README.md

* write image to sparse-aware archive format (#1709)

* fix: Add modification to the max_user_watches (#1695)

* fix: Add modification to the max_user_watches

* chore: Move to initialization

* [Feat] follow logs (#1714)

* tail logs

* add cli

* add FE

* abstract http to shared

* batch new logs

* file download for logs

* fix modal error when no config

Co-authored-by: Chris Guida <chrisguida@users.noreply.github.com>
Co-authored-by: Aiden McClelland <me@drbonez.dev>
Co-authored-by: Matt Hill <matthewonthemoon@gmail.com>
Co-authored-by: BluJ <mogulslayer@gmail.com>

* Update README.md (#1728)

* fix build for patch-db client for consistency (#1722)

* fix cli install (#1720)

* highlight instructions if not viewed (#1731)

* wip:

* [ ] Fix the build (dependencies:634 map for option)

* fix: Cargo build

* fix: Long running wasn't starting

* fix: uninstall works

Co-authored-by: Chris Guida <chrisguida@users.noreply.github.com>
Co-authored-by: Aiden McClelland <3732071+dr-bonez@users.noreply.github.com>
Co-authored-by: Aiden McClelland <me@drbonez.dev>
Co-authored-by: Matt Hill <matthewonthemoon@gmail.com>
Co-authored-by: Lucy C <12953208+elvece@users.noreply.github.com>
Co-authored-by: Matt Hill <MattDHill@users.noreply.github.com>

* chore: Fix a dbg!

* chore: Make the commands of the docker-inject do inject instead of exec

* chore: Fix compile mistake

* chore: Change to use simpler

Co-authored-by: Chris Guida <chrisguida@users.noreply.github.com>
Co-authored-by: Aiden McClelland <3732071+dr-bonez@users.noreply.github.com>
Co-authored-by: Aiden McClelland <me@drbonez.dev>
Co-authored-by: Matt Hill <matthewonthemoon@gmail.com>
Co-authored-by: Lucy C <12953208+elvece@users.noreply.github.com>
Co-authored-by: Matt Hill <MattDHill@users.noreply.github.com>
This commit is contained in:
J M
2022-08-19 11:05:23 -06:00
committed by Aiden McClelland
parent 8093faee19
commit 35b220d7a5
23 changed files with 1236 additions and 322 deletions

View File

@@ -113,7 +113,14 @@ pub async fn check<Db: DbHandle>(
let health_results = if let Some(started) = started {
manifest
.health_checks
.check_all(ctx, started, id, &manifest.version, &manifest.volumes)
.check_all(
ctx,
&manifest.container,
started,
id,
&manifest.version,
&manifest.volumes,
)
.await?
} else {
return Ok(());

View File

@@ -1,6 +1,7 @@
use std::collections::BTreeMap;
use std::convert::TryInto;
use std::future::Future;
use std::net::Ipv4Addr;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::Poll;
@@ -13,14 +14,14 @@ use nix::sys::signal::Signal;
use num_enum::TryFromPrimitive;
use patch_db::DbHandle;
use sqlx::{Executor, Postgres};
use tokio::io::BufReader;
use tokio::sync::watch::error::RecvError;
use tokio::sync::watch::{channel, Receiver, Sender};
use tokio::sync::{Notify, RwLock};
use tokio::sync::{Mutex, Notify, RwLock};
use tokio::task::JoinHandle;
use torut::onion::TorSecretKeyV3;
use tracing::instrument;
use crate::context::RpcContext;
use crate::manager::sync::synchronizer;
use crate::net::interface::InterfaceId;
use crate::net::GeneratedCertificateMountPoint;
use crate::notifications::NotificationLevel;
@@ -30,6 +31,8 @@ use crate::s9pk::manifest::{Manifest, PackageId};
use crate::status::MainStatus;
use crate::util::{Container, NonDetachingJoinHandle, Version};
use crate::Error;
use crate::{context::RpcContext, procedure::docker::DockerContainer};
use crate::{manager::sync::synchronizer, procedure::docker::DockerInject};
pub mod health;
mod sync;
@@ -68,6 +71,7 @@ impl ManagerMap {
} else {
continue;
};
let tor_keys = man.interfaces.tor_keys(secrets, &package).await?;
res.insert(
(package, man.version.clone()),
@@ -145,6 +149,7 @@ impl ManagerMap {
pub struct Manager {
shared: Arc<ManagerSharedState>,
thread: Container<NonDetachingJoinHandle<()>>,
persistant_container: Arc<PersistantContainer>,
}
#[derive(TryFromPrimitive)]
@@ -176,130 +181,48 @@ pub enum OnStop {
Exit,
}
#[instrument(skip(state))]
#[instrument(skip(state, persistant))]
async fn run_main(
state: &Arc<ManagerSharedState>,
persistant: Arc<PersistantContainer>,
) -> Result<Result<NoOutput, (i32, String)>, Error> {
let rt_state = state.clone();
let interfaces = state
.manifest
.interfaces
.0
.iter()
.map(|(id, info)| {
Ok((
id.clone(),
info,
state
.tor_keys
.get(id)
.ok_or_else(|| {
Error::new(eyre!("interface {} missing key", id), crate::ErrorKind::Tor)
})?
.clone(),
))
})
.collect::<Result<Vec<_>, Error>>()?;
let generated_certificate = state
.ctx
.net_controller
.generate_certificate_mountpoint(&state.manifest.id, &interfaces)
.await?;
let mut runtime =
tokio::spawn(async move { start_up_image(rt_state, generated_certificate).await });
let ip;
loop {
match state
.ctx
.docker
.inspect_container(&state.container_name, None)
.await
{
Ok(res) => {
if let Some(ip_addr) = res
.network_settings
.and_then(|ns| ns.networks)
.and_then(|mut n| n.remove("start9"))
.and_then(|es| es.ip_address)
.filter(|ip| !ip.is_empty())
.map(|ip| ip.parse())
.transpose()?
{
ip = ip_addr;
break;
}
}
Err(bollard::errors::Error::DockerResponseServerError {
status_code: 404, // NOT FOUND
..
}) => (),
Err(e) => Err(e)?,
}
match futures::poll!(&mut runtime) {
Poll::Ready(res) => {
return res
.map_err(|_| {
Error::new(eyre!("Manager runtime panicked!"), crate::ErrorKind::Docker)
})
.and_then(|a| a)
}
_ => (),
let interfaces = states_main_interfaces(state)?;
let generated_certificate = generate_certificate(state, &interfaces).await?;
persistant.wait_for_persistant().await;
let is_injectable_main = check_is_injectable_main(&state);
let mut runtime = match is_injectable_main {
true => {
tokio::spawn(
async move { start_up_inject_image(rt_state, generated_certificate).await },
)
}
false => tokio::spawn(async move { start_up_image(rt_state, generated_certificate).await }),
};
let ip = match is_injectable_main {
false => Some(match get_running_ip(state, &mut runtime).await {
GetRunninIp::Ip(x) => x,
GetRunninIp::Error(e) => return Err(e),
GetRunninIp::EarlyExit(x) => return Ok(x),
}),
true => None,
};
if let Some(ip) = ip {
add_network_for_main(state, ip, interfaces, generated_certificate).await?;
}
state
.ctx
.net_controller
.add(&state.manifest.id, ip, interfaces, generated_certificate)
.await?;
state
.commit_health_check_results
.store(true, Ordering::SeqCst);
let health = async {
tokio::time::sleep(Duration::from_secs(10)).await; // only sleep for 1 second before first health check
loop {
let mut db = state.ctx.db.handle();
if let Err(e) = health::check(
&state.ctx,
&mut db,
&state.manifest.id,
&state.commit_health_check_results,
)
.await
{
tracing::error!(
"Failed to run health check for {}: {}",
&state.manifest.id,
e
);
tracing::debug!("{:?}", e);
}
tokio::time::sleep(Duration::from_secs(HEALTH_CHECK_COOLDOWN_SECONDS)).await;
}
};
let _ = state
.status
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
if x == Status::Starting as usize {
Some(Status::Running as usize)
} else {
None
}
});
set_commit_health_true(state);
let health = main_health_check_daemon(state.clone());
fetch_starting_to_running(state);
let res = tokio::select! {
a = runtime => a.map_err(|_| Error::new(eyre!("Manager runtime panicked!"), crate::ErrorKind::Docker)).and_then(|a| a),
_ = health => Err(Error::new(eyre!("Health check daemon exited!"), crate::ErrorKind::Unknown)),
};
state
.ctx
.net_controller
.remove(
&state.manifest.id,
ip,
state.manifest.interfaces.0.keys().cloned(),
)
.await?;
if let Some(ip) = ip {
remove_network_for_main(state, ip).await?;
}
res
}
@@ -314,12 +237,34 @@ async fn start_up_image(
.main
.execute::<(), NoOutput>(
&rt_state.ctx,
&rt_state.manifest.container,
&rt_state.manifest.id,
&rt_state.manifest.version,
ProcedureName::Main,
&rt_state.manifest.volumes,
None,
None,
)
.await
}
/// We want to start up the manifest, but in this case we want to know that we have generated the certificates.
/// Note for _generated_certificate: Needed to know that before we start the state we have generated the certificate
async fn start_up_inject_image(
rt_state: Arc<ManagerSharedState>,
_generated_certificate: GeneratedCertificateMountPoint,
) -> Result<Result<NoOutput, (i32, String)>, Error> {
rt_state
.manifest
.main
.inject::<(), NoOutput>(
&rt_state.ctx,
&rt_state.manifest.container,
&rt_state.manifest.id,
&rt_state.manifest.version,
ProcedureName::Main,
&rt_state.manifest.volumes,
None,
false,
None,
)
.await
@@ -346,14 +291,17 @@ impl Manager {
});
shared.synchronize_now.notify_one();
let thread_shared = shared.clone();
let persistant_container = PersistantContainer::new(&thread_shared);
let managers_persistant = persistant_container.clone();
let thread = tokio::spawn(async move {
tokio::select! {
_ = manager_thread_loop(recv, &thread_shared) => (),
_ = manager_thread_loop(recv, &thread_shared, managers_persistant) => (),
_ = synchronizer(&*thread_shared) => (),
}
});
Ok(Manager {
shared,
persistant_container,
thread: Container::new(Some(thread.into())),
})
}
@@ -400,41 +348,53 @@ impl Manager {
.commit_health_check_results
.store(false, Ordering::SeqCst);
let _ = self.shared.on_stop.send(OnStop::Exit);
let action = match &self.shared.manifest.main {
PackageProcedure::Docker(a) => a,
let sigterm_timeout: Option<crate::util::serde::Duration> = match &self.shared.manifest.main
{
PackageProcedure::Docker(DockerProcedure {
sigterm_timeout, ..
})
| PackageProcedure::DockerInject(DockerInject {
sigterm_timeout, ..
}) => sigterm_timeout.clone(),
#[cfg(feature = "js_engine")]
PackageProcedure::Script(_) => return Ok(()),
};
match self
.shared
.ctx
.docker
.stop_container(
&self.shared.container_name,
Some(StopContainerOptions {
t: action
.sigterm_timeout
.map(|a| *a)
.unwrap_or(Duration::from_secs(30))
.as_secs_f64() as i64,
}),
)
.await
{
Err(bollard::errors::Error::DockerResponseServerError {
status_code: 404, // NOT FOUND
..
})
| Err(bollard::errors::Error::DockerResponseServerError {
status_code: 409, // CONFLICT
..
})
| Err(bollard::errors::Error::DockerResponseServerError {
status_code: 304, // NOT MODIFIED
..
}) => (), // Already stopped
a => a?,
};
self.persistant_container.stop().await;
if !check_is_injectable_main(&self.shared) {
match self
.shared
.ctx
.docker
.stop_container(
&self.shared.container_name,
Some(StopContainerOptions {
t: sigterm_timeout
.map(|a| *a)
.unwrap_or(Duration::from_secs(30))
.as_secs_f64() as i64,
}),
)
.await
{
Err(bollard::errors::Error::DockerResponseServerError {
status_code: 404, // NOT FOUND
..
})
| Err(bollard::errors::Error::DockerResponseServerError {
status_code: 409, // CONFLICT
..
})
| Err(bollard::errors::Error::DockerResponseServerError {
status_code: 304, // NOT MODIFIED
..
}) => (), // Already stopped
a => a?,
};
} else {
stop_non_first(&*self.shared.container_name).await;
}
self.shared.status.store(
Status::Shutdown as usize,
std::sync::atomic::Ordering::SeqCst,
@@ -456,7 +416,11 @@ impl Manager {
}
}
async fn manager_thread_loop(mut recv: Receiver<OnStop>, thread_shared: &Arc<ManagerSharedState>) {
async fn manager_thread_loop(
mut recv: Receiver<OnStop>,
thread_shared: &Arc<ManagerSharedState>,
persistant_container: Arc<PersistantContainer>,
) {
loop {
fn handle_stop_action<'a>(
recv: &'a mut Receiver<OnStop>,
@@ -496,7 +460,7 @@ async fn manager_thread_loop(mut recv: Receiver<OnStop>, thread_shared: &Arc<Man
);
}
}
match run_main(&thread_shared).await {
match run_main(&thread_shared, persistant_container.clone()).await {
Ok(Ok(NoOutput)) => (), // restart
Ok(Err(e)) => {
let mut db = thread_shared.ctx.db.handle();
@@ -546,6 +510,372 @@ async fn manager_thread_loop(mut recv: Receiver<OnStop>, thread_shared: &Arc<Man
}
}
struct PersistantContainer {
container_name: String,
running_docker:
Arc<Mutex<Option<NonDetachingJoinHandle<Result<Result<NoOutput, (i32, String)>, Error>>>>>,
should_stop_running: Arc<std::sync::atomic::AtomicBool>,
wait_for_start: (Sender<bool>, Receiver<bool>),
}
impl PersistantContainer {
#[instrument(skip(thread_shared))]
fn new(thread_shared: &Arc<ManagerSharedState>) -> Arc<Self> {
let wait_for_start = channel(false);
let container = Arc::new(Self {
container_name: thread_shared.container_name.clone(),
running_docker: Arc::new(Mutex::new(None)),
should_stop_running: Arc::new(AtomicBool::new(false)),
wait_for_start: wait_for_start,
});
tokio::spawn(persistant_container(
thread_shared.clone(),
container.clone(),
));
container
}
#[instrument(skip(self))]
async fn stop(&self) {
let container_name = &self.container_name;
self.should_stop_running.store(true, Ordering::SeqCst);
let mut running_docker = self.running_docker.lock().await;
*running_docker = None;
use tokio::process::Command;
if let Err(_err) = Command::new("docker")
.args(["stop", "-t", "0", &*container_name])
.output()
.await
{}
if let Err(_err) = Command::new("docker")
.args(["kill", &*container_name])
.output()
.await
{}
}
async fn wait_for_persistant(&self) {
let mut changed_rx = self.wait_for_start.1.clone();
loop {
if !*changed_rx.borrow() {
return;
}
changed_rx.changed().await.unwrap();
}
}
async fn start_wait(&self) {
self.wait_for_start.0.send(true).unwrap();
}
async fn done_waiting(&self) {
self.wait_for_start.0.send(false).unwrap();
}
}
impl Drop for PersistantContainer {
fn drop(&mut self) {
self.should_stop_running.store(true, Ordering::SeqCst);
let container_name = self.container_name.clone();
let running_docker = self.running_docker.clone();
tokio::spawn(async move {
let mut running_docker = running_docker.lock().await;
*running_docker = None;
use std::process::Command;
if let Err(_err) = Command::new("docker")
.args(["kill", &*container_name])
.output()
{}
});
}
}
async fn persistant_container(
thread_shared: Arc<ManagerSharedState>,
container: Arc<PersistantContainer>,
) {
let main_docker_procedure_for_long = injectable_main(&thread_shared);
match main_docker_procedure_for_long {
Some(main) => loop {
if container.should_stop_running.load(Ordering::SeqCst) {
return;
}
container.start_wait().await;
match run_persistant_container(&thread_shared, container.clone(), main.clone()).await {
Ok(_) => (),
Err(e) => {
tracing::error!("failed to start persistant container: {}", e);
tracing::debug!("{:?}", e);
}
}
},
None => futures::future::pending().await,
}
}
fn injectable_main(thread_shared: &Arc<ManagerSharedState>) -> Option<Arc<DockerProcedure>> {
if let (
PackageProcedure::DockerInject(DockerInject {
system,
entrypoint,
args,
io_format,
sigterm_timeout,
}),
Some(DockerContainer {
image,
mounts,
shm_size_mb,
}),
) = (
&thread_shared.manifest.main,
&thread_shared.manifest.container,
) {
Some(Arc::new(DockerProcedure {
image: image.clone(),
mounts: mounts.clone(),
io_format: *io_format,
shm_size_mb: *shm_size_mb,
sigterm_timeout: *sigterm_timeout,
system: *system,
entrypoint: "sleep".to_string(),
args: vec!["infinity".to_string()],
}))
} else {
None
}
}
fn check_is_injectable_main(thread_shared: &ManagerSharedState) -> bool {
match &thread_shared.manifest.main {
PackageProcedure::Docker(_a) => false,
PackageProcedure::DockerInject(a) => true,
#[cfg(feature = "js_engine")]
PackageProcedure::Script(_) => false,
}
}
async fn run_persistant_container(
state: &Arc<ManagerSharedState>,
persistant: Arc<PersistantContainer>,
docker_procedure: Arc<DockerProcedure>,
) -> Result<(), Error> {
let interfaces = states_main_interfaces(state)?;
let generated_certificate = generate_certificate(state, &interfaces).await?;
let mut runtime = tokio::spawn(long_running_docker(state.clone(), docker_procedure));
let ip = match get_running_ip(state, &mut runtime).await {
GetRunninIp::Ip(x) => x,
GetRunninIp::Error(e) => return Err(e),
GetRunninIp::EarlyExit(e) => {
tracing::error!("Early Exit");
tracing::debug!("{:?}", e);
return Ok(());
}
};
persistant.done_waiting().await;
add_network_for_main(state, ip, interfaces, generated_certificate).await?;
fetch_starting_to_running(state);
let res = tokio::select! {
a = runtime => a.map_err(|_| Error::new(eyre!("Manager runtime panicked!"), crate::ErrorKind::Docker)).map(|_| ()),
};
remove_network_for_main(state, ip).await?;
res
}
async fn long_running_docker(
rt_state: Arc<ManagerSharedState>,
main_status: Arc<DockerProcedure>,
) -> Result<Result<NoOutput, (i32, String)>, Error> {
main_status
.execute::<(), NoOutput>(
&rt_state.ctx,
&rt_state.manifest.id,
&rt_state.manifest.version,
ProcedureName::LongRunning,
&rt_state.manifest.volumes,
None,
None,
)
.await
}
async fn remove_network_for_main(
state: &Arc<ManagerSharedState>,
ip: std::net::Ipv4Addr,
) -> Result<(), Error> {
state
.ctx
.net_controller
.remove(
&state.manifest.id,
ip,
state.manifest.interfaces.0.keys().cloned(),
)
.await?;
Ok(())
}
fn fetch_starting_to_running(state: &Arc<ManagerSharedState>) {
let _ = state
.status
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
if x == Status::Starting as usize {
Some(Status::Running as usize)
} else {
None
}
});
}
async fn main_health_check_daemon(state: Arc<ManagerSharedState>) {
tokio::time::sleep(Duration::from_secs(10)).await;
loop {
let mut db = state.ctx.db.handle();
if let Err(e) = health::check(
&state.ctx,
&mut db,
&state.manifest.id,
&state.commit_health_check_results,
)
.await
{
tracing::error!(
"Failed to run health check for {}: {}",
&state.manifest.id,
e
);
tracing::debug!("{:?}", e);
}
tokio::time::sleep(Duration::from_secs(HEALTH_CHECK_COOLDOWN_SECONDS)).await;
}
}
fn set_commit_health_true(state: &Arc<ManagerSharedState>) {
state
.commit_health_check_results
.store(true, Ordering::SeqCst);
}
async fn add_network_for_main(
state: &Arc<ManagerSharedState>,
ip: std::net::Ipv4Addr,
interfaces: Vec<(
InterfaceId,
&crate::net::interface::Interface,
TorSecretKeyV3,
)>,
generated_certificate: GeneratedCertificateMountPoint,
) -> Result<(), Error> {
state
.ctx
.net_controller
.add(&state.manifest.id, ip, interfaces, generated_certificate)
.await?;
Ok(())
}
enum GetRunninIp {
Ip(Ipv4Addr),
Error(Error),
EarlyExit(Result<NoOutput, (i32, String)>),
}
async fn get_running_ip(
state: &Arc<ManagerSharedState>,
mut runtime: &mut tokio::task::JoinHandle<Result<Result<NoOutput, (i32, String)>, Error>>,
) -> GetRunninIp {
loop {
match container_inspect(state).await {
Ok(res) => {
match res
.network_settings
.and_then(|ns| ns.networks)
.and_then(|mut n| n.remove("start9"))
.and_then(|es| es.ip_address)
.filter(|ip| !ip.is_empty())
.map(|ip| ip.parse())
.transpose()
{
Ok(Some(ip_addr)) => return GetRunninIp::Ip(ip_addr),
Ok(None) => (),
Err(e) => return GetRunninIp::Error(e.into()),
}
}
Err(bollard::errors::Error::DockerResponseServerError {
status_code: 404, // NOT FOUND
..
}) => (),
Err(e) => return GetRunninIp::Error(e.into()),
}
match futures::poll!(&mut runtime) {
Poll::Ready(res) => match res {
Ok(Ok(response)) => return GetRunninIp::EarlyExit(response),
Err(_) | Ok(Err(_)) => {
return GetRunninIp::Error(Error::new(
eyre!("Manager runtime panicked!"),
crate::ErrorKind::Docker,
))
}
},
_ => (),
}
}
}
async fn container_inspect(
state: &Arc<ManagerSharedState>,
) -> Result<bollard::models::ContainerInspectResponse, bollard::errors::Error> {
state
.ctx
.docker
.inspect_container(&state.container_name, None)
.await
}
async fn generate_certificate(
state: &Arc<ManagerSharedState>,
interfaces: &Vec<(
InterfaceId,
&crate::net::interface::Interface,
TorSecretKeyV3,
)>,
) -> Result<GeneratedCertificateMountPoint, Error> {
Ok(state
.ctx
.net_controller
.generate_certificate_mountpoint(&state.manifest.id, interfaces)
.await?)
}
fn states_main_interfaces(
state: &Arc<ManagerSharedState>,
) -> Result<
Vec<(
InterfaceId,
&crate::net::interface::Interface,
TorSecretKeyV3,
)>,
Error,
> {
Ok(state
.manifest
.interfaces
.0
.iter()
.map(|(id, info)| {
Ok((
id.clone(),
info,
state
.tor_keys
.get(id)
.ok_or_else(|| {
Error::new(eyre!("interface {} missing key", id), crate::ErrorKind::Tor)
})?
.clone(),
))
})
.collect::<Result<Vec<_>, Error>>()?)
}
#[instrument(skip(shared))]
async fn stop(shared: &ManagerSharedState) -> Result<(), Error> {
shared
@@ -563,40 +893,50 @@ async fn stop(shared: &ManagerSharedState) -> Result<(), Error> {
) {
resume(shared).await?;
}
let action = match &shared.manifest.main {
PackageProcedure::Docker(a) => a,
match &shared.manifest.main {
PackageProcedure::Docker(DockerProcedure {
sigterm_timeout, ..
})
| PackageProcedure::DockerInject(DockerInject {
sigterm_timeout, ..
}) => {
if !check_is_injectable_main(shared) {
match shared
.ctx
.docker
.stop_container(
&shared.container_name,
Some(StopContainerOptions {
t: sigterm_timeout
.map(|a| *a)
.unwrap_or(Duration::from_secs(30))
.as_secs_f64() as i64,
}),
)
.await
{
Err(bollard::errors::Error::DockerResponseServerError {
status_code: 404, // NOT FOUND
..
})
| Err(bollard::errors::Error::DockerResponseServerError {
status_code: 409, // CONFLICT
..
})
| Err(bollard::errors::Error::DockerResponseServerError {
status_code: 304, // NOT MODIFIED
..
}) => (), // Already stopped
a => a?,
};
} else {
stop_non_first(&shared.container_name).await;
}
}
#[cfg(feature = "js_engine")]
PackageProcedure::Script(_) => return Ok(()),
};
match shared
.ctx
.docker
.stop_container(
&shared.container_name,
Some(StopContainerOptions {
t: action
.sigterm_timeout
.map(|a| *a)
.unwrap_or(Duration::from_secs(30))
.as_secs_f64() as i64,
}),
)
.await
{
Err(bollard::errors::Error::DockerResponseServerError {
status_code: 404, // NOT FOUND
..
})
| Err(bollard::errors::Error::DockerResponseServerError {
status_code: 409, // CONFLICT
..
})
| Err(bollard::errors::Error::DockerResponseServerError {
status_code: 304, // NOT MODIFIED
..
}) => (), // Already stopped
a => a?,
};
tracing::debug!("Stopping a docker");
shared.status.store(
Status::Stopped as usize,
std::sync::atomic::Ordering::SeqCst,
@@ -604,6 +944,44 @@ async fn stop(shared: &ManagerSharedState) -> Result<(), Error> {
Ok(())
}
/// So the sleep infinity, which is the long running, is pid 1. So we kill the others
async fn stop_non_first(container_name: &str) {
// tracing::error!("BLUJ TODO: sudo docker exec {} sh -c \"ps ax | awk '\\$1 ~ /^[:0-9:]/ && \\$1 > 1 {{print \\$1}}' | xargs kill\"", container_name);
// (sleep infinity) & export RUNNING=$! && echo $! && (wait $RUNNING && echo "DONE FOR $RUNNING") &
// (RUNNING=$(sleep infinity & echo $!); echo "running $RUNNING"; wait $RUNNING; echo "DONE FOR ?") &
let _ = tokio::process::Command::new("docker")
.args([
"container",
"exec",
container_name,
"sh",
"-c",
"ps ax | awk '$1 ~ /^[:0-9:]/ && $1 > 1 {print $1}' | xargs kill",
])
.output()
.await;
}
// #[test]
// fn test_stop_non_first() {
// assert_eq!(
// &format!(
// "{}",
// tokio::process::Command::new("docker").args([
// "container",
// "exec",
// "container_name",
// "sh",
// "-c",
// "ps ax | awk \"\\$1 ~ /^[:0-9:]/ && \\$1 > 1 {print \\$1}\"| xargs kill",
// ])
// ),
// ""
// );
// }
#[instrument(skip(shared))]
async fn start(shared: &ManagerSharedState) -> Result<(), Error> {
shared.on_stop.send(OnStop::Restart).map_err(|_| {