improve daemons init system (#2960)

* repeatable command launch fn

* allow js fn for daemon exec

* improve daemon init system

* fixes from testing
This commit is contained in:
Aiden McClelland
2025-06-06 14:35:03 -06:00
committed by GitHub
parent 586d950b8c
commit 2464d255d5
13 changed files with 187 additions and 162 deletions

View File

@@ -38,7 +38,7 @@
},
"../sdk/dist": {
"name": "@start9labs/start-sdk",
"version": "0.4.0-beta.26",
"version": "0.4.0-beta.27",
"license": "MIT",
"dependencies": {
"@iarna/toml": "^3.0.0",

View File

@@ -67,20 +67,14 @@ export class MainLoop {
this.system.manifest.volumes,
`Main - ${currentCommand.join(" ")}`,
)
const daemon = await Daemon.of()(
this.effects,
subcontainer,
currentCommand,
{
runAsInit: true,
env: {
TINI_SUBREAPER: "true",
},
sigtermTimeout: utils.inMs(
this.system.manifest.main["sigterm-timeout"],
),
const daemon = await Daemon.of()(this.effects, subcontainer, {
command: currentCommand,
runAsInit: true,
env: {
TINI_SUBREAPER: "true",
},
)
sigtermTimeout: utils.inMs(this.system.manifest.main["sigterm-timeout"]),
})
daemon.start()
return {

View File

@@ -135,12 +135,9 @@ export const polyfillEffects = (
[input.command, ...(input.args || [])].join(" "),
)
const daemon = promiseSubcontainer.then((subcontainer) =>
daemons.runCommand()(
effects,
subcontainer,
[input.command, ...(input.args || [])],
{},
),
daemons.runCommand()(effects, subcontainer, {
command: [input.command, ...(input.args || [])],
}),
)
return {
wait: () =>

View File

@@ -265,6 +265,7 @@ pub async fn run_script<P: AsRef<Path>>(path: P, mut progress: PhaseProgressTrac
.input(Some(&mut reader))
.invoke(ErrorKind::Unknown)
.await?;
// TODO: inherit?
Ok::<_, Error>(())
}

View File

@@ -140,8 +140,6 @@ pub struct GetOsVersionParams {
#[ts(type = "string | null")]
#[arg(long)]
pub target_version: Option<VersionRange>,
#[arg(long)]
pub include_prerelease: Option<bool>,
#[arg(long = "id")]
server_id: Option<String>,
#[ts(type = "string | null")]
@@ -158,7 +156,6 @@ pub async fn get_version(
GetOsVersionParams {
source_version: source,
target_version: target,
include_prerelease,
server_id,
platform,
device_info,
@@ -166,9 +163,6 @@ pub async fn get_version(
) -> Result<BTreeMap<Version, OsVersionInfo>, Error> {
let source = source.or_else(|| device_info.as_ref().map(|d| d.os.version.clone()));
let platform = platform.or_else(|| device_info.as_ref().map(|d| d.os.platform.clone()));
let include_prerelease = include_prerelease
.or_else(|| source.as_ref().map(|s| !s.prerelease().is_empty()))
.unwrap_or(cfg!(feature = "dev"));
if let (Some(pool), Some(server_id), Some(arch)) = (&ctx.pool, server_id, &platform) {
let created_at = Utc::now();
@@ -192,10 +186,9 @@ pub async fn get_version(
.into_iter()
.map(|(v, i)| i.de().map(|i| (v, i)))
.filter_ok(|(version, info)| {
(version.prerelease().is_empty() || include_prerelease)
&& platform
.as_ref()
.map_or(true, |p| info.squashfs.contains_key(p))
platform
.as_ref()
.map_or(true, |p| info.squashfs.contains_key(p))
&& version.satisfies(&target)
&& source
.as_ref()

View File

@@ -7,13 +7,14 @@ import { Drop, splitCommand } from "../util"
import * as cp from "child_process"
import * as fs from "node:fs/promises"
import { Mounts } from "./Mounts"
import { DaemonCommandType } from "./Daemons"
export class CommandController<Manifest extends T.SDKManifest> extends Drop {
private constructor(
readonly runningAnswer: Promise<unknown>,
readonly runningAnswer: Promise<null>,
private state: { exited: boolean },
private readonly subcontainer: SubContainer<Manifest>,
private process: cp.ChildProcess,
private process: cp.ChildProcess | AbortController,
readonly sigtermTimeout: number = DEFAULT_SIGTERM_TIMEOUT,
) {
super()
@@ -22,25 +23,39 @@ export class CommandController<Manifest extends T.SDKManifest> extends Drop {
return async (
effects: T.Effects,
subcontainer: SubContainer<Manifest>,
command: T.CommandType,
options: {
// Defaults to the DEFAULT_SIGTERM_TIMEOUT = 30_000ms
sigtermTimeout?: number
runAsInit?: boolean
env?:
| {
[variable: string]: string
}
| undefined
cwd?: string | undefined
user?: string | undefined
onStdout?: (chunk: Buffer | string | any) => void
onStderr?: (chunk: Buffer | string | any) => void
},
exec: DaemonCommandType,
) => {
try {
if ("fn" in exec) {
const abort = new AbortController()
const cell: { ctrl: CommandController<Manifest> } = {
ctrl: new CommandController(
exec.fn(subcontainer, abort).then(async (command) => {
if (command && !abort.signal.aborted) {
Object.assign(
cell.ctrl,
await CommandController.of<Manifest>()(
effects,
subcontainer,
command,
),
)
return await cell.ctrl.runningAnswer
} else {
cell.ctrl.state.exited = true
}
return null
}),
{ exited: false },
subcontainer,
abort,
exec.sigtermTimeout,
),
}
return cell.ctrl
}
let commands: string[]
if (T.isUseEntrypoint(command)) {
if (T.isUseEntrypoint(exec.command)) {
const imageMeta: T.ImageMetadata = await fs
.readFile(`/media/startos/images/${subcontainer.imageId}.json`, {
encoding: "utf8",
@@ -49,24 +64,24 @@ export class CommandController<Manifest extends T.SDKManifest> extends Drop {
.then(JSON.parse)
commands = imageMeta.entrypoint ?? []
commands = commands.concat(
...(command.overridCmd ?? imageMeta.cmd ?? []),
...(exec.command.overridCmd ?? imageMeta.cmd ?? []),
)
} else commands = splitCommand(command)
} else commands = splitCommand(exec.command)
let childProcess: cp.ChildProcess
if (options.runAsInit) {
if (exec.runAsInit) {
childProcess = await subcontainer.launch(commands, {
env: options.env,
env: exec.env,
})
} else {
childProcess = await subcontainer.spawn(commands, {
env: options.env,
stdio: options.onStdout || options.onStderr ? "pipe" : "inherit",
env: exec.env,
stdio: exec.onStdout || exec.onStderr ? "pipe" : "inherit",
})
}
if (options.onStdout) childProcess.stdout?.on("data", options.onStdout)
if (options.onStderr) childProcess.stderr?.on("data", options.onStderr)
if (exec.onStdout) childProcess.stdout?.on("data", exec.onStdout)
if (exec.onStderr) childProcess.stderr?.on("data", exec.onStderr)
const state = { exited: false }
const answer = new Promise<null>((resolve, reject) => {
@@ -98,7 +113,7 @@ export class CommandController<Manifest extends T.SDKManifest> extends Drop {
state,
subcontainer,
childProcess,
options.sigtermTimeout,
exec.sigtermTimeout,
)
} catch (e) {
await subcontainer.destroy()
@@ -112,10 +127,22 @@ export class CommandController<Manifest extends T.SDKManifest> extends Drop {
this.term()
}, timeout)
try {
return await this.runningAnswer
if (timeout > 0 && this.process instanceof AbortController)
await Promise.race([
this.runningAnswer,
new Promise((_, reject) =>
setTimeout(
() =>
reject(new Error("Timed out waiting for js command to exit")),
timeout * 2,
),
),
])
else await this.runningAnswer
} finally {
if (!this.state.exited) {
this.process.kill("SIGKILL")
if (this.process instanceof AbortController) this.process.abort()
else this.process.kill("SIGKILL")
}
await this.subcontainer.destroy()
}
@@ -123,9 +150,12 @@ export class CommandController<Manifest extends T.SDKManifest> extends Drop {
async term({ signal = SIGTERM, timeout = this.sigtermTimeout } = {}) {
try {
if (!this.state.exited) {
if (this.process instanceof AbortController) return this.process.abort()
if (signal !== "SIGKILL") {
setTimeout(() => {
if (!this.state.exited) this.process.kill("SIGKILL")
if (this.process instanceof AbortController) this.process.abort()
else this.process.kill("SIGKILL")
}, timeout)
}
if (!this.process.kill(signal)) {
@@ -135,7 +165,18 @@ export class CommandController<Manifest extends T.SDKManifest> extends Drop {
}
}
await this.runningAnswer
if (this.process instanceof AbortController)
await Promise.race([
this.runningAnswer,
new Promise((_, reject) =>
setTimeout(
() =>
reject(new Error("Timed out waiting for js command to exit")),
timeout * 2,
),
),
])
else await this.runningAnswer
} finally {
await this.subcontainer.destroy()
}

View File

@@ -7,6 +7,7 @@ import {
SubContainerRc,
} from "../util/SubContainer"
import { CommandController } from "./CommandController"
import { DaemonCommandType } from "./Daemons"
import { Oneshot } from "./Oneshot"
const TIMEOUT_INCREMENT_MS = 1000
@@ -20,11 +21,11 @@ export class Daemon<Manifest extends T.SDKManifest> extends Drop {
private commandController: CommandController<Manifest> | null = null
private shouldBeRunning = false
protected exitedSuccess = false
private onExitFns: ((success: boolean) => void)[] = []
protected constructor(
private subcontainer: SubContainer<Manifest>,
private startCommand: () => Promise<CommandController<Manifest>>,
private startCommand: (() => Promise<CommandController<Manifest>>) | null,
readonly oneshot: boolean = false,
protected onExitSuccessFns: (() => void)[] = [],
) {
super()
}
@@ -35,29 +36,13 @@ export class Daemon<Manifest extends T.SDKManifest> extends Drop {
return async (
effects: T.Effects,
subcontainer: SubContainer<Manifest>,
command: T.CommandType,
options: {
runAsInit?: boolean
env?:
| {
[variable: string]: string
}
| undefined
cwd?: string | undefined
user?: string | undefined
onStdout?: (chunk: Buffer | string | any) => void
onStderr?: (chunk: Buffer | string | any) => void
sigtermTimeout?: number
},
exec: DaemonCommandType | null,
) => {
if (subcontainer.isOwned()) subcontainer = subcontainer.rc()
const startCommand = () =>
CommandController.of<Manifest>()(
effects,
subcontainer.rc(),
command,
options,
)
const startCommand = exec
? () =>
CommandController.of<Manifest>()(effects, subcontainer.rc(), exec)
: null
return new Daemon(subcontainer, startCommand)
}
}
@@ -66,35 +51,35 @@ export class Daemon<Manifest extends T.SDKManifest> extends Drop {
return
}
this.shouldBeRunning = true
this.exitedSuccess = false
let timeoutCounter = 0
;(async () => {
while (this.shouldBeRunning) {
while (this.startCommand && this.shouldBeRunning) {
if (this.commandController)
await this.commandController
.term({})
.catch((err) => console.error(err))
this.commandController = await this.startCommand()
if (
(await this.commandController.wait().then(
try {
this.commandController = await this.startCommand()
const success = await this.commandController.wait().then(
(_) => true,
(err) => {
console.error(err)
return false
},
)) &&
this.oneshot
) {
for (const fn of this.onExitSuccessFns) {
)
for (const fn of this.onExitFns) {
try {
fn()
fn(success)
} catch (e) {
console.error("EXIT_SUCCESS handler", e)
console.error("EXIT handler", e)
}
}
this.onExitSuccessFns = []
this.exitedSuccess = true
break
if (success && this.oneshot) {
this.exitedSuccess = true
break
}
} catch (e) {
console.error(e)
}
await new Promise((resolve) => setTimeout(resolve, timeoutCounter))
timeoutCounter += TIMEOUT_INCREMENT_MS
@@ -115,15 +100,20 @@ export class Daemon<Manifest extends T.SDKManifest> extends Drop {
timeout?: number | undefined
}) {
this.shouldBeRunning = false
this.exitedSuccess = false
await this.commandController
?.term({ ...termOptions })
.catch((e) => console.error(asError(e)))
this.commandController = null
this.onExitFns = []
await this.subcontainer.destroy()
}
/**
 * Returns the result of calling `rc()` on this daemon's subcontainer.
 * NOTE(review): presumably a new reference-counted handle to the same
 * subcontainer — confirm against SubContainer.rc().
 */
subcontainerRc(): SubContainerRc<Manifest> {
return this.subcontainer.rc()
}
/**
 * Registers a handler to be called whenever the daemon's command finishes.
 *
 * The run loop in `start()` calls every registered handler with `true` when the
 * command controller's `wait()` resolves and with `false` when it rejects.
 * Handlers stay registered across restarts of the command; `stop()` clears them.
 *
 * @param fn callback receiving whether the command exited successfully
 */
onExit(fn: (success: boolean) => void) {
this.onExitFns.push(fn)
}
/**
 * Drop hook: stops the daemon without awaiting the result.
 * A rejection from `stop()` is logged via `console.error` rather than propagated.
 * NOTE(review): presumably invoked by the `Drop` base class when the instance
 * is released — confirm against ../util Drop.
 */
onDrop(): void {
this.stop().catch((e) => console.error(asError(e)))
}

View File

@@ -4,8 +4,7 @@ import { HealthCheckResult } from "../health/checkFns"
import { Trigger } from "../trigger"
import * as T from "../../../base/lib/types"
import { Mounts } from "./Mounts"
import { MountOptions, SubContainer } from "../util/SubContainer"
import { SubContainer } from "../util/SubContainer"
import { promisify } from "node:util"
import * as CP from "node:child_process"
@@ -50,20 +49,40 @@ export type Ready = {
trigger?: Trigger
}
type NewDaemonParams<Manifest extends T.SDKManifest> = {
/** The command line command to start the daemon */
export type ExecCommandOptions = {
command: T.CommandType
/** Information about the subcontainer in which the daemon runs */
subcontainer: SubContainer<Manifest>
runAsInit?: boolean
env?: Record<string, string>
cwd?: string
user?: string
// Defaults to the DEFAULT_SIGTERM_TIMEOUT = 30_000ms
sigtermTimeout?: number
runAsInit?: boolean
env?:
| {
[variable: string]: string
}
| undefined
cwd?: string | undefined
user?: string | undefined
onStdout?: (chunk: Buffer | string | any) => void
onStderr?: (chunk: Buffer | string | any) => void
}
/**
 * Daemon exec variant that runs a JS function instead of directly launching a command.
 *
 * `CommandController.of` calls `fn` with the subcontainer and an AbortController.
 * If `fn` resolves to command options and the abort signal has not fired, that
 * command is launched and awaited; if it resolves to `null` (or the signal has
 * already fired) the controller is simply marked as exited.
 *
 * NOTE(review): `Manifest` is not declared as a type parameter of this alias in
 * the visible code — confirm it resolves to the intended type.
 */
export type ExecFnOptions = {
// Receives the target subcontainer and an AbortController that the controller
// aborts when it is terminated; should stop promptly once the signal fires.
fn: (
subcontainer: SubContainer<Manifest>,
abort: AbortController,
) => Promise<ExecCommandOptions | null>
// Defaults to the DEFAULT_SIGTERM_TIMEOUT = 30_000ms
sigtermTimeout?: number
}
/**
 * What a daemon executes: either a command description or a JS function.
 * `CommandController.of` tells the two apart by checking for the `fn` key
 * (`"fn" in exec`).
 */
export type DaemonCommandType = ExecCommandOptions | ExecFnOptions
/** Parameters shared by every daemon definition: what to run and where to run it. */
type NewDaemonParams<Manifest extends T.SDKManifest> = {
/**
 * What to run as the daemon: either an async fn or a command-line command to run in the subcontainer.
 * When `null`, `Daemon.of` creates a daemon with no start command, so its run loop launches nothing.
 */
exec: DaemonCommandType | null
/** Information about the subcontainer in which the daemon runs */
subcontainer: SubContainer<Manifest>
}
type AddDaemonParams<
Manifest extends T.SDKManifest,
Ids extends string,
@@ -84,6 +103,7 @@ type AddOneshotParams<
Ids extends string,
Id extends string,
> = NewDaemonParams<Manifest> & {
exec: DaemonCommandType
/** An array of IDs of prior daemons whose successful initializations are required before this daemon will initialize */
requires: Exclude<Ids, Id>[]
}
@@ -172,10 +192,7 @@ export class Daemons<Manifest extends T.SDKManifest, Ids extends string>
: Daemon.of<Manifest>()(
this.effects,
options.subcontainer,
options.command,
{
...options,
},
options.exec,
)
const healthDaemon = new HealthDaemon(
daemon,
@@ -221,10 +238,7 @@ export class Daemons<Manifest extends T.SDKManifest, Ids extends string>
const daemon = Oneshot.of<Manifest>()(
this.effects,
options.subcontainer,
options.command,
{
...options,
},
options.exec,
)
const healthDaemon = new HealthDaemon<Manifest>(
daemon,

View File

@@ -90,15 +90,23 @@ export class HealthDaemon<Manifest extends SDKManifest> {
this.healthCheckCleanup?.()
}
private async setupHealthCheck() {
if (this.ready === "EXIT_SUCCESS") {
const daemon = await this.daemon
if (daemon.isOneshot()) {
daemon.onExitSuccess(() =>
this.setHealth({ result: "success", message: null }),
)
const daemon = await this.daemon
daemon.onExit((success) => {
if (success && this.ready === "EXIT_SUCCESS") {
this.setHealth({ result: "success", message: null })
} else if (!success) {
this.setHealth({
result: "failure",
message: `${this.id} daemon crashed`,
})
} else if (!daemon.isOneshot()) {
this.setHealth({
result: "failure",
message: `${this.id} daemon exited`,
})
}
return
}
})
if (this.ready === "EXIT_SUCCESS") return
if (this.healthCheckCleanup) return
const trigger = (this.ready.trigger ?? defaultTrigger)(() => ({
lastResult: this._health.result,

View File

@@ -2,6 +2,7 @@ import * as T from "../../../base/lib/types"
import { SubContainer, SubContainerOwned } from "../util/SubContainer"
import { CommandController } from "./CommandController"
import { Daemon } from "./Daemon"
import { DaemonCommandType } from "./Daemons"
/**
* This is a wrapper around CommandController that has a state of off, where the command shouldn't be running
@@ -14,37 +15,14 @@ export class Oneshot<Manifest extends T.SDKManifest> extends Daemon<Manifest> {
return async (
effects: T.Effects,
subcontainer: SubContainer<Manifest>,
command: T.CommandType,
options: {
env?:
| {
[variable: string]: string
}
| undefined
cwd?: string | undefined
user?: string | undefined
onStdout?: (chunk: Buffer | string | any) => void
onStderr?: (chunk: Buffer | string | any) => void
sigtermTimeout?: number
},
exec: DaemonCommandType | null,
) => {
if (subcontainer.isOwned()) subcontainer = subcontainer.rc()
const startCommand = () =>
CommandController.of<Manifest>()(
effects,
subcontainer.rc(),
command,
options,
)
return new Oneshot(subcontainer, startCommand, true, [])
}
}
onExitSuccess(fn: () => void) {
if (this.exitedSuccess) {
fn()
} else {
this.onExitSuccessFns.push(fn)
const startCommand = exec
? () =>
CommandController.of<Manifest>()(effects, subcontainer.rc(), exec)
: null
return new Oneshot(subcontainer, startCommand, true)
}
}
}

View File

@@ -92,6 +92,7 @@ export interface SubContainer<
command: string[],
options?: CommandOptions & ExecOptions,
timeoutMs?: number | null,
abort?: AbortController,
): Promise<{
throw: () => { stdout: string | Buffer; stderr: string | Buffer }
exitCode: number | null
@@ -111,6 +112,7 @@ export interface SubContainer<
command: string[],
options?: CommandOptions & ExecOptions,
timeoutMs?: number | null,
abort?: AbortController,
): Promise<{
stdout: string | Buffer
stderr: string | Buffer
@@ -378,6 +380,7 @@ export class SubContainerOwned<
command: string[],
options?: CommandOptions & ExecOptions,
timeoutMs: number | null = 30000,
abort?: AbortController,
): Promise<{
throw: () => { stdout: string | Buffer; stderr: string | Buffer }
exitCode: number | null
@@ -417,6 +420,7 @@ export class SubContainerOwned<
],
options || {},
)
abort?.signal.addEventListener("abort", () => child.kill("SIGKILL"))
if (options?.input) {
await new Promise<null>((resolve, reject) => {
try {
@@ -489,12 +493,15 @@ export class SubContainerOwned<
async execFail(
command: string[],
options?: CommandOptions & ExecOptions,
timeoutMs: number | null = 30000,
timeoutMs?: number | null,
abort?: AbortController,
): Promise<{
stdout: string | Buffer
stderr: string | Buffer
}> {
return this.exec(command, options, timeoutMs).then((res) => res.throw())
return this.exec(command, options, timeoutMs, abort).then((res) =>
res.throw(),
)
}
async launch(
@@ -711,7 +718,8 @@ export class SubContainerRc<
async exec(
command: string[],
options?: CommandOptions & ExecOptions,
timeoutMs: number | null = 30000,
timeoutMs?: number | null,
abort?: AbortController,
): Promise<{
throw: () => { stdout: string | Buffer; stderr: string | Buffer }
exitCode: number | null
@@ -719,7 +727,7 @@ export class SubContainerRc<
stdout: string | Buffer
stderr: string | Buffer
}> {
return this.subcontainer.exec(command, options, timeoutMs)
return this.subcontainer.exec(command, options, timeoutMs, abort)
}
/**
@@ -732,12 +740,13 @@ export class SubContainerRc<
async execFail(
command: string[],
options?: CommandOptions & ExecOptions,
timeoutMs: number | null = 30000,
timeoutMs?: number | null,
abort?: AbortController,
): Promise<{
stdout: string | Buffer
stderr: string | Buffer
}> {
return this.subcontainer.execFail(command, options, timeoutMs)
return this.subcontainer.execFail(command, options, timeoutMs, abort)
}
async launch(

View File

@@ -1,12 +1,12 @@
{
"name": "@start9labs/start-sdk",
"version": "0.4.0-beta.26",
"version": "0.4.0-beta.27",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "@start9labs/start-sdk",
"version": "0.4.0-beta.26",
"version": "0.4.0-beta.27",
"license": "MIT",
"dependencies": {
"@iarna/toml": "^3.0.0",

View File

@@ -1,6 +1,6 @@
{
"name": "@start9labs/start-sdk",
"version": "0.4.0-beta.26",
"version": "0.4.0-beta.27",
"description": "Software development kit to facilitate packaging services for StartOS",
"main": "./package/lib/index.js",
"types": "./package/lib/index.d.ts",