feature: Adding in the stopping state (#2677)

* feature: Adding in the stopping state

* chore: Deal with timeout in the sigterm for main

* chore: Update the timeout

* Update web/projects/ui/src/app/pages/apps-routes/app-list/app-list-pkg/app-list-pkg.component.ts

Co-authored-by: Aiden McClelland <3732071+dr-bonez@users.noreply.github.com>

* Update web/projects/ui/src/app/pages/apps-routes/app-show/components/app-show-status/app-show-status.component.ts

Co-authored-by: Aiden McClelland <3732071+dr-bonez@users.noreply.github.com>

---------

Co-authored-by: Aiden McClelland <3732071+dr-bonez@users.noreply.github.com>
This commit is contained in:
Jade
2024-07-22 11:40:12 -06:00
committed by GitHub
parent 196561fed2
commit 3eb0093d2a
18 changed files with 282 additions and 156 deletions

View File

@@ -2,7 +2,7 @@ import { polyfillEffects } from "./polyfillEffects"
import { DockerProcedureContainer } from "./DockerProcedureContainer"
import { SystemForEmbassy } from "."
import { hostSystemStartOs } from "../../HostSystemStartOs"
import { Daemons, T, daemons } from "@start9labs/start-sdk"
import { Daemons, T, daemons, utils } from "@start9labs/start-sdk"
import { Daemon } from "@start9labs/start-sdk/cjs/lib/mainFn/Daemon"
import { Effects } from "../../../Models/Effects"
@@ -58,6 +58,9 @@ export class MainLoop {
currentCommand,
{
overlay: dockerProcedureContainer.overlay,
sigtermTimeout: utils.inMs(
this.system.manifest.main["sigterm-timeout"],
),
},
)
daemon.start()

View File

@@ -481,20 +481,20 @@ export class SystemForEmbassy implements System {
private async mainStop(
effects: Effects,
timeoutMs: number | null,
): Promise<Duration> {
const { currentRunning } = this
this.currentRunning?.clean()
delete this.currentRunning
if (currentRunning) {
await currentRunning.clean({
timeout: fromDuration(this.manifest.main["sigterm-timeout"]),
})
): Promise<void> {
try {
const { currentRunning } = this
this.currentRunning?.clean()
delete this.currentRunning
if (currentRunning) {
await currentRunning.clean({
timeout: utils.inMs(this.manifest.main["sigterm-timeout"]),
})
}
return
} finally {
await effects.setMainStatus({ status: "stopped" })
}
const durationValue = duration(
fromDuration(this.manifest.main["sigterm-timeout"]),
"s",
)
return durationValue
}
private async createBackup(
effects: Effects,

View File

@@ -139,10 +139,13 @@ export class SystemForStartOs implements System {
return
}
case "/main/stop": {
if (this.onTerm) await this.onTerm()
await effects.setMainStatus({ status: "stopped" })
delete this.onTerm
return duration(30, "s")
try {
if (this.onTerm) await this.onTerm()
delete this.onTerm
return
} finally {
await effects.setMainStatus({ status: "stopped" })
}
}
case "/config/set": {
const input = options.input as any // TODO

View File

@@ -10,6 +10,7 @@ use models::{HealthCheckId, PackageId, ProcedureName};
use persistent_container::PersistentContainer;
use rpc_toolkit::{from_fn_async, CallRemoteHandler, Empty, HandlerArgs, HandlerFor};
use serde::{Deserialize, Serialize};
use service_actor::ServiceActor;
use start_stop::StartStop;
use tokio::sync::Notify;
use ts_rs::TS;
@@ -45,6 +46,7 @@ mod dependencies;
pub mod persistent_container;
mod properties;
mod rpc;
mod service_actor;
pub mod service_effect_handler;
pub mod service_map;
mod start_stop;
@@ -482,124 +484,6 @@ impl ServiceActorSeed {
});
}
}
#[derive(Clone)]
struct ServiceActor(Arc<ServiceActorSeed>);
impl Actor for ServiceActor {
fn init(&mut self, jobs: &BackgroundJobQueue) {
let seed = self.0.clone();
jobs.add_job(async move {
let id = seed.id.clone();
let mut current = seed.persistent_container.state.subscribe();
loop {
let kinds = current.borrow().kinds();
if let Err(e) = async {
let main_status = match (
kinds.transition_state,
kinds.desired_state,
kinds.running_status,
) {
(Some(TransitionKind::Restarting), StartStop::Stop, Some(_)) => {
seed.persistent_container.stop().await?;
MainStatus::Restarting
}
(Some(TransitionKind::Restarting), StartStop::Start, _) => {
seed.persistent_container.start().await?;
MainStatus::Restarting
}
(Some(TransitionKind::Restarting), _, _) => MainStatus::Restarting,
(Some(TransitionKind::Restoring), _, _) => MainStatus::Restoring,
(Some(TransitionKind::BackingUp), StartStop::Stop, Some(status)) => {
seed.persistent_container.stop().await?;
MainStatus::BackingUp {
started: Some(status.started),
health: status.health.clone(),
}
}
(Some(TransitionKind::BackingUp), StartStop::Start, _) => {
seed.persistent_container.start().await?;
MainStatus::BackingUp {
started: None,
health: OrdMap::new(),
}
}
(Some(TransitionKind::BackingUp), _, _) => MainStatus::BackingUp {
started: None,
health: OrdMap::new(),
},
(None, StartStop::Stop, None) => MainStatus::Stopped,
(None, StartStop::Stop, Some(_)) => MainStatus::Stopping {
timeout: seed.persistent_container.stop().await?.into(),
},
(None, StartStop::Start, Some(status)) => MainStatus::Running {
started: status.started,
health: status.health.clone(),
},
(None, StartStop::Start, None) => {
seed.persistent_container.start().await?;
MainStatus::Starting
}
};
seed.ctx
.db
.mutate(|d| {
if let Some(i) = d.as_public_mut().as_package_data_mut().as_idx_mut(&id)
{
let previous = i.as_status().as_main().de()?;
let previous_health = previous.health();
let previous_started = previous.started();
let mut main_status = main_status;
match &mut main_status {
&mut MainStatus::Running { ref mut health, .. }
| &mut MainStatus::BackingUp { ref mut health, .. } => {
*health = previous_health.unwrap_or(health).clone();
}
_ => (),
};
match &mut main_status {
MainStatus::Running {
ref mut started, ..
} => {
*started = previous_started.unwrap_or(*started);
}
MainStatus::BackingUp {
ref mut started, ..
} => {
*started = previous_started.map(Some).unwrap_or(*started);
}
_ => (),
};
i.as_status_mut().as_main_mut().ser(&main_status)?;
}
Ok(())
})
.await?;
Ok::<_, Error>(())
}
.await
{
tracing::error!("error synchronizing state of service: {e}");
tracing::debug!("{e:?}");
seed.synchronized.notify_waiters();
tracing::error!("Retrying in {}s...", SYNC_RETRY_COOLDOWN_SECONDS);
tokio::time::sleep(Duration::from_secs(SYNC_RETRY_COOLDOWN_SECONDS)).await;
continue;
}
seed.synchronized.notify_waiters();
tokio::select! {
_ = current.changed() => (),
}
}
})
}
}
#[derive(Deserialize, Serialize, Parser, TS)]
pub struct ConnectParams {

View File

@@ -441,11 +441,11 @@ impl PersistentContainer {
}
#[instrument(skip_all)]
pub async fn stop(&self) -> Result<Duration, Error> {
pub async fn stop(&self) -> Result<(), Error> {
let timeout: Option<crate::util::serde::Duration> = self
.execute(Guid::new(), ProcedureName::StopMain, Value::Null, None)
.await?;
Ok(timeout.map(|a| *a).unwrap_or(Duration::from_secs(30)))
Ok(())
}
#[instrument(skip_all)]

View File

@@ -0,0 +1,159 @@
use std::sync::Arc;
use std::time::Duration;
use imbl::OrdMap;
use models::PackageId;
use super::start_stop::StartStop;
use crate::prelude::*;
use crate::service::transition::TransitionKind;
use crate::service::SYNC_RETRY_COOLDOWN_SECONDS;
use crate::status::MainStatus;
use crate::util::actor::background::BackgroundJobQueue;
use crate::util::actor::Actor;
use super::ServiceActorSeed;
#[derive(Clone)]
pub(super) struct ServiceActor(pub(super) Arc<ServiceActorSeed>);
enum ServiceActorLoopNext {
Wait,
DontWait,
}
impl Actor for ServiceActor {
fn init(&mut self, jobs: &BackgroundJobQueue) {
let seed = self.0.clone();
jobs.add_job(async move {
let id = seed.id.clone();
let mut current = seed.persistent_container.state.subscribe();
loop {
match service_actor_loop(&current, &seed, &id).await {
ServiceActorLoopNext::Wait => tokio::select! {
_ = current.changed() => (),
},
ServiceActorLoopNext::DontWait => (),
}
}
})
}
}
async fn service_actor_loop(
current: &tokio::sync::watch::Receiver<super::persistent_container::ServiceState>,
seed: &Arc<ServiceActorSeed>,
id: &PackageId,
) -> ServiceActorLoopNext {
let kinds = current.borrow().kinds();
if let Err(e) = async {
let main_status = match (
kinds.transition_state,
kinds.desired_state,
kinds.running_status,
) {
(Some(TransitionKind::Restarting), StartStop::Stop, Some(_)) => {
seed.persistent_container.stop().await?;
MainStatus::Restarting
}
(Some(TransitionKind::Restarting), StartStop::Start, _) => {
seed.persistent_container.start().await?;
MainStatus::Restarting
}
(Some(TransitionKind::Restarting), _, _) => MainStatus::Restarting,
(Some(TransitionKind::Restoring), _, _) => MainStatus::Restoring,
(Some(TransitionKind::BackingUp), StartStop::Stop, Some(status)) => {
seed.persistent_container.stop().await?;
MainStatus::BackingUp {
started: Some(status.started),
health: status.health.clone(),
}
}
(Some(TransitionKind::BackingUp), StartStop::Start, _) => {
seed.persistent_container.start().await?;
MainStatus::BackingUp {
started: None,
health: OrdMap::new(),
}
}
(Some(TransitionKind::BackingUp), _, _) => MainStatus::BackingUp {
started: None,
health: OrdMap::new(),
},
(None, StartStop::Stop, None) => MainStatus::Stopped,
(None, StartStop::Stop, Some(_)) => {
let task_seed = seed.clone();
seed.ctx
.db
.mutate(|d| {
if let Some(i) = d.as_public_mut().as_package_data_mut().as_idx_mut(&id) {
i.as_status_mut().as_main_mut().ser(&MainStatus::Stopping)?;
}
Ok(())
})
.await?;
task_seed.persistent_container.stop().await?;
MainStatus::Stopped
}
(None, StartStop::Start, Some(status)) => MainStatus::Running {
started: status.started,
health: status.health.clone(),
},
(None, StartStop::Start, None) => {
seed.persistent_container.start().await?;
MainStatus::Starting
}
};
seed.ctx
.db
.mutate(|d| {
if let Some(i) = d.as_public_mut().as_package_data_mut().as_idx_mut(&id) {
let previous = i.as_status().as_main().de()?;
let previous_health = previous.health();
let previous_started = previous.started();
let mut main_status = main_status;
match &mut main_status {
&mut MainStatus::Running { ref mut health, .. }
| &mut MainStatus::BackingUp { ref mut health, .. } => {
*health = previous_health.unwrap_or(health).clone();
}
_ => (),
};
match &mut main_status {
MainStatus::Running {
ref mut started, ..
} => {
*started = previous_started.unwrap_or(*started);
}
MainStatus::BackingUp {
ref mut started, ..
} => {
*started = previous_started.map(Some).unwrap_or(*started);
}
_ => (),
};
i.as_status_mut().as_main_mut().ser(&main_status)?;
}
Ok(())
})
.await?;
Ok::<_, Error>(())
}
.await
{
tracing::error!("error synchronizing state of service: {e}");
tracing::debug!("{e:?}");
seed.synchronized.notify_waiters();
tracing::error!("Retrying in {}s...", SYNC_RETRY_COOLDOWN_SECONDS);
tokio::time::sleep(Duration::from_secs(SYNC_RETRY_COOLDOWN_SECONDS)).await;
return ServiceActorLoopNext::DontWait;
}
seed.synchronized.notify_waiters();
ServiceActorLoopNext::Wait
}

View File

@@ -1,4 +1,4 @@
use std::collections::BTreeMap;
use std::{collections::BTreeMap, sync::Arc};
use chrono::{DateTime, Utc};
use imbl::OrdMap;
@@ -6,8 +6,8 @@ use serde::{Deserialize, Serialize};
use ts_rs::TS;
use self::health_check::HealthCheckId;
use crate::prelude::*;
use crate::status::health_check::HealthCheckResult;
use crate::{prelude::*, util::GeneralGuard};
pub mod health_check;
#[derive(Clone, Debug, Deserialize, Serialize, HasModel, TS)]
@@ -26,10 +26,7 @@ pub enum MainStatus {
Stopped,
Restarting,
Restoring,
#[serde(rename_all = "camelCase")]
Stopping {
timeout: crate::util::serde::Duration,
},
Stopping,
Starting,
#[serde(rename_all = "camelCase")]
Running {

View File

@@ -1,3 +1,4 @@
import { DEFAULT_SIGTERM_TIMEOUT } from "."
import { NO_TIMEOUT, SIGKILL, SIGTERM } from "../StartSdk"
import { SDKManifest } from "../manifest/ManifestTypes"
import { Effects, ImageId, ValidIfNoStupidEscape } from "../types"
@@ -10,6 +11,7 @@ export class CommandController {
readonly runningAnswer: Promise<unknown>,
readonly overlay: Overlay,
readonly pid: number | undefined,
readonly sigtermTimeout: number = DEFAULT_SIGTERM_TIMEOUT,
) {}
static of<Manifest extends SDKManifest>() {
return async <A extends string>(
@@ -20,6 +22,8 @@ export class CommandController {
},
command: ValidIfNoStupidEscape<A> | [string, ...string[]],
options: {
// Defaults to the DEFAULT_SIGTERM_TIMEOUT = 30_000ms
sigtermTimeout?: number
mounts?: { path: string; options: MountOptions }[]
overlay?: Overlay
env?:
@@ -67,10 +71,14 @@ export class CommandController {
const pid = childProcess.pid
return new CommandController(answer, overlay, pid)
return new CommandController(answer, overlay, pid, options.sigtermTimeout)
}
}
async wait() {
async wait(timeout: number = NO_TIMEOUT) {
if (timeout > 0)
setTimeout(() => {
this.term()
}, timeout)
try {
return await this.runningAnswer
} finally {
@@ -82,7 +90,7 @@ export class CommandController {
await this.overlay.destroy().catch((_) => {})
}
}
async term({ signal = SIGTERM, timeout = NO_TIMEOUT } = {}) {
async term({ signal = SIGTERM, timeout = this.sigtermTimeout } = {}) {
if (this.pid === undefined) return
try {
await cpExecFile("pkill", [

View File

@@ -34,6 +34,7 @@ export class Daemon {
user?: string | undefined
onStdout?: (x: Buffer) => void
onStderr?: (x: Buffer) => void
sigtermTimeout?: number
},
) => {
const startCommand = () =>

View File

@@ -44,6 +44,7 @@ type DaemonsParams<
env?: Record<string, string>
ready: Ready
requires: Exclude<Ids, Id>[]
sigtermTimeout?: number
}
type ErrorDuplicateId<Id extends string> = `The id '${Id}' is already used`
@@ -136,6 +137,7 @@ export class Daemons<Manifest extends SDKManifest, Ids extends string> {
this.ids,
options.ready,
this.effects,
options.sigtermTimeout,
)
const daemons = this.daemons.concat(daemon)
const ids = [...this.ids, id] as (Ids | Id)[]

View File

@@ -3,6 +3,7 @@ import { defaultTrigger } from "../trigger/defaultTrigger"
import { Ready } from "./Daemons"
import { Daemon } from "./Daemon"
import { Effects } from "../types"
import { DEFAULT_SIGTERM_TIMEOUT } from "."
const oncePromise = <T>() => {
let resolve: (value: T) => void
@@ -32,6 +33,7 @@ export class HealthDaemon {
readonly ids: string[],
readonly ready: Ready,
readonly effects: Effects,
readonly sigtermTimeout: number = DEFAULT_SIGTERM_TIMEOUT,
) {
this.updateStatus()
this.dependencies.forEach((d) => d.addWatcher(() => this.updateStatus()))
@@ -46,7 +48,12 @@ export class HealthDaemon {
this.#running = false
this.#healthCheckCleanup?.()
await this.daemon.then((d) => d.stop(termOptions))
await this.daemon.then((d) =>
d.stop({
timeout: this.sigtermTimeout,
...termOptions,
}),
)
}
/** Want to add another notifier that the health might have changed */

View File

@@ -7,6 +7,7 @@ import "./Daemons"
import { SDKManifest } from "../manifest/ManifestTypes"
import { MainEffects } from "../StartSdk"
export const DEFAULT_SIGTERM_TIMEOUT = 30_000
/**
* Used to ensure that the main function is running with the valid proofs.
* We first do the folowing order of things

View File

@@ -1,5 +1,4 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { Duration } from "./Duration"
import type { HealthCheckId } from "./HealthCheckId"
import type { HealthCheckResult } from "./HealthCheckResult"
@@ -7,7 +6,7 @@ export type MainStatus =
| { status: "stopped" }
| { status: "restarting" }
| { status: "restoring" }
| { status: "stopping"; timeout: Duration }
| { status: "stopping" }
| { status: "starting" }
| {
status: "running"

34
sdk/lib/util/inMs.test.ts Normal file
View File

@@ -0,0 +1,34 @@
import { inMs } from "./inMs"
describe("inMs", () => {
test("28.001s", () => {
expect(inMs("28.001s")).toBe(28001)
})
test("28.123s", () => {
expect(inMs("28.123s")).toBe(28123)
})
test(".123s", () => {
expect(inMs(".123s")).toBe(123)
})
test("123ms", () => {
expect(inMs("123ms")).toBe(123)
})
test("1h", () => {
expect(inMs("1h")).toBe(3600000)
})
test("1m", () => {
expect(inMs("1m")).toBe(60000)
})
test("1m", () => {
expect(inMs("1d")).toBe(1000 * 60 * 60 * 24)
})
test("123", () => {
expect(() => inMs("123")).toThrowError("Invalid time format: 123")
})
test("123 as number", () => {
expect(inMs(123)).toBe(123)
})
test.only("undefined", () => {
expect(inMs(undefined)).toBe(undefined)
})
})

31
sdk/lib/util/inMs.ts Normal file
View File

@@ -0,0 +1,31 @@
import { DEFAULT_SIGTERM_TIMEOUT } from "../mainFn"
const matchTimeRegex = /^\s*(\d+)?(\.\d+)?\s*(ms|s|m|h|d)/
const unitMultiplier = (unit?: string) => {
if (!unit) return 1
if (unit === "ms") return 1
if (unit === "s") return 1000
if (unit === "m") return 1000 * 60
if (unit === "h") return 1000 * 60 * 60
if (unit === "d") return 1000 * 60 * 60 * 24
throw new Error(`Invalid unit: ${unit}`)
}
const digitsMs = (digits: string | null, multiplier: number) => {
if (!digits) return 0
const value = parseInt(digits.slice(1))
const divideBy = multiplier / Math.pow(10, digits.length - 1)
return Math.round(value * divideBy)
}
export const inMs = (time?: string | number) => {
if (typeof time === "number") return time
if (!time) return undefined
const matches = time.match(matchTimeRegex)
if (!matches) throw new Error(`Invalid time format: ${time}`)
const [_, leftHandSide, digits, unit] = matches
const multiplier = unitMultiplier(unit)
const firstValue = parseInt(leftHandSide || "0") * multiplier
const secondValue = digitsMs(digits, multiplier)
return firstValue + secondValue
}

View File

@@ -12,3 +12,4 @@ export { addressHostToUrl } from "./getServiceInterface"
export { hostnameInfoToAddress } from "./Hostname"
export * from "./typeHelpers"
export { getDefaultString } from "./getDefaultString"
export { inMs } from "./inMs"

View File

@@ -24,9 +24,7 @@ export class AppListPkgComponent {
}
get sigtermTimeout(): string | null {
return this.pkgMainStatus.status === 'stopping'
? this.pkgMainStatus.timeout
: null
return this.pkgMainStatus.status === 'stopping' ? '30s' : null // @dr-bonez TODO
}
launchUi(e: Event, interfaces: PackageDataEntry['serviceInterfaces']): void {

View File

@@ -76,9 +76,7 @@ export class AppShowStatusComponent {
}
get sigtermTimeout(): string | null {
return this.pkgStatus?.main.status === 'stopping'
? this.pkgStatus.main.timeout
: null
return this.pkgStatus?.main.status === 'stopping' ? '30s' : null // @dr-bonez TODO
}
launchUi(interfaces: PackageDataEntry['serviceInterfaces']): void {