From 2464d255d5117a2d78834e634057d199ab38af2d Mon Sep 17 00:00:00 2001 From: Aiden McClelland <3732071+dr-bonez@users.noreply.github.com> Date: Fri, 6 Jun 2025 14:35:03 -0600 Subject: [PATCH] improve daemons init system (#2960) * repeatable command launch fn * allow js fn for daemon exec * improve daemon init system * fixes from testing --- container-runtime/package-lock.json | 2 +- .../Systems/SystemForEmbassy/MainLoop.ts | 20 ++-- .../SystemForEmbassy/polyfillEffects.ts | 9 +- core/startos/src/init.rs | 1 + core/startos/src/registry/os/version/mod.rs | 13 +-- sdk/package/lib/mainFn/CommandController.ts | 103 ++++++++++++------ sdk/package/lib/mainFn/Daemon.ts | 64 +++++------ sdk/package/lib/mainFn/Daemons.ts | 50 ++++++--- sdk/package/lib/mainFn/HealthDaemon.ts | 24 ++-- sdk/package/lib/mainFn/Oneshot.ts | 36 ++---- sdk/package/lib/util/SubContainer.ts | 21 +++- sdk/package/package-lock.json | 4 +- sdk/package/package.json | 2 +- 13 files changed, 187 insertions(+), 162 deletions(-) diff --git a/container-runtime/package-lock.json b/container-runtime/package-lock.json index cb64a31fa..2d79a9c6d 100644 --- a/container-runtime/package-lock.json +++ b/container-runtime/package-lock.json @@ -38,7 +38,7 @@ }, "../sdk/dist": { "name": "@start9labs/start-sdk", - "version": "0.4.0-beta.26", + "version": "0.4.0-beta.27", "license": "MIT", "dependencies": { "@iarna/toml": "^3.0.0", diff --git a/container-runtime/src/Adapters/Systems/SystemForEmbassy/MainLoop.ts b/container-runtime/src/Adapters/Systems/SystemForEmbassy/MainLoop.ts index a5539b47f..cb00697d4 100644 --- a/container-runtime/src/Adapters/Systems/SystemForEmbassy/MainLoop.ts +++ b/container-runtime/src/Adapters/Systems/SystemForEmbassy/MainLoop.ts @@ -67,20 +67,14 @@ export class MainLoop { this.system.manifest.volumes, `Main - ${currentCommand.join(" ")}`, ) - const daemon = await Daemon.of()( - this.effects, - subcontainer, - currentCommand, - { - runAsInit: true, - env: { - TINI_SUBREAPER: "true", - }, - sigtermTimeout: utils.inMs( - this.system.manifest.main["sigterm-timeout"], - ), + const daemon = await Daemon.of()(this.effects, subcontainer, { + command: currentCommand, + runAsInit: true, + env: { + TINI_SUBREAPER: "true", }, - ) + sigtermTimeout: utils.inMs(this.system.manifest.main["sigterm-timeout"]), + }) daemon.start() return { diff --git a/container-runtime/src/Adapters/Systems/SystemForEmbassy/polyfillEffects.ts b/container-runtime/src/Adapters/Systems/SystemForEmbassy/polyfillEffects.ts index 58a170d23..c5baf2586 100644 --- a/container-runtime/src/Adapters/Systems/SystemForEmbassy/polyfillEffects.ts +++ b/container-runtime/src/Adapters/Systems/SystemForEmbassy/polyfillEffects.ts @@ -135,12 +135,9 @@ export const polyfillEffects = ( [input.command, ...(input.args || [])].join(" "), ) const daemon = promiseSubcontainer.then((subcontainer) => - daemons.runCommand()( - effects, - subcontainer, - [input.command, ...(input.args || [])], - {}, - ), + daemons.runCommand()(effects, subcontainer, { + command: [input.command, ...(input.args || [])], + }), ) return { wait: () => diff --git a/core/startos/src/init.rs b/core/startos/src/init.rs index 3b7d834e6..d03a0c72a 100644 --- a/core/startos/src/init.rs +++ b/core/startos/src/init.rs @@ -265,6 +265,7 @@ pub async fn run_script>(path: P, mut progress: PhaseProgressTrac .input(Some(&mut reader)) .invoke(ErrorKind::Unknown) .await?; + // TODO: inherit? Ok::<_, Error>(()) } diff --git a/core/startos/src/registry/os/version/mod.rs b/core/startos/src/registry/os/version/mod.rs index ddbf92b24..6c9a58328 100644 --- a/core/startos/src/registry/os/version/mod.rs +++ b/core/startos/src/registry/os/version/mod.rs @@ -140,8 +140,6 @@ pub struct GetOsVersionParams { #[ts(type = "string | null")] #[arg(long)] pub target_version: Option, - #[arg(long)] - pub include_prerelease: Option, #[arg(long = "id")] server_id: Option, #[ts(type = "string | null")] @@ -158,7 +156,6 @@ pub async fn get_version( GetOsVersionParams { source_version: source, target_version: target, - include_prerelease, server_id, platform, device_info, @@ -166,9 +163,6 @@ pub async fn get_version( ) -> Result, Error> { let source = source.or_else(|| device_info.as_ref().map(|d| d.os.version.clone())); let platform = platform.or_else(|| device_info.as_ref().map(|d| d.os.platform.clone())); - let include_prerelease = include_prerelease - .or_else(|| source.as_ref().map(|s| !s.prerelease().is_empty())) - .unwrap_or(cfg!(feature = "dev")); if let (Some(pool), Some(server_id), Some(arch)) = (&ctx.pool, server_id, &platform) { let created_at = Utc::now(); @@ -192,10 +186,9 @@ pub async fn get_version( .into_iter() .map(|(v, i)| i.de().map(|i| (v, i))) .filter_ok(|(version, info)| { - (version.prerelease().is_empty() || include_prerelease) - && platform - .as_ref() - .map_or(true, |p| info.squashfs.contains_key(p)) + platform + .as_ref() + .map_or(true, |p| info.squashfs.contains_key(p)) && version.satisfies(&target) && source .as_ref() diff --git a/sdk/package/lib/mainFn/CommandController.ts b/sdk/package/lib/mainFn/CommandController.ts index 4d0617113..ea58f08f9 100644 --- a/sdk/package/lib/mainFn/CommandController.ts +++ b/sdk/package/lib/mainFn/CommandController.ts @@ -7,13 +7,14 @@ import { Drop, splitCommand } from "../util" import * as cp from "child_process" import * as fs from "node:fs/promises" import { Mounts } from "./Mounts" +import { DaemonCommandType } from "./Daemons" export class CommandController extends Drop { private constructor( - readonly runningAnswer: Promise, + readonly runningAnswer: Promise, private state: { exited: boolean }, private readonly subcontainer: SubContainer, - private process: cp.ChildProcess, + private process: cp.ChildProcess | AbortController, readonly sigtermTimeout: number = DEFAULT_SIGTERM_TIMEOUT, ) { super() @@ -22,25 +23,39 @@ export class CommandController extends Drop { return async ( effects: T.Effects, subcontainer: SubContainer, - command: T.CommandType, - options: { - // Defaults to the DEFAULT_SIGTERM_TIMEOUT = 30_000ms - sigtermTimeout?: number - runAsInit?: boolean - env?: - | { - [variable: string]: string - } - | undefined - cwd?: string | undefined - user?: string | undefined - onStdout?: (chunk: Buffer | string | any) => void - onStderr?: (chunk: Buffer | string | any) => void - }, + exec: DaemonCommandType, ) => { try { + if ("fn" in exec) { + const abort = new AbortController() + const cell: { ctrl: CommandController } = { + ctrl: new CommandController( + exec.fn(subcontainer, abort).then(async (command) => { + if (command && !abort.signal.aborted) { + Object.assign( + cell.ctrl, + await CommandController.of()( + effects, + subcontainer, + command, + ), + ) + return await cell.ctrl.runningAnswer + } else { + cell.ctrl.state.exited = true + } + return null + }), + { exited: false }, + subcontainer, + abort, + exec.sigtermTimeout, + ), + } + return cell.ctrl + } let commands: string[] - if (T.isUseEntrypoint(command)) { + if (T.isUseEntrypoint(exec.command)) { const imageMeta: T.ImageMetadata = await fs .readFile(`/media/startos/images/${subcontainer.imageId}.json`, { encoding: "utf8", @@ -49,24 +64,24 @@ export class CommandController extends Drop { .then(JSON.parse) commands = imageMeta.entrypoint ?? [] commands = commands.concat( - ...(command.overridCmd ?? imageMeta.cmd ?? []), + ...(exec.command.overridCmd ?? imageMeta.cmd ?? []), ) - } else commands = splitCommand(command) + } else commands = splitCommand(exec.command) let childProcess: cp.ChildProcess - if (options.runAsInit) { + if (exec.runAsInit) { childProcess = await subcontainer.launch(commands, { - env: options.env, + env: exec.env, }) } else { childProcess = await subcontainer.spawn(commands, { - env: options.env, - stdio: options.onStdout || options.onStderr ? "pipe" : "inherit", + env: exec.env, + stdio: exec.onStdout || exec.onStderr ? "pipe" : "inherit", }) } - if (options.onStdout) childProcess.stdout?.on("data", options.onStdout) - if (options.onStderr) childProcess.stderr?.on("data", options.onStderr) + if (exec.onStdout) childProcess.stdout?.on("data", exec.onStdout) + if (exec.onStderr) childProcess.stderr?.on("data", exec.onStderr) const state = { exited: false } const answer = new Promise((resolve, reject) => { @@ -98,7 +113,7 @@ export class CommandController extends Drop { state, subcontainer, childProcess, - options.sigtermTimeout, + exec.sigtermTimeout, ) } catch (e) { await subcontainer.destroy() @@ -112,10 +127,22 @@ export class CommandController extends Drop { this.term() }, timeout) try { - return await this.runningAnswer + if (timeout > 0 && this.process instanceof AbortController) + await Promise.race([ + this.runningAnswer, + new Promise((_, reject) => + setTimeout( + () => + reject(new Error("Timed out waiting for js command to exit")), + timeout * 2, + ), + ), + ]) + else await this.runningAnswer } finally { if (!this.state.exited) { - this.process.kill("SIGKILL") + if (this.process instanceof AbortController) this.process.abort() + else this.process.kill("SIGKILL") } await this.subcontainer.destroy() } @@ -123,9 +150,12 @@ export class CommandController extends Drop { async term({ signal = SIGTERM, timeout = this.sigtermTimeout } = {}) { try { if (!this.state.exited) { + if (this.process instanceof AbortController) return this.process.abort() + if (signal !== "SIGKILL") { setTimeout(() => { - if (!this.state.exited) this.process.kill("SIGKILL") + if (this.process instanceof AbortController) this.process.abort() + else this.process.kill("SIGKILL") }, timeout) } if (!this.process.kill(signal)) { @@ -135,7 +165,18 @@ export class CommandController extends Drop { } } - await this.runningAnswer + if (this.process instanceof AbortController) + await Promise.race([ + this.runningAnswer, + new Promise((_, reject) => + setTimeout( + () => + reject(new Error("Timed out waiting for js command to exit")), + timeout * 2, + ), + ), + ]) + else await this.runningAnswer } finally { await this.subcontainer.destroy() } diff --git a/sdk/package/lib/mainFn/Daemon.ts b/sdk/package/lib/mainFn/Daemon.ts index e4f801ca9..171b9b5dd 100644 --- a/sdk/package/lib/mainFn/Daemon.ts +++ b/sdk/package/lib/mainFn/Daemon.ts @@ -7,6 +7,7 @@ import { SubContainerRc, } from "../util/SubContainer" import { CommandController } from "./CommandController" +import { DaemonCommandType } from "./Daemons" import { Oneshot } from "./Oneshot" const TIMEOUT_INCREMENT_MS = 1000 @@ -20,11 +21,11 @@ export class Daemon extends Drop { private commandController: CommandController | null = null private shouldBeRunning = false protected exitedSuccess = false + private onExitFns: ((success: boolean) => void)[] = [] protected constructor( private subcontainer: SubContainer, - private startCommand: () => Promise>, + private startCommand: (() => Promise>) | null, readonly oneshot: boolean = false, - protected onExitSuccessFns: (() => void)[] = [], ) { super() } @@ -35,29 +36,13 @@ export class Daemon extends Drop { return async ( effects: T.Effects, subcontainer: SubContainer, - command: T.CommandType, - options: { - runAsInit?: boolean - env?: - | { - [variable: string]: string - } - | undefined - cwd?: string | undefined - user?: string | undefined - onStdout?: (chunk: Buffer | string | any) => void - onStderr?: (chunk: Buffer | string | any) => void - sigtermTimeout?: number - }, + exec: DaemonCommandType | null, ) => { if (subcontainer.isOwned()) subcontainer = subcontainer.rc() - const startCommand = () => - CommandController.of()( - effects, - subcontainer.rc(), - command, - options, - ) + const startCommand = exec + ? () => + CommandController.of()(effects, subcontainer.rc(), exec) + : null return new Daemon(subcontainer, startCommand) } } @@ -66,35 +51,35 @@ export class Daemon extends Drop { return } this.shouldBeRunning = true - this.exitedSuccess = false let timeoutCounter = 0 ;(async () => { - while (this.shouldBeRunning) { + while (this.startCommand && this.shouldBeRunning) { if (this.commandController) await this.commandController .term({}) .catch((err) => console.error(err)) - this.commandController = await this.startCommand() - if ( - (await this.commandController.wait().then( + try { + this.commandController = await this.startCommand() + const success = await this.commandController.wait().then( (_) => true, (err) => { console.error(err) return false }, - )) && - this.oneshot - ) { - for (const fn of this.onExitSuccessFns) { + ) + for (const fn of this.onExitFns) { try { - fn() + fn(success) } catch (e) { - console.error("EXIT_SUCCESS handler", e) + console.error("EXIT handler", e) } } - this.onExitSuccessFns = [] - this.exitedSuccess = true - break + if (success && this.oneshot) { + this.exitedSuccess = true + break + } + } catch (e) { + console.error(e) } await new Promise((resolve) => setTimeout(resolve, timeoutCounter)) timeoutCounter += TIMEOUT_INCREMENT_MS @@ -115,15 +100,20 @@ export class Daemon extends Drop { timeout?: number | undefined }) { this.shouldBeRunning = false + this.exitedSuccess = false await this.commandController ?.term({ ...termOptions }) .catch((e) => console.error(asError(e))) this.commandController = null + this.onExitFns = [] await this.subcontainer.destroy() } subcontainerRc(): SubContainerRc { return this.subcontainer.rc() } + onExit(fn: (success: boolean) => void) { + this.onExitFns.push(fn) + } onDrop(): void { this.stop().catch((e) => console.error(asError(e))) } diff --git a/sdk/package/lib/mainFn/Daemons.ts b/sdk/package/lib/mainFn/Daemons.ts index 7f50e0cb1..adc3c758a 100644 --- a/sdk/package/lib/mainFn/Daemons.ts +++ b/sdk/package/lib/mainFn/Daemons.ts @@ -4,8 +4,7 @@ import { HealthCheckResult } from "../health/checkFns" import { Trigger } from "../trigger" import * as T from "../../../base/lib/types" -import { Mounts } from "./Mounts" -import { MountOptions, SubContainer } from "../util/SubContainer" +import { SubContainer } from "../util/SubContainer" import { promisify } from "node:util" import * as CP from "node:child_process" @@ -50,20 +49,40 @@ export type Ready = { trigger?: Trigger } -type NewDaemonParams = { - /** The command line command to start the daemon */ +export type ExecCommandOptions = { command: T.CommandType - /** Information about the subcontainer in which the daemon runs */ - subcontainer: SubContainer - runAsInit?: boolean - env?: Record - cwd?: string - user?: string + // Defaults to the DEFAULT_SIGTERM_TIMEOUT = 30_000ms sigtermTimeout?: number + runAsInit?: boolean + env?: + | { + [variable: string]: string + } + | undefined + cwd?: string | undefined + user?: string | undefined onStdout?: (chunk: Buffer | string | any) => void onStderr?: (chunk: Buffer | string | any) => void } +export type ExecFnOptions = { + fn: ( + subcontainer: SubContainer, + abort: AbortController, + ) => Promise + // Defaults to the DEFAULT_SIGTERM_TIMEOUT = 30_000ms + sigtermTimeout?: number +} + +export type DaemonCommandType = ExecCommandOptions | ExecFnOptions + +type NewDaemonParams = { + /** What to run as the daemon: either an async fn or a commandline command to run in the subcontainer */ + exec: DaemonCommandType | null + /** Information about the subcontainer in which the daemon runs */ + subcontainer: SubContainer +} + type AddDaemonParams< Manifest extends T.SDKManifest, Ids extends string, @@ -84,6 +103,7 @@ type AddOneshotParams< Ids extends string, Id extends string, > = NewDaemonParams & { + exec: DaemonCommandType /** An array of IDs of prior daemons whose successful initializations are required before this daemon will initialize */ requires: Exclude[] } @@ -172,10 +192,7 @@ export class Daemons : Daemon.of()( this.effects, options.subcontainer, - options.command, - { - ...options, - }, + options.exec, ) const healthDaemon = new HealthDaemon( daemon, @@ -221,10 +238,7 @@ export class Daemons const daemon = Oneshot.of()( this.effects, options.subcontainer, - options.command, - { - ...options, - }, + options.exec, ) const healthDaemon = new HealthDaemon( daemon, diff --git a/sdk/package/lib/mainFn/HealthDaemon.ts b/sdk/package/lib/mainFn/HealthDaemon.ts index 73450993f..c1d9e6c2c 100644 --- a/sdk/package/lib/mainFn/HealthDaemon.ts +++ b/sdk/package/lib/mainFn/HealthDaemon.ts @@ -90,15 +90,23 @@ export class HealthDaemon { this.healthCheckCleanup?.() } private async setupHealthCheck() { - if (this.ready === "EXIT_SUCCESS") { - const daemon = await this.daemon - if (daemon.isOneshot()) { - daemon.onExitSuccess(() => - this.setHealth({ result: "success", message: null }), - ) + const daemon = await this.daemon + daemon.onExit((success) => { + if (success && this.ready === "EXIT_SUCCESS") { + this.setHealth({ result: "success", message: null }) + } else if (!success) { + this.setHealth({ + result: "failure", + message: `${this.id} daemon crashed`, + }) + } else if (!daemon.isOneshot()) { + this.setHealth({ + result: "failure", + message: `${this.id} daemon exited`, + }) } - return - } + }) + if (this.ready === "EXIT_SUCCESS") return if (this.healthCheckCleanup) return const trigger = (this.ready.trigger ?? defaultTrigger)(() => ({ lastResult: this._health.result, diff --git a/sdk/package/lib/mainFn/Oneshot.ts b/sdk/package/lib/mainFn/Oneshot.ts index b09d7380c..33200c4c1 100644 --- a/sdk/package/lib/mainFn/Oneshot.ts +++ b/sdk/package/lib/mainFn/Oneshot.ts @@ -2,6 +2,7 @@ import * as T from "../../../base/lib/types" import { SubContainer, SubContainerOwned } from "../util/SubContainer" import { CommandController } from "./CommandController" import { Daemon } from "./Daemon" +import { DaemonCommandType } from "./Daemons" /** * This is a wrapper around CommandController that has a state of off, where the command shouldn't be running @@ -14,37 +15,14 @@ export class Oneshot extends Daemon { return async ( effects: T.Effects, subcontainer: SubContainer, - command: T.CommandType, - options: { - env?: - | { - [variable: string]: string - } - | undefined - cwd?: string | undefined - user?: string | undefined - onStdout?: (chunk: Buffer | string | any) => void - onStderr?: (chunk: Buffer | string | any) => void - sigtermTimeout?: number - }, + exec: DaemonCommandType | null, ) => { if (subcontainer.isOwned()) subcontainer = subcontainer.rc() - const startCommand = () => - CommandController.of()( - effects, - subcontainer.rc(), - command, - options, - ) - return new Oneshot(subcontainer, startCommand, true, []) - } - } - - onExitSuccess(fn: () => void) { - if (this.exitedSuccess) { - fn() - } else { - this.onExitSuccessFns.push(fn) + const startCommand = exec + ? () => + CommandController.of()(effects, subcontainer.rc(), exec) + : null + return new Oneshot(subcontainer, startCommand, true) } } } diff --git a/sdk/package/lib/util/SubContainer.ts b/sdk/package/lib/util/SubContainer.ts index 29ebcbdde..8473d3b3a 100644 --- a/sdk/package/lib/util/SubContainer.ts +++ b/sdk/package/lib/util/SubContainer.ts @@ -92,6 +92,7 @@ export interface SubContainer< command: string[], options?: CommandOptions & ExecOptions, timeoutMs?: number | null, + abort?: AbortController, ): Promise<{ throw: () => { stdout: string | Buffer; stderr: string | Buffer } exitCode: number | null @@ -111,6 +112,7 @@ export interface SubContainer< command: string[], options?: CommandOptions & ExecOptions, timeoutMs?: number | null, + abort?: AbortController, ): Promise<{ stdout: string | Buffer stderr: string | Buffer @@ -378,6 +380,7 @@ export class SubContainerOwned< command: string[], options?: CommandOptions & ExecOptions, timeoutMs: number | null = 30000, + abort?: AbortController, ): Promise<{ throw: () => { stdout: string | Buffer; stderr: string | Buffer } exitCode: number | null @@ -417,6 +420,7 @@ export class SubContainerOwned< ], options || {}, ) + abort?.signal.addEventListener("abort", () => child.kill("SIGKILL")) if (options?.input) { await new Promise((resolve, reject) => { try { @@ -489,12 +493,15 @@ export class SubContainerOwned< async execFail( command: string[], options?: CommandOptions & ExecOptions, - timeoutMs: number | null = 30000, + timeoutMs?: number | null, + abort?: AbortController, ): Promise<{ stdout: string | Buffer stderr: string | Buffer }> { - return this.exec(command, options, timeoutMs).then((res) => res.throw()) + return this.exec(command, options, timeoutMs, abort).then((res) => + res.throw(), + ) } async launch( @@ -711,7 +718,8 @@ export class SubContainerRc< async exec( command: string[], options?: CommandOptions & ExecOptions, - timeoutMs: number | null = 30000, + timeoutMs?: number | null, + abort?: AbortController, ): Promise<{ throw: () => { stdout: string | Buffer; stderr: string | Buffer } exitCode: number | null @@ -719,7 +727,7 @@ export class SubContainerRc< stdout: string | Buffer stderr: string | Buffer }> { - return this.subcontainer.exec(command, options, timeoutMs) + return this.subcontainer.exec(command, options, timeoutMs, abort) } /** @@ -732,12 +740,13 @@ export class SubContainerRc< async execFail( command: string[], options?: CommandOptions & ExecOptions, - timeoutMs: number | null = 30000, + timeoutMs?: number | null, + abort?: AbortController, ): Promise<{ stdout: string | Buffer stderr: string | Buffer }> { - return this.subcontainer.execFail(command, options, timeoutMs) + return this.subcontainer.execFail(command, options, timeoutMs, abort) } async launch( diff --git a/sdk/package/package-lock.json b/sdk/package/package-lock.json index 5158e5339..0fcc2ee8e 100644 --- a/sdk/package/package-lock.json +++ b/sdk/package/package-lock.json @@ -1,12 +1,12 @@ { "name": "@start9labs/start-sdk", - "version": "0.4.0-beta.26", + "version": "0.4.0-beta.27", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@start9labs/start-sdk", - "version": "0.4.0-beta.26", + "version": "0.4.0-beta.27", "license": "MIT", "dependencies": { "@iarna/toml": "^3.0.0", diff --git a/sdk/package/package.json b/sdk/package/package.json index 1511057c0..1e6351f57 100644 --- a/sdk/package/package.json +++ b/sdk/package/package.json @@ -1,6 +1,6 @@ { "name": "@start9labs/start-sdk", - "version": "0.4.0-beta.26", + "version": "0.4.0-beta.27", "description": "Software development kit to facilitate packaging services for StartOS", "main": "./package/lib/index.js", "types": "./package/lib/index.d.ts",