From 093a5d4ddf8f7a6ea970564b5b38e1306789a6cc Mon Sep 17 00:00:00 2001 From: J H Date: Wed, 6 Mar 2024 09:38:55 -0700 Subject: [PATCH] chore: Simplify the state into one --- .../Systems/SystemForEmbassy/index.ts | 4 +- .../src/Adapters/Systems/SystemForStartOs.ts | 3 +- container-runtime/src/Models/Duration.ts | 6 + core/startos/src/service/control.rs | 21 +-- core/startos/src/service/mod.rs | 158 +++++++----------- .../src/service/persistent_container.rs | 68 +++++--- core/startos/src/service/transition/mod.rs | 36 ++-- .../startos/src/service/transition/restart.rs | 31 +++- 8 files changed, 174 insertions(+), 153 deletions(-) create mode 100644 container-runtime/src/Models/Duration.ts diff --git a/container-runtime/src/Adapters/Systems/SystemForEmbassy/index.ts b/container-runtime/src/Adapters/Systems/SystemForEmbassy/index.ts index 4814da9b2..8495a669b 100644 --- a/container-runtime/src/Adapters/Systems/SystemForEmbassy/index.ts +++ b/container-runtime/src/Adapters/Systems/SystemForEmbassy/index.ts @@ -2,6 +2,7 @@ import { types as T, util, EmVer } from "@start9labs/start-sdk" import * as fs from "fs/promises" import { PolyfillEffects } from "./polyfillEffects" +import { Duration, duration } from "../../../Models/Duration" import { ExecuteResult, System } from "../../../Interfaces/System" import { matchManifest, Manifest, Procedure } from "./matchManifest" import { create } from "domain" @@ -202,7 +203,7 @@ export class SystemForEmbassy implements System { private async mainStop( effects: HostSystemStartOs, options?: { timeout?: number }, - ): Promise { + ): Promise { const { currentRunning } = this delete this.currentRunning if (currentRunning) { @@ -210,6 +211,7 @@ export class SystemForEmbassy implements System { timeout: options?.timeout || this.manifest.main["sigterm-timeout"], }) } + return duration(this.manifest.main["sigterm-timeout"], "s") } private async createBackup(effects: HostSystemStartOs): Promise { const backup = this.manifest.backup.create diff --git a/container-runtime/src/Adapters/Systems/SystemForStartOs.ts b/container-runtime/src/Adapters/Systems/SystemForStartOs.ts index 95afb5fb4..7549bf0f2 100644 --- a/container-runtime/src/Adapters/Systems/SystemForStartOs.ts +++ b/container-runtime/src/Adapters/Systems/SystemForStartOs.ts @@ -4,6 +4,7 @@ import { string } from "ts-matches" import { HostSystemStartOs } from "../HostSystemStartOs" import { Effects } from "../../Models/Effects" import { RpcResult } from "../RpcListener" +import { duration } from "../../Models/Duration" const LOCATION = "/usr/lib/startos/package/startos" export class SystemForStartOs implements System { private onTerm: (() => Promise) | undefined @@ -82,7 +83,7 @@ export class SystemForStartOs implements System { await effects.setMainStatus({ status: "stopped" }) if (this.onTerm) await this.onTerm() delete this.onTerm - return + return duration(30, "s") } case "/config/set": { const path = `${LOCATION}/procedures/config` diff --git a/container-runtime/src/Models/Duration.ts b/container-runtime/src/Models/Duration.ts new file mode 100644 index 000000000..8c701a703 --- /dev/null +++ b/container-runtime/src/Models/Duration.ts @@ -0,0 +1,6 @@ +export type TimeUnit = "d" | "h" | "s" | "ms" +export type Duration = `${number}${TimeUnit}` + +export function duration(timeValue: number, timeUnit: TimeUnit = "s") { + return `${timeValue}${timeUnit}` as Duration +} diff --git a/core/startos/src/service/control.rs b/core/startos/src/service/control.rs index 17c432755..88d66d97c 100644 --- a/core/startos/src/service/control.rs +++ b/core/startos/src/service/control.rs @@ -9,7 +9,9 @@ struct Start; impl Handler for ServiceActor { type Response = (); async fn handle(&mut self, _: Start, _: &mut BackgroundJobs) -> Self::Response { - self.0.desired_state.send_replace(StartStop::Start); + self.0.persistent_container.state.send_modify(|x| { + x.desired_state = StartStop::Start; + }); self.0.synchronized.notified().await } } @@ -24,16 +26,15 @@ struct Stop; impl Handler for ServiceActor { type Response = (); async fn handle(&mut self, _: Stop, _: &mut BackgroundJobs) -> Self::Response { - self.0.desired_state.send_replace(StartStop::Stop); - if self.0.transition_state.borrow().as_ref().map(|t| t.kind()) - == Some(TransitionKind::Restarting) - { - if let Some(restart) = self.0.transition_state.send_replace(None) { - restart.abort().await; - } else { - #[cfg(feature = "unstable")] - unreachable!() + let mut transition_state = None; + self.0.persistent_container.state.send_modify(|x| { + x.desired_state = StartStop::Stop; + if x.transition_state.as_ref().map(|x| x.kind()) == Some(TransitionKind::Restarting) { + transition_state = std::mem::take(&mut x.transition_state); } + }); + if let Some(restart) = transition_state { + restart.abort().await; } self.0.synchronized.notified().await } diff --git a/core/startos/src/service/mod.rs b/core/startos/src/service/mod.rs index 5dba4cdb4..3335be1ca 100644 --- a/core/startos/src/service/mod.rs +++ b/core/startos/src/service/mod.rs @@ -1,5 +1,5 @@ -use std::sync::Arc; use std::time::Duration; +use std::{ops::Deref, sync::Arc}; use chrono::{DateTime, Utc}; use clap::Parser; @@ -70,23 +70,17 @@ impl Service { #[instrument(skip_all)] async fn new(ctx: RpcContext, s9pk: S9pk, start: StartStop) -> Result { let id = s9pk.as_manifest().id.clone(); - let desired_state = watch::channel(start).0; - let temp_desired_state = TempDesiredState(Arc::new(watch::channel(None).0)); let persistent_container = PersistentContainer::new( - &ctx, - s9pk, + &ctx, s9pk, + start, // desired_state.subscribe(), // temp_desired_state.subscribe(), ) .await?; let seed = Arc::new(ServiceActorSeed { id, - running_status: persistent_container.running_status.subscribe(), persistent_container, ctx, - desired_state, - temp_desired_state, - transition_state: Arc::new(watch::channel(None).0), synchronized: Arc::new(Notify::new()), }); seed.persistent_container @@ -383,7 +377,7 @@ impl Service { .await?; self.shutdown().await } - pub async fn backup(&self, guard: impl GenericMountGuard) -> Result { + pub async fn backup(&self, _guard: impl GenericMountGuard) -> Result { // TODO Err(Error::new(eyre!("not yet implemented"), ErrorKind::Unknown)) } @@ -395,45 +389,36 @@ struct RunningStatus { started: DateTime, } -pub(self) struct ServiceActorSeed { +struct ServiceActorSeed { ctx: RpcContext, id: PackageId, - // Needed to interact with the container for the service + /// Needed to interact with the container for the service persistent_container: PersistentContainer, - // Setting this value causes the service actor to try to bring the service to the specified state. This is done in the background job created in ServiceActor::init - desired_state: watch::Sender, - // Override the current desired state for the service during a transition (this is protected by a guard that sets this value to null on drop) - temp_desired_state: TempDesiredState, - // This represents a currently running task that affects the service's shown state, such as BackingUp or Restarting. - transition_state: Arc>>, - // This contains the start time and health check information for when the service is running. Note: Will be overwritting to the db, - running_status: watch::Receiver>, - // This is notified every time the background job created in ServiceActor::init responds to a change + /// This is notified every time the background job created in ServiceActor::init responds to a change synchronized: Arc, } impl ServiceActorSeed { + /// Used to indicate that we have finished the task of starting the service pub fn started(&self) { - self.persistent_container - .current_state - .send_replace(StartStop::Start); - self.persistent_container - .running_status - .send_modify(|running_status| { - *running_status = - Some( - std::mem::take(running_status).unwrap_or_else(|| RunningStatus { + self.persistent_container.state.send_modify(|state| { + state.running_status = + Some( + state + .running_status + .take() + .unwrap_or_else(|| RunningStatus { health: Default::default(), started: Utc::now(), }), - ); - }) + ); + }); } + /// Used to indicate that we have finished the task of stopping the service pub fn stopped(&self) { - self.persistent_container - .current_state - .send_replace(StartStop::Stop); - self.persistent_container.running_status.send_replace(None); + self.persistent_container.state.send_modify(|state| { + state.running_status = None; + }); } } struct ServiceActor(Arc); @@ -443,20 +428,41 @@ impl Actor for ServiceActor { let seed = self.0.clone(); jobs.add_job(async move { let id = seed.id.clone(); - let mut current = seed.persistent_container.current_state.subscribe(); - let mut desired = seed.desired_state.subscribe(); - let mut temp_desired = seed.temp_desired_state.subscribe(); - let mut transition = seed.transition_state.subscribe(); - let mut running = seed.running_status.clone(); + let mut current = seed.persistent_container.state.subscribe(); + loop { - let (desired_state, current_state, transition_kind, running_status) = dbg!( - temp_desired.borrow().unwrap_or(*desired.borrow()), - *current.borrow(), - transition.borrow().as_ref().map(|t| t.kind()), - running.borrow().clone(), - ); + let kinds = dbg!(current.borrow().kinds()); if let Err(e) = async { + let main_status = match ( + kinds.transition_state, + kinds.desired_state, + kinds.running_status, + ) { + (Some(TransitionKind::Restarting), _, _) => MainStatus::Restarting, + (Some(TransitionKind::BackingUp), _, Some(status)) => { + MainStatus::BackingUp { + started: Some(status.started), + health: status.health.clone(), + } + } + (Some(TransitionKind::BackingUp), _, None) => MainStatus::BackingUp { + started: None, + health: OrdMap::new(), + }, + (None, StartStop::Stop, None) => MainStatus::Stopped, + (None, StartStop::Stop, Some(_)) => MainStatus::Stopping { + timeout: seed.persistent_container.stop().await?.into(), + }, + (None, StartStop::Start, Some(status)) => MainStatus::Running { + started: status.started, + health: status.health.clone(), + }, + (None, StartStop::Start, None) => { + seed.persistent_container.start().await?; + MainStatus::Starting + } + }; seed.ctx .db .mutate(|d| { @@ -466,61 +472,13 @@ impl Actor for ServiceActor { .as_idx_mut(&id) .and_then(|p| p.as_installed_mut()) { - i.as_status_mut().as_main_mut().ser(&match ( - transition_kind, - desired_state, - current_state, - running_status, - ) { - (Some(TransitionKind::Restarting), _, _, _) => { - MainStatus::Restarting - } - (Some(TransitionKind::BackingUp), _, _, Some(status)) => { - MainStatus::BackingUp { - started: Some(status.started), - health: status.health.clone(), - } - } - (Some(TransitionKind::BackingUp), _, _, None) => { - MainStatus::BackingUp { - started: None, - health: OrdMap::new(), - } - } - (None, StartStop::Stop, StartStop::Stop, _) => { - MainStatus::Stopped - } - (None, StartStop::Stop, StartStop::Start, _) => { - MainStatus::Stopping { - timeout: todo!("sigterm timeout"), - } - } - (None, StartStop::Start, StartStop::Stop, _) - | (None, StartStop::Start, StartStop::Start, None) => { - MainStatus::Starting - } - (None, StartStop::Start, StartStop::Start, Some(status)) => { - MainStatus::Running { - started: status.started, - health: status.health.clone(), - } - } - })?; + i.as_status_mut().as_main_mut().ser(&main_status)?; } Ok(()) }) .await?; - match (desired_state, current_state) { - (StartStop::Start, StartStop::Stop) => { - seed.persistent_container.start().await - } - (StartStop::Stop, StartStop::Start) => { - seed.persistent_container - .stop(todo!("s9pk sigterm timeout")) - .await - } - _ => Ok(()), - } + + Ok::<_, Error>(()) } .await { @@ -538,10 +496,6 @@ impl Actor for ServiceActor { tokio::select! { _ = current.changed() => (), - _ = desired.changed() => (), - _ = temp_desired.changed() => (), - _ = transition.changed() => (), - _ = running.changed() => (), } } }) diff --git a/core/startos/src/service/persistent_container.rs b/core/startos/src/service/persistent_container.rs index eee353a07..28067cdd8 100644 --- a/core/startos/src/service/persistent_container.rs +++ b/core/startos/src/service/persistent_container.rs @@ -15,8 +15,11 @@ use tokio::process::Command; use tokio::sync::{oneshot, watch, Mutex, OnceCell}; use tracing::instrument; -use super::service_effect_handler::{service_effect_handler, EffectContext}; -use super::ServiceActorSeed; +use super::{ + service_effect_handler::{service_effect_handler, EffectContext}, + transition::{TempDesiredState, TransitionKind}, +}; +use super::{transition::TransitionState, ServiceActorSeed}; use crate::context::RpcContext; use crate::disk::mount::filesystem::bind::Bind; use crate::disk::mount::filesystem::idmapped::IdMapped; @@ -39,6 +42,43 @@ const RPC_CONNECT_TIMEOUT: Duration = Duration::from_secs(10); struct ProcedureId(u64); +#[derive(Debug)] +pub struct ServiceState { + // This contains the start time and health check information for when the service is running. Note: Will be overwritting to the db, + pub(super) running_status: Option, + /// Setting this value causes the service actor to try to bring the service to the specified state. This is done in the background job created in ServiceActor::init + pub(super) desired_state: StartStop, + /// Override the current desired state for the service during a transition (this is protected by a guard that sets this value to null on drop) + pub(super) temp_desired_state: Option, + /// This represents a currently running task that affects the service's shown state, such as BackingUp or Restarting. + pub(super) transition_state: Option, +} + +#[derive(Debug)] +pub struct ServiceStateKinds { + pub transition_state: Option, + pub running_status: Option, + pub desired_state: StartStop, +} + +impl ServiceState { + pub fn new(desired_state: StartStop) -> Self { + Self { + running_status: Default::default(), + temp_desired_state: Default::default(), + transition_state: Default::default(), + desired_state, + } + } + pub fn kinds(&self) -> ServiceStateKinds { + ServiceStateKinds { + transition_state: self.transition_state.as_ref().map(|x| x.kind()), + desired_state: self.temp_desired_state.unwrap_or(self.desired_state), + running_status: self.running_status.clone(), + } + } +} + // @DRB On top of this we need to also have the procedures to have the effects and get the results back for them, maybe lock them to the running instance? /// This contains the LXC container running the javascript init system /// that can be used via a JSON RPC Client connected to a unix domain @@ -53,20 +93,12 @@ pub struct PersistentContainer { volumes: BTreeMap, assets: BTreeMap, pub(super) overlays: Arc>>, - pub(super) current_state: watch::Sender, - // pub(super) desired_state: watch::Receiver, - // pub(super) temp_desired_state: watch::Receiver>, - pub(super) running_status: watch::Sender>, + pub(super) state: Arc>, } impl PersistentContainer { #[instrument(skip_all)] - pub async fn new( - ctx: &RpcContext, - s9pk: S9pk, - // desired_state: watch::Receiver, - // temp_desired_state: watch::Receiver>, - ) -> Result { + pub async fn new(ctx: &RpcContext, s9pk: S9pk, start: StartStop) -> Result { let lxc_container = ctx.lxc_manager.create(LxcConfig::default()).await?; let rpc_client = lxc_container.connect_rpc(Some(RPC_CONNECT_TIMEOUT)).await?; let js_mount = MountGuard::mount( @@ -156,10 +188,7 @@ impl PersistentContainer { volumes, assets, overlays: Arc::new(Mutex::new(BTreeMap::new())), - current_state: watch::channel(StartStop::Stop).0, - // desired_state, - // temp_desired_state, - running_status: watch::channel(None).0, + state: Arc::new(watch::channel(ServiceState::new(start)).0), }) } @@ -280,10 +309,11 @@ impl PersistentContainer { } #[instrument(skip_all)] - pub async fn stop(&self, timeout: Option) -> Result<(), Error> { - self.execute(ProcedureName::StopMain, Value::Null, timeout) + pub async fn stop(&self) -> Result { + let timeout: Option = self + .execute(ProcedureName::StopMain, Value::Null, None) .await?; - Ok(()) + Ok(timeout.map(|a| *a).unwrap_or(Duration::from_secs(30))) } #[instrument(skip_all)] diff --git a/core/startos/src/service/transition/mod.rs b/core/startos/src/service/transition/mod.rs index b472434d4..cd7979cae 100644 --- a/core/startos/src/service/transition/mod.rs +++ b/core/startos/src/service/transition/mod.rs @@ -1,5 +1,5 @@ -use std::ops::Deref; use std::sync::Arc; +use std::{fmt::Display, ops::Deref}; use futures::{Future, FutureExt}; use tokio::sync::watch; @@ -8,6 +8,8 @@ use crate::service::start_stop::StartStop; use crate::util::actor::BackgroundJobs; use crate::util::future::{CancellationHandle, RemoteCancellable}; +use super::persistent_container::ServiceState; + pub mod backup; pub mod restart; @@ -23,6 +25,13 @@ pub struct TransitionState { cancel_handle: CancellationHandle, kind: TransitionKind, } +impl ::std::fmt::Debug for TransitionState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TransitionState") + .field("kind", &self.kind) + .finish_non_exhaustive() + } +} impl TransitionState { pub fn kind(&self) -> TransitionKind { @@ -52,23 +61,28 @@ impl Drop for TransitionState { } #[derive(Debug, Clone)] -pub struct TempDesiredState(pub(super) Arc>>); +pub struct TempDesiredState(pub(super) Arc>); impl TempDesiredState { + pub fn new(state: &Arc>) -> Self { + Self(state.clone()) + } pub fn stop(&self) { - self.0.send_replace(Some(StartStop::Stop)); + self.0 + .send_modify(|s| s.temp_desired_state = Some(StartStop::Stop)); } pub fn start(&self) { - self.0.send_replace(Some(StartStop::Start)); + self.0 + .send_modify(|s| s.temp_desired_state = Some(StartStop::Start)); } } impl Drop for TempDesiredState { fn drop(&mut self) { - self.0.send_replace(None); - } -} -impl Deref for TempDesiredState { - type Target = watch::Sender>; - fn deref(&self) -> &Self::Target { - &*self.0 + self.0.send_modify(|s| s.temp_desired_state = None); } } +// impl Deref for TempDesiredState { +// type Target = watch::Sender>; +// fn deref(&self) -> &Self::Target { +// &*self.0 +// } +// } diff --git a/core/startos/src/service/transition/restart.rs b/core/startos/src/service/transition/restart.rs index 71a889305..9c82d0282 100644 --- a/core/startos/src/service/transition/restart.rs +++ b/core/startos/src/service/transition/restart.rs @@ -1,32 +1,45 @@ +use std::sync::Arc; + use futures::FutureExt; use crate::prelude::*; -use crate::service::start_stop::StartStop; use crate::service::transition::{TransitionKind, TransitionState}; use crate::service::{Service, ServiceActor}; use crate::util::actor::{BackgroundJobs, Handler}; use crate::util::future::RemoteCancellable; +use super::TempDesiredState; + struct Restart; #[async_trait::async_trait] impl Handler for ServiceActor { type Response = (); async fn handle(&mut self, _: Restart, jobs: &mut BackgroundJobs) -> Self::Response { - let temp = self.0.temp_desired_state.clone(); - let mut current = self.0.persistent_container.current_state.subscribe(); + // So Need a handle to just a single field in the state + let temp = TempDesiredState::new(&self.0.persistent_container.state); + let mut current = self.0.persistent_container.state.subscribe(); let transition = RemoteCancellable::new(async move { temp.stop(); - current.wait_for(|s| *s == StartStop::Stop).await; + current.wait_for(|s| s.running_status.is_none()).await; temp.start(); - current.wait_for(|s| *s == StartStop::Start).await; + current.wait_for(|s| s.running_status.is_some()).await; + drop(temp); }); let cancel_handle = transition.cancellation_handle(); jobs.add_job(transition.map(|_| ())); let notified = self.0.synchronized.notified(); - if let Some(t) = self.0.transition_state.send_replace(Some(TransitionState { - kind: TransitionKind::Restarting, - cancel_handle, - })) { + + let mut old = None; + self.0.persistent_container.state.send_modify(|s| { + old = std::mem::replace( + &mut s.transition_state, + Some(TransitionState { + kind: TransitionKind::Restarting, + cancel_handle, + }), + ) + }); + if let Some(t) = old { t.abort().await; } notified.await