mirror of
https://github.com/Start9Labs/start-os.git
synced 2026-04-01 21:13:09 +00:00
Refactor/service manager (#2401)
* wip: Pulling in the features of the refactor since march * chore: Fixes to make the system able to build * chore: Adding in the documentation for the manager stuff * feat: Restarting and wait for stop * feat: Add a soft shutdown not commit to db. * chore: Remove the comments of bluj * chore: Clean up some of the linting errors * chore: Clean up the signal * chore: Some more cleanup * fix: The configure * fix: A missing config * fix: typo * chore: Remove a comment of BLUJ that needed to be removed
This commit is contained in:
@@ -1,7 +1,5 @@
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
|
||||
use itertools::Itertools;
|
||||
use patch_db::{DbHandle, LockReceipt, LockType};
|
||||
use tracing::instrument;
|
||||
|
||||
@@ -91,12 +89,12 @@ impl HealthCheckStatusReceipt {
|
||||
}
|
||||
}
|
||||
|
||||
/// Runs one health check cycle for a service: goes out and runs the health checks, then stores the results in the db.
|
||||
#[instrument(skip_all)]
|
||||
pub async fn check<Db: DbHandle>(
|
||||
ctx: &RpcContext,
|
||||
db: &mut Db,
|
||||
id: &PackageId,
|
||||
should_commit: &AtomicBool,
|
||||
) -> Result<(), Error> {
|
||||
let mut tx = db.begin().await?;
|
||||
let (manifest, started) = {
|
||||
@@ -115,40 +113,12 @@ pub async fn check<Db: DbHandle>(
|
||||
tracing::debug!("Checking health of {}", id);
|
||||
manifest
|
||||
.health_checks
|
||||
.check_all(
|
||||
ctx,
|
||||
&manifest.containers,
|
||||
started,
|
||||
id,
|
||||
&manifest.version,
|
||||
&manifest.volumes,
|
||||
)
|
||||
.check_all(ctx, started, id, &manifest.version, &manifest.volumes)
|
||||
.await?
|
||||
} else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
if !should_commit.load(Ordering::SeqCst) {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if !health_results
|
||||
.iter()
|
||||
.any(|(_, res)| matches!(res, HealthCheckResult::Failure { .. }))
|
||||
{
|
||||
tracing::debug!("All health checks succeeded for {}", id);
|
||||
} else {
|
||||
tracing::debug!(
|
||||
"Some health checks failed for {}: {}",
|
||||
id,
|
||||
health_results
|
||||
.iter()
|
||||
.filter(|(_, res)| matches!(res, HealthCheckResult::Failure { .. }))
|
||||
.map(|(id, _)| &*id)
|
||||
.join(", ")
|
||||
);
|
||||
}
|
||||
|
||||
let current_dependents = {
|
||||
let mut checkpoint = tx.begin().await?;
|
||||
let receipts = HealthCheckStatusReceipt::new(&mut checkpoint, id).await?;
|
||||
|
||||
343
backend/src/manager/manager_container.rs
Normal file
343
backend/src/manager/manager_container.rs
Normal file
@@ -0,0 +1,343 @@
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use futures::FutureExt;
|
||||
use patch_db::PatchDbHandle;
|
||||
use tokio::sync::watch;
|
||||
use tokio::sync::watch::Sender;
|
||||
use tracing::instrument;
|
||||
|
||||
use super::start_stop::StartStop;
|
||||
use super::{manager_seed, run_main, ManagerPersistentContainer, RunMainResult};
|
||||
use crate::procedure::NoOutput;
|
||||
use crate::s9pk::manifest::Manifest;
|
||||
use crate::status::MainStatus;
|
||||
use crate::util::{GeneralBoxedGuard, NonDetachingJoinHandle};
|
||||
use crate::Error;
|
||||
|
||||
pub type ManageContainerOverride = Arc<watch::Sender<Option<MainStatus>>>;
|
||||
|
||||
/// This is the thing describing the state machine actor for a service
|
||||
/// state and current running/ desired states.
|
||||
pub struct ManageContainer {
|
||||
pub(super) current_state: Arc<watch::Sender<StartStop>>,
|
||||
pub(super) desired_state: Arc<watch::Sender<StartStop>>,
|
||||
_service: NonDetachingJoinHandle<()>,
|
||||
_save_state: NonDetachingJoinHandle<()>,
|
||||
override_main_status: ManageContainerOverride,
|
||||
}
|
||||
|
||||
impl ManageContainer {
|
||||
pub async fn new(
|
||||
seed: Arc<manager_seed::ManagerSeed>,
|
||||
persistent_container: ManagerPersistentContainer,
|
||||
) -> Result<Self, Error> {
|
||||
let mut db = seed.ctx.db.handle();
|
||||
let current_state = Arc::new(watch::channel(StartStop::Stop).0);
|
||||
let desired_state = Arc::new(
|
||||
watch::channel::<StartStop>(get_status(&mut db, &seed.manifest).await.into()).0,
|
||||
);
|
||||
let override_main_status: ManageContainerOverride = Arc::new(watch::channel(None).0);
|
||||
let service = tokio::spawn(create_service_manager(
|
||||
desired_state.clone(),
|
||||
seed.clone(),
|
||||
current_state.clone(),
|
||||
persistent_container,
|
||||
))
|
||||
.into();
|
||||
let save_state = tokio::spawn(save_state(
|
||||
desired_state.clone(),
|
||||
current_state.clone(),
|
||||
override_main_status.clone(),
|
||||
seed.clone(),
|
||||
))
|
||||
.into();
|
||||
Ok(ManageContainer {
|
||||
current_state,
|
||||
desired_state,
|
||||
_service: service,
|
||||
override_main_status,
|
||||
_save_state: save_state,
|
||||
})
|
||||
}
|
||||
|
||||
/// Set override is used during something like a restart of a service. We want to show certain statuses be different
|
||||
/// from the actual status of the service.
|
||||
pub fn set_override(&self, override_status: Option<MainStatus>) -> GeneralBoxedGuard {
|
||||
self.override_main_status
|
||||
.send_modify(|x| *x = override_status);
|
||||
let override_main_status = self.override_main_status.clone();
|
||||
GeneralBoxedGuard::new(move || {
|
||||
override_main_status.send_modify(|x| *x = None);
|
||||
})
|
||||
}
|
||||
|
||||
/// Set the override, but don't have a guard to revert it. Used only on the mananger to do a shutdown.
|
||||
pub(super) async fn lock_state_forever(&self, seed: &manager_seed::ManagerSeed) {
|
||||
let mut db = seed.ctx.db.handle();
|
||||
let current_state = get_status(&mut db, &seed.manifest).await;
|
||||
self.override_main_status
|
||||
.send_modify(|x| *x = Some(current_state));
|
||||
}
|
||||
|
||||
/// We want to set the state of the service, like to start or stop
|
||||
pub fn to_desired(&self, new_state: StartStop) {
|
||||
self.desired_state.send_modify(|x| *x = new_state);
|
||||
}
|
||||
|
||||
/// This is a tool to say wait for the service to be in a certain state.
|
||||
pub async fn wait_for_desired(&self, new_state: StartStop) {
|
||||
let mut current_state = self.current_state();
|
||||
self.to_desired(new_state);
|
||||
while *current_state.borrow() != new_state {
|
||||
current_state.changed().await.unwrap_or_default();
|
||||
}
|
||||
}
|
||||
|
||||
/// Getter
|
||||
pub fn current_state(&self) -> watch::Receiver<StartStop> {
|
||||
self.current_state.subscribe()
|
||||
}
|
||||
|
||||
/// Getter
|
||||
pub fn desired_state(&self) -> watch::Receiver<StartStop> {
|
||||
self.desired_state.subscribe()
|
||||
}
|
||||
}
|
||||
|
||||
async fn create_service_manager(
|
||||
desired_state: Arc<Sender<StartStop>>,
|
||||
seed: Arc<manager_seed::ManagerSeed>,
|
||||
current_state: Arc<Sender<StartStop>>,
|
||||
persistent_container: Arc<Option<super::persistent_container::PersistentContainer>>,
|
||||
) {
|
||||
let mut desired_state_receiver = desired_state.subscribe();
|
||||
let mut running_service: Option<NonDetachingJoinHandle<()>> = None;
|
||||
let seed = seed.clone();
|
||||
loop {
|
||||
let current: StartStop = *current_state.borrow();
|
||||
let desired: StartStop = *desired_state_receiver.borrow();
|
||||
match (current, desired) {
|
||||
(StartStop::Start, StartStop::Start) => (),
|
||||
(StartStop::Start, StartStop::Stop) => {
|
||||
if persistent_container.is_none() {
|
||||
if let Err(err) = seed.stop_container().await {
|
||||
tracing::error!("Could not stop container");
|
||||
tracing::debug!("{:?}", err)
|
||||
}
|
||||
running_service = None;
|
||||
} else if let Some(current_service) = running_service.take() {
|
||||
tokio::select! {
|
||||
_ = current_service => (),
|
||||
_ = tokio::time::sleep(Duration::from_secs_f64(seed.manifest
|
||||
.containers
|
||||
.as_ref()
|
||||
.and_then(|c| c.main.sigterm_timeout).map(|x| x.as_secs_f64()).unwrap_or_default())) => {
|
||||
tracing::error!("Could not stop service");
|
||||
}
|
||||
}
|
||||
}
|
||||
current_state.send_modify(|x| *x = StartStop::Stop);
|
||||
}
|
||||
(StartStop::Stop, StartStop::Start) => starting_service(
|
||||
current_state.clone(),
|
||||
desired_state.clone(),
|
||||
seed.clone(),
|
||||
persistent_container.clone(),
|
||||
&mut running_service,
|
||||
),
|
||||
(StartStop::Stop, StartStop::Stop) => (),
|
||||
}
|
||||
|
||||
if desired_state_receiver.changed().await.is_err() {
|
||||
tracing::error!("Desired state error");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn save_state(
|
||||
desired_state: Arc<Sender<StartStop>>,
|
||||
current_state: Arc<Sender<StartStop>>,
|
||||
override_main_status: Arc<Sender<Option<MainStatus>>>,
|
||||
seed: Arc<manager_seed::ManagerSeed>,
|
||||
) {
|
||||
let mut desired_state_receiver = desired_state.subscribe();
|
||||
let mut current_state_receiver = current_state.subscribe();
|
||||
let mut override_main_status_receiver = override_main_status.subscribe();
|
||||
loop {
|
||||
let current: StartStop = *current_state_receiver.borrow();
|
||||
let desired: StartStop = *desired_state_receiver.borrow();
|
||||
let override_status = override_main_status_receiver.borrow().clone();
|
||||
let mut db = seed.ctx.db.handle();
|
||||
let res = match (override_status, current, desired) {
|
||||
(Some(status), _, _) => set_status(&mut db, &seed.manifest, &status).await,
|
||||
(None, StartStop::Start, StartStop::Start) => {
|
||||
set_status(
|
||||
&mut db,
|
||||
&seed.manifest,
|
||||
&MainStatus::Running {
|
||||
started: chrono::Utc::now(),
|
||||
health: Default::default(),
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
(None, StartStop::Start, StartStop::Stop) => {
|
||||
set_status(&mut db, &seed.manifest, &MainStatus::Stopping).await
|
||||
}
|
||||
(None, StartStop::Stop, StartStop::Start) => {
|
||||
set_status(&mut db, &seed.manifest, &MainStatus::Starting).await
|
||||
}
|
||||
(None, StartStop::Stop, StartStop::Stop) => {
|
||||
set_status(&mut db, &seed.manifest, &MainStatus::Stopped).await
|
||||
}
|
||||
};
|
||||
if let Err(err) = res {
|
||||
tracing::error!("Did not set status for {}", seed.container_name);
|
||||
tracing::debug!("{:?}", err);
|
||||
}
|
||||
tokio::select! {
|
||||
_ = desired_state_receiver.changed() =>{},
|
||||
_ = current_state_receiver.changed() => {},
|
||||
_ = override_main_status_receiver.changed() => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn starting_service(
|
||||
current_state: Arc<Sender<StartStop>>,
|
||||
desired_state: Arc<Sender<StartStop>>,
|
||||
seed: Arc<manager_seed::ManagerSeed>,
|
||||
persistent_container: ManagerPersistentContainer,
|
||||
running_service: &mut Option<NonDetachingJoinHandle<()>>,
|
||||
) {
|
||||
let set_running = {
|
||||
let current_state = current_state.clone();
|
||||
Arc::new(move || {
|
||||
current_state.send_modify(|x| *x = StartStop::Start);
|
||||
})
|
||||
};
|
||||
let set_stopped = { move || current_state.send_modify(|x| *x = StartStop::Stop) };
|
||||
let running_main_loop = async move {
|
||||
while desired_state.borrow().is_start() {
|
||||
let result = run_main(
|
||||
seed.clone(),
|
||||
persistent_container.clone(),
|
||||
set_running.clone(),
|
||||
)
|
||||
.await;
|
||||
set_stopped();
|
||||
run_main_log_result(result, seed.clone()).await;
|
||||
}
|
||||
};
|
||||
*running_service = Some(tokio::spawn(running_main_loop).into());
|
||||
}
|
||||
|
||||
async fn run_main_log_result(result: RunMainResult, seed: Arc<manager_seed::ManagerSeed>) {
|
||||
match result {
|
||||
Ok(Ok(NoOutput)) => (), // restart
|
||||
Ok(Err(e)) => {
|
||||
#[cfg(feature = "unstable")]
|
||||
{
|
||||
use crate::notifications::NotificationLevel;
|
||||
let mut db = seed.ctx.db.handle();
|
||||
let started = crate::db::DatabaseModel::new()
|
||||
.package_data()
|
||||
.idx_model(&seed.manifest.id)
|
||||
.and_then(|pde| pde.installed())
|
||||
.map::<_, MainStatus>(|i| i.status().main())
|
||||
.get(&mut db)
|
||||
.await;
|
||||
match started.as_deref() {
|
||||
Ok(Some(MainStatus::Running { .. })) => {
|
||||
let res = seed.ctx.notification_manager
|
||||
.notify(
|
||||
&mut db,
|
||||
Some(seed.manifest.id.clone()),
|
||||
NotificationLevel::Warning,
|
||||
String::from("Service Crashed"),
|
||||
format!("The service {} has crashed with the following exit code: {}\nDetails: {}", seed.manifest.id.clone(), e.0, e.1),
|
||||
(),
|
||||
Some(3600) // 1 hour
|
||||
)
|
||||
.await;
|
||||
if let Err(e) = res {
|
||||
tracing::error!("Failed to issue notification: {}", e);
|
||||
tracing::debug!("{:?}", e);
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
tracing::error!("service just started. not issuing crash notification")
|
||||
}
|
||||
}
|
||||
}
|
||||
tracing::error!(
|
||||
"The service {} has crashed with the following exit code: {}",
|
||||
seed.manifest.id.clone(),
|
||||
e.0
|
||||
);
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(15)).await;
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("failed to start service: {}", e);
|
||||
tracing::debug!("{:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Used only in the mod where we are doing a backup
|
||||
#[instrument(skip(db, manifest))]
|
||||
pub(super) async fn get_status(db: &mut PatchDbHandle, manifest: &Manifest) -> MainStatus {
|
||||
async move {
|
||||
Ok::<_, Error>(
|
||||
crate::db::DatabaseModel::new()
|
||||
.package_data()
|
||||
.idx_model(&manifest.id)
|
||||
.expect(db)
|
||||
.await?
|
||||
.installed()
|
||||
.expect(db)
|
||||
.await?
|
||||
.status()
|
||||
.main()
|
||||
.get(db)
|
||||
.await?
|
||||
.clone(),
|
||||
)
|
||||
}
|
||||
.map(|x| x.unwrap_or_else(|_| MainStatus::Stopped))
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(skip(db, manifest))]
|
||||
async fn set_status(
|
||||
db: &mut PatchDbHandle,
|
||||
manifest: &Manifest,
|
||||
main_status: &MainStatus,
|
||||
) -> Result<(), Error> {
|
||||
if crate::db::DatabaseModel::new()
|
||||
.package_data()
|
||||
.idx_model(&manifest.id)
|
||||
.expect(db)
|
||||
.await?
|
||||
.installed()
|
||||
.exists(db)
|
||||
.await?
|
||||
{
|
||||
crate::db::DatabaseModel::new()
|
||||
.package_data()
|
||||
.idx_model(&manifest.id)
|
||||
.expect(db)
|
||||
.await?
|
||||
.installed()
|
||||
.expect(db)
|
||||
.await?
|
||||
.status()
|
||||
.main()
|
||||
.put(db, main_status)
|
||||
.await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
111
backend/src/manager/manager_map.rs
Normal file
111
backend/src/manager/manager_map.rs
Normal file
@@ -0,0 +1,111 @@
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use color_eyre::eyre::eyre;
|
||||
use patch_db::DbHandle;
|
||||
use sqlx::{Executor, Postgres};
|
||||
use tokio::sync::RwLock;
|
||||
use tracing::instrument;
|
||||
|
||||
use super::Manager;
|
||||
use crate::context::RpcContext;
|
||||
use crate::s9pk::manifest::{Manifest, PackageId};
|
||||
use crate::util::Version;
|
||||
use crate::Error;
|
||||
|
||||
/// This is the structure to contain all the service managers
|
||||
#[derive(Default)]
|
||||
pub struct ManagerMap(RwLock<BTreeMap<(PackageId, Version), Arc<Manager>>>);
|
||||
impl ManagerMap {
|
||||
#[instrument(skip_all)]
|
||||
pub async fn init<Db: DbHandle, Ex>(
|
||||
&self,
|
||||
ctx: &RpcContext,
|
||||
db: &mut Db,
|
||||
secrets: &mut Ex,
|
||||
) -> Result<(), Error>
|
||||
where
|
||||
for<'a> &'a mut Ex: Executor<'a, Database = Postgres>,
|
||||
{
|
||||
let mut res = BTreeMap::new();
|
||||
for package in crate::db::DatabaseModel::new()
|
||||
.package_data()
|
||||
.keys(db)
|
||||
.await?
|
||||
{
|
||||
let man: Manifest = if let Some(manifest) = crate::db::DatabaseModel::new()
|
||||
.package_data()
|
||||
.idx_model(&package)
|
||||
.and_then(|pkg| pkg.installed())
|
||||
.map(|m| m.manifest())
|
||||
.get(db)
|
||||
.await?
|
||||
.to_owned()
|
||||
{
|
||||
manifest
|
||||
} else {
|
||||
continue;
|
||||
};
|
||||
|
||||
res.insert(
|
||||
(package, man.version.clone()),
|
||||
Arc::new(Manager::new(ctx.clone(), man).await?),
|
||||
);
|
||||
}
|
||||
*self.0.write().await = res;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Used during the install process
|
||||
#[instrument(skip_all)]
|
||||
pub async fn add(&self, ctx: RpcContext, manifest: Manifest) -> Result<(), Error> {
|
||||
let mut lock = self.0.write().await;
|
||||
let id = (manifest.id.clone(), manifest.version.clone());
|
||||
if let Some(man) = lock.remove(&id) {
|
||||
man.exit().await;
|
||||
}
|
||||
lock.insert(id, Arc::new(Manager::new(ctx, manifest).await?));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// This is ran during the cleanup, so when we are uninstalling the service
|
||||
#[instrument(skip_all)]
|
||||
pub async fn remove(&self, id: &(PackageId, Version)) {
|
||||
if let Some(man) = self.0.write().await.remove(id) {
|
||||
man.exit().await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Used during a shutdown
|
||||
#[instrument(skip_all)]
|
||||
pub async fn empty(&self) -> Result<(), Error> {
|
||||
let res =
|
||||
futures::future::join_all(std::mem::take(&mut *self.0.write().await).into_iter().map(
|
||||
|((id, version), man)| async move {
|
||||
tracing::debug!("Manager for {}@{} shutting down", id, version);
|
||||
man.shutdown().await;
|
||||
tracing::debug!("Manager for {}@{} is shutdown", id, version);
|
||||
if let Err(e) = Arc::try_unwrap(man) {
|
||||
tracing::trace!(
|
||||
"Manager for {}@{} still has {} other open references",
|
||||
id,
|
||||
version,
|
||||
Arc::strong_count(&e) - 1
|
||||
);
|
||||
}
|
||||
Ok::<_, Error>(())
|
||||
},
|
||||
))
|
||||
.await;
|
||||
res.into_iter().fold(Ok(()), |res, x| match (res, x) {
|
||||
(Ok(()), x) => x,
|
||||
(Err(e), Ok(())) => Err(e),
|
||||
(Err(e1), Err(e2)) => Err(Error::new(eyre!("{}, {}", e1.source, e2.source), e1.kind)),
|
||||
})
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub async fn get(&self, id: &(PackageId, Version)) -> Option<Arc<Manager>> {
|
||||
self.0.read().await.get(id).cloned()
|
||||
}
|
||||
}
|
||||
63
backend/src/manager/manager_seed.rs
Normal file
63
backend/src/manager/manager_seed.rs
Normal file
@@ -0,0 +1,63 @@
|
||||
use bollard::container::{StopContainerOptions, WaitContainerOptions};
|
||||
use tokio_stream::StreamExt;
|
||||
|
||||
use crate::context::RpcContext;
|
||||
use crate::s9pk::manifest::Manifest;
|
||||
use crate::Error;
|
||||
|
||||
/// This is helper structure for a service, the seed of the data that is needed for the manager_container
|
||||
pub struct ManagerSeed {
|
||||
pub ctx: RpcContext,
|
||||
pub manifest: Manifest,
|
||||
pub container_name: String,
|
||||
}
|
||||
|
||||
impl ManagerSeed {
|
||||
pub async fn stop_container(&self) -> Result<(), Error> {
|
||||
match self
|
||||
.ctx
|
||||
.docker
|
||||
.stop_container(
|
||||
&self.container_name,
|
||||
Some(StopContainerOptions {
|
||||
t: self
|
||||
.manifest
|
||||
.containers
|
||||
.as_ref()
|
||||
.and_then(|c| c.main.sigterm_timeout)
|
||||
.map(|d| d.as_secs())
|
||||
.unwrap_or(30) as i64,
|
||||
}),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Err(bollard::errors::Error::DockerResponseServerError {
|
||||
status_code: 404, // NOT FOUND
|
||||
..
|
||||
})
|
||||
| Err(bollard::errors::Error::DockerResponseServerError {
|
||||
status_code: 409, // CONFLICT
|
||||
..
|
||||
})
|
||||
| Err(bollard::errors::Error::DockerResponseServerError {
|
||||
status_code: 304, // NOT MODIFIED
|
||||
..
|
||||
}) => (), // Already stopped
|
||||
a => a?,
|
||||
}
|
||||
|
||||
// Wait for the container to stop
|
||||
{
|
||||
let mut waiting = self.ctx.docker.wait_container(
|
||||
&self.container_name,
|
||||
Some(WaitContainerOptions {
|
||||
condition: "not-running",
|
||||
}),
|
||||
);
|
||||
while let Some(_) = waiting.next().await {
|
||||
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
101
backend/src/manager/persistent_container.rs
Normal file
101
backend/src/manager/persistent_container.rs
Normal file
@@ -0,0 +1,101 @@
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use color_eyre::eyre::eyre;
|
||||
use helpers::UnixRpcClient;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::sync::watch::{self, Receiver};
|
||||
use tracing::instrument;
|
||||
|
||||
use super::manager_seed::ManagerSeed;
|
||||
use super::{
|
||||
add_network_for_main, get_long_running_ip, long_running_docker, remove_network_for_main,
|
||||
GetRunningIp,
|
||||
};
|
||||
use crate::procedure::docker::DockerContainer;
|
||||
use crate::util::NonDetachingJoinHandle;
|
||||
use crate::Error;
|
||||
|
||||
/// Persistant container are the old containers that need to run all the time
|
||||
/// The goal is that all services will be persistent containers, waiting to run the main system.
|
||||
pub struct PersistentContainer {
|
||||
_running_docker: NonDetachingJoinHandle<()>,
|
||||
pub rpc_client: Receiver<Arc<UnixRpcClient>>,
|
||||
}
|
||||
|
||||
impl PersistentContainer {
|
||||
#[instrument(skip_all)]
|
||||
pub async fn init(seed: &Arc<ManagerSeed>) -> Result<Option<Self>, Error> {
|
||||
Ok(if let Some(containers) = &seed.manifest.containers {
|
||||
let (running_docker, rpc_client) =
|
||||
spawn_persistent_container(seed.clone(), containers.main.clone()).await?;
|
||||
Some(Self {
|
||||
_running_docker: running_docker,
|
||||
rpc_client,
|
||||
})
|
||||
} else {
|
||||
None
|
||||
})
|
||||
}
|
||||
|
||||
pub fn rpc_client(&self) -> Arc<UnixRpcClient> {
|
||||
self.rpc_client.borrow().clone()
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn spawn_persistent_container(
|
||||
seed: Arc<ManagerSeed>,
|
||||
container: DockerContainer,
|
||||
) -> Result<(NonDetachingJoinHandle<()>, Receiver<Arc<UnixRpcClient>>), Error> {
|
||||
let (send_inserter, inserter) = oneshot::channel();
|
||||
Ok((
|
||||
tokio::task::spawn(async move {
|
||||
let mut inserter_send: Option<watch::Sender<Arc<UnixRpcClient>>> = None;
|
||||
let mut send_inserter: Option<oneshot::Sender<Receiver<Arc<UnixRpcClient>>>> = Some(send_inserter);
|
||||
loop {
|
||||
if let Err(e) = async {
|
||||
let (mut runtime, inserter) =
|
||||
long_running_docker(&seed, &container).await?;
|
||||
|
||||
|
||||
let ip = match get_long_running_ip(&seed, &mut runtime).await {
|
||||
GetRunningIp::Ip(x) => x,
|
||||
GetRunningIp::Error(e) => return Err(e),
|
||||
GetRunningIp::EarlyExit(e) => {
|
||||
tracing::error!("Early Exit");
|
||||
tracing::debug!("{:?}", e);
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
let svc = add_network_for_main(&seed, ip).await?;
|
||||
|
||||
if let Some(inserter_send) = inserter_send.as_mut() {
|
||||
let _ = inserter_send.send(Arc::new(inserter));
|
||||
} else {
|
||||
let (s, r) = watch::channel(Arc::new(inserter));
|
||||
inserter_send = Some(s);
|
||||
if let Some(send_inserter) = send_inserter.take() {
|
||||
let _ = send_inserter.send(r);
|
||||
}
|
||||
}
|
||||
|
||||
let res = tokio::select! {
|
||||
a = runtime.running_output => a.map_err(|_| Error::new(eyre!("Manager runtime panicked!"), crate::ErrorKind::Docker)).map(|_| ()),
|
||||
};
|
||||
|
||||
remove_network_for_main(svc).await?;
|
||||
|
||||
res
|
||||
}.await {
|
||||
tracing::error!("Error in persistent container: {}", e);
|
||||
tracing::debug!("{:?}", e);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
}
|
||||
})
|
||||
.into(),
|
||||
inserter.await.map_err(|_| Error::new(eyre!("Container handle dropped before inserter sent"), crate::ErrorKind::Unknown))?,
|
||||
))
|
||||
}
|
||||
26
backend/src/manager/start_stop.rs
Normal file
26
backend/src/manager/start_stop.rs
Normal file
@@ -0,0 +1,26 @@
|
||||
use crate::status::MainStatus;
|
||||
|
||||
/// The two states a service can be in, or be asked to be in.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StartStop {
    /// The service is (or should be) running.
    Start,
    /// The service is (or should be) stopped.
    Stop,
}

impl StartStop {
    /// Returns `true` only for [`StartStop::Start`].
    pub(crate) fn is_start(&self) -> bool {
        *self == StartStop::Start
    }
}
|
||||
impl From<MainStatus> for StartStop {
|
||||
fn from(value: MainStatus) -> Self {
|
||||
match value {
|
||||
MainStatus::Stopped => StartStop::Stop,
|
||||
MainStatus::Restarting => StartStop::Start,
|
||||
MainStatus::Stopping => StartStop::Stop,
|
||||
MainStatus::Starting => StartStop::Start,
|
||||
MainStatus::Running { started, health } => StartStop::Start,
|
||||
MainStatus::BackingUp { started, health } if started.is_some() => StartStop::Start,
|
||||
MainStatus::BackingUp { started, health } => StartStop::Stop,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,113 +0,0 @@
|
||||
use std::collections::BTreeMap;
|
||||
use std::time::Duration;
|
||||
|
||||
use chrono::Utc;
|
||||
|
||||
use super::{pause, resume, start, stop, ManagerSharedState, Status};
|
||||
use crate::status::MainStatus;
|
||||
use crate::Error;
|
||||
|
||||
/// Allocates a db handle. DO NOT CALL with a db handle already in scope
|
||||
async fn synchronize_once(shared: &ManagerSharedState) -> Result<Status, Error> {
|
||||
let mut db = shared.seed.ctx.db.handle();
|
||||
let mut status = crate::db::DatabaseModel::new()
|
||||
.package_data()
|
||||
.idx_model(&shared.seed.manifest.id)
|
||||
.expect(&mut db)
|
||||
.await?
|
||||
.installed()
|
||||
.expect(&mut db)
|
||||
.await?
|
||||
.status()
|
||||
.main()
|
||||
.get_mut(&mut db)
|
||||
.await?;
|
||||
let manager_status = *shared.status.1.borrow();
|
||||
match manager_status {
|
||||
Status::Stopped => match &mut *status {
|
||||
MainStatus::Stopped => (),
|
||||
MainStatus::Stopping => {
|
||||
*status = MainStatus::Stopped;
|
||||
}
|
||||
MainStatus::Restarting => {
|
||||
*status = MainStatus::Starting { restarting: true };
|
||||
}
|
||||
MainStatus::Starting { .. } => {
|
||||
start(shared).await?;
|
||||
}
|
||||
MainStatus::Running { started, .. } => {
|
||||
*started = Utc::now();
|
||||
start(shared).await?;
|
||||
}
|
||||
MainStatus::BackingUp { .. } => (),
|
||||
},
|
||||
Status::Starting => match *status {
|
||||
MainStatus::Stopped | MainStatus::Stopping | MainStatus::Restarting => {
|
||||
stop(shared).await?;
|
||||
}
|
||||
MainStatus::Starting { .. } | MainStatus::Running { .. } => (),
|
||||
MainStatus::BackingUp { .. } => {
|
||||
pause(shared).await?;
|
||||
}
|
||||
},
|
||||
Status::Running => match *status {
|
||||
MainStatus::Stopped | MainStatus::Stopping | MainStatus::Restarting => {
|
||||
stop(shared).await?;
|
||||
}
|
||||
MainStatus::Starting { .. } => {
|
||||
*status = MainStatus::Running {
|
||||
started: Utc::now(),
|
||||
health: BTreeMap::new(),
|
||||
};
|
||||
}
|
||||
MainStatus::Running { .. } => (),
|
||||
MainStatus::BackingUp { .. } => {
|
||||
pause(shared).await?;
|
||||
}
|
||||
},
|
||||
Status::Paused => match *status {
|
||||
MainStatus::Stopped | MainStatus::Stopping | MainStatus::Restarting => {
|
||||
stop(shared).await?;
|
||||
}
|
||||
MainStatus::Starting { .. } | MainStatus::Running { .. } => {
|
||||
resume(shared).await?;
|
||||
}
|
||||
MainStatus::BackingUp { .. } => (),
|
||||
},
|
||||
Status::Shutdown => (),
|
||||
}
|
||||
status.save(&mut db).await?;
|
||||
Ok(manager_status)
|
||||
}
|
||||
|
||||
pub async fn synchronizer(shared: &ManagerSharedState) {
|
||||
let mut status_recv = shared.status.0.subscribe();
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = tokio::time::sleep(Duration::from_secs(5)) => (),
|
||||
_ = shared.synchronize_now.notified() => (),
|
||||
_ = status_recv.changed() => (),
|
||||
}
|
||||
let status = match synchronize_once(shared).await {
|
||||
Err(e) => {
|
||||
tracing::error!(
|
||||
"Synchronizer for {}@{} failed: {}",
|
||||
shared.seed.manifest.id,
|
||||
shared.seed.manifest.version,
|
||||
e
|
||||
);
|
||||
tracing::debug!("{:?}", e);
|
||||
continue;
|
||||
}
|
||||
Ok(status) => status,
|
||||
};
|
||||
tracing::trace!("{} status synchronized", shared.seed.manifest.id);
|
||||
shared.synchronized.notify_waiters();
|
||||
match status {
|
||||
Status::Shutdown => {
|
||||
break;
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
}
|
||||
30
backend/src/manager/transition_state.rs
Normal file
30
backend/src/manager/transition_state.rs
Normal file
@@ -0,0 +1,30 @@
|
||||
use helpers::NonDetachingJoinHandle;
|
||||
|
||||
/// Used only in the manager/mod and is used to keep track of the state of the manager during the
|
||||
/// transitional states
|
||||
pub(super) enum TransitionState {
|
||||
BackingUp(NonDetachingJoinHandle<()>),
|
||||
Restarting(NonDetachingJoinHandle<()>),
|
||||
Configuring(NonDetachingJoinHandle<()>),
|
||||
None,
|
||||
}
|
||||
|
||||
impl TransitionState {
|
||||
pub(super) fn join_handle(&self) -> Option<&NonDetachingJoinHandle<()>> {
|
||||
Some(match self {
|
||||
TransitionState::BackingUp(a) => a,
|
||||
TransitionState::Restarting(a) => a,
|
||||
TransitionState::Configuring(a) => a,
|
||||
TransitionState::None => return None,
|
||||
})
|
||||
}
|
||||
pub(super) fn abort(&self) {
|
||||
self.join_handle().map(|transition| transition.abort());
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for TransitionState {
|
||||
fn default() -> Self {
|
||||
TransitionState::None
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user