fix: Making the daemons keep up the status. (#2617)

* complete get_primary_url fn

* complete clear_network_interfaces fn

* formatting

* complete remove_address fn

* get_system_smtp wip

* complete get_system_smtp and set_system_smtp

* add SetSystemSmtpParams struct

* add set_system_smtp subcommand

* Remove 'Copy' implementation from `HostAddress`

Co-authored-by: Aiden McClelland <3732071+dr-bonez@users.noreply.github.com>

* Refactor `get_host_primary` fn and clone  resulting `HostAddress`

Co-authored-by: Aiden McClelland <3732071+dr-bonez@users.noreply.github.com>

* misc fixes and debug info

* seed hosts with a tor address

* fix: Making the daemons keep up the status.

* wipFix: Making a service start

* fix: Both the start + stop of the service.

* fix: Weird edge case of failure and kids

---------

Co-authored-by: Shadowy Super Coder <musashidisciple@proton.me>
Co-authored-by: Aiden McClelland <3732071+dr-bonez@users.noreply.github.com>
Co-authored-by: Aiden McClelland <me@drbonez.dev>
This commit is contained in:
Jade
2024-05-13 10:50:25 -06:00
committed by GitHub
parent 800b0763e4
commit 0b8a142de0
14 changed files with 467 additions and 168 deletions

View File

@@ -0,0 +1,108 @@
import { NO_TIMEOUT, SIGTERM } from "../StartSdk"
import { SDKManifest } from "../manifest/ManifestTypes"
import { Effects, ValidIfNoStupidEscape } from "../types"
import { MountOptions, Overlay } from "../util/Overlay"
import { splitCommand } from "../util/splitCommand"
import { cpExecFile } from "./Daemons"
export class CommandController {
private constructor(
readonly runningAnswer: Promise<unknown>,
readonly overlay: Overlay,
readonly pid: number | undefined,
) {}
static of<Manifest extends SDKManifest>() {
return async <A extends string>(
effects: Effects,
imageId: {
id: Manifest["images"][number]
sharedRun?: boolean
},
command: ValidIfNoStupidEscape<A> | [string, ...string[]],
options: {
mounts?: { path: string; options: MountOptions }[]
overlay?: Overlay
env?:
| {
[variable: string]: string
}
| undefined
cwd?: string | undefined
user?: string | undefined
onStdout?: (x: Buffer) => void
onStderr?: (x: Buffer) => void
},
) => {
const commands = splitCommand(command)
const overlay = options.overlay || (await Overlay.of(effects, imageId))
for (let mount of options.mounts || []) {
await overlay.mount(mount.options, mount.path)
}
const childProcess = await overlay.spawn(commands, {
env: options.env,
})
const answer = new Promise<null>((resolve, reject) => {
childProcess.stdout.on(
"data",
options.onStdout ??
((data: any) => {
console.log(data.toString())
}),
)
childProcess.stderr.on(
"data",
options.onStderr ??
((data: any) => {
console.error(data.toString())
}),
)
childProcess.on("exit", (code: any) => {
if (code === 0) {
return resolve(null)
}
return reject(new Error(`${commands[0]} exited with code ${code}`))
})
})
const pid = childProcess.pid
return new CommandController(answer, overlay, pid)
}
}
async wait() {
try {
return await this.runningAnswer
} finally {
await cpExecFile("pkill", ["-9", "-s", String(this.pid)]).catch((_) => {})
await this.overlay.destroy().catch((_) => {})
}
}
async term({ signal = SIGTERM, timeout = NO_TIMEOUT } = {}) {
try {
await cpExecFile("pkill", [
`-${signal.replace("SIG", "")}`,
"-s",
String(this.pid),
])
if (timeout > NO_TIMEOUT) {
const didTimeout = await Promise.race([
new Promise((resolve) => setTimeout(resolve, timeout)).then(
() => true,
),
this.runningAnswer.then(() => false),
])
if (didTimeout) {
await cpExecFile("pkill", [`-9`, "-s", String(this.pid)]).catch(
(_: any) => {},
)
}
} else {
await this.runningAnswer
}
} finally {
await this.overlay.destroy()
}
}
}

79
sdk/lib/mainFn/Daemon.ts Normal file
View File

@@ -0,0 +1,79 @@
import { SDKManifest } from "../manifest/ManifestTypes"
import { Effects, ValidIfNoStupidEscape } from "../types"
import { MountOptions, Overlay } from "../util/Overlay"
import { CommandController } from "./CommandController"
const TIMEOUT_INCREMENT_MS = 1000
const MAX_TIMEOUT_MS = 30000
/**
* This is a wrapper around CommandController that has a state of off, where the command shouldn't be running
* and the others state of running, where it will keep a living running command
*/
export class Daemon {
private commandController: CommandController | null = null
private shouldBeRunning = false
private constructor(private startCommand: () => Promise<CommandController>) {}
static of<Manifest extends SDKManifest>() {
return async <A extends string>(
effects: Effects,
imageId: {
id: Manifest["images"][number]
sharedRun?: boolean
},
command: ValidIfNoStupidEscape<A> | [string, ...string[]],
options: {
mounts?: { path: string; options: MountOptions }[]
overlay?: Overlay
env?:
| {
[variable: string]: string
}
| undefined
cwd?: string | undefined
user?: string | undefined
onStdout?: (x: Buffer) => void
onStderr?: (x: Buffer) => void
},
) => {
const startCommand = () =>
CommandController.of<Manifest>()(effects, imageId, command, options)
return new Daemon(startCommand)
}
}
async start() {
if (this.commandController) {
return
}
this.shouldBeRunning = true
let timeoutCounter = 0
new Promise(async () => {
while (this.shouldBeRunning) {
this.commandController = await this.startCommand()
await this.commandController.wait().catch((err) => console.error(err))
await new Promise((resolve) => setTimeout(resolve, timeoutCounter))
timeoutCounter += TIMEOUT_INCREMENT_MS
timeoutCounter = Math.max(MAX_TIMEOUT_MS, timeoutCounter)
}
}).catch((err) => {
console.error(err)
})
}
async term(termOptions?: {
signal?: NodeJS.Signals | undefined
timeout?: number | undefined
}) {
return this.stop(termOptions)
}
async stop(termOptions?: {
signal?: NodeJS.Signals | undefined
timeout?: number | undefined
}) {
this.shouldBeRunning = false
await this.commandController
?.term(termOptions)
.catch((e) => console.error(e))
this.commandController = null
}
}

View File

@@ -13,98 +13,38 @@ import { splitCommand } from "../util/splitCommand"
import { promisify } from "node:util"
import * as CP from "node:child_process"
export { Daemon } from "./Daemon"
export { CommandController } from "./CommandController"
import { HealthDaemon } from "./HealthDaemon"
import { Daemon } from "./Daemon"
import { CommandController } from "./CommandController"
const cpExec = promisify(CP.exec)
const cpExecFile = promisify(CP.execFile)
type Daemon<
export const cpExecFile = promisify(CP.execFile)
export type Ready = {
display: string | null
fn: () => Promise<CheckResult> | CheckResult
trigger?: Trigger
}
type DaemonsParams<
Manifest extends SDKManifest,
Ids extends string,
Command extends string,
Id extends string,
> = {
id: "" extends Id ? never : Id
command: ValidIfNoStupidEscape<Command> | [string, ...string[]]
image: { id: Manifest["images"][number]; sharedRun?: boolean }
mounts: Mounts<Manifest>
mounts: { path: string; options: MountOptions }[]
env?: Record<string, string>
ready: {
display: string | null
fn: () => Promise<CheckResult> | CheckResult
trigger?: Trigger
}
ready: Ready
requires: Exclude<Ids, Id>[]
}
type ErrorDuplicateId<Id extends string> = `The id '${Id}' is already used`
export const runDaemon =
<Manifest extends SDKManifest>() =>
async <A extends string>(
effects: Effects,
image: { id: Manifest["images"][number]; sharedRun?: boolean },
command: ValidIfNoStupidEscape<A> | [string, ...string[]],
options: CommandOptions & {
mounts?: { path: string; options: MountOptions }[]
overlay?: Overlay
},
): Promise<DaemonReturned> => {
const commands = splitCommand(command)
const overlay = options.overlay || (await Overlay.of(effects, image))
for (let mount of options.mounts || []) {
await overlay.mount(mount.options, mount.path)
}
const childProcess = await overlay.spawn(commands, {
env: options.env,
})
const answer = new Promise<null>((resolve, reject) => {
childProcess.stdout.on("data", (data: any) => {
console.log(data.toString())
})
childProcess.stderr.on("data", (data: any) => {
console.error(data.toString())
})
childProcess.on("exit", (code: any) => {
if (code === 0) {
return resolve(null)
}
return reject(new Error(`${commands[0]} exited with code ${code}`))
})
})
const pid = childProcess.pid
return {
async wait() {
try {
return await answer
} finally {
await cpExecFile("pkill", ["-9", "-s", String(pid)]).catch((_) => {})
}
},
async term({ signal = SIGTERM, timeout = NO_TIMEOUT } = {}) {
try {
await cpExecFile("pkill", [`-${signal}`, "-s", String(pid)])
if (timeout > NO_TIMEOUT) {
const didTimeout = await Promise.race([
new Promise((resolve) => setTimeout(resolve, timeout)).then(
() => true,
),
answer.then(() => false),
])
if (didTimeout) {
await cpExecFile("pkill", [`-9`, "-s", String(pid)]).catch(
(_) => {},
)
}
} else {
await answer
}
} finally {
await overlay.destroy()
}
},
}
}
export const runCommand = <Manifest extends SDKManifest>() =>
CommandController.of<Manifest>()
/**
* A class for defining and controlling the service daemons
@@ -133,7 +73,9 @@ export class Daemons<Manifest extends SDKManifest, Ids extends string> {
private constructor(
readonly effects: Effects,
readonly started: (onTerm: () => PromiseLike<void>) => PromiseLike<void>,
readonly daemons?: Daemon<Manifest, Ids, "command", Ids>[],
readonly daemons: Promise<Daemon>[],
readonly ids: Ids[],
readonly healthDaemons: HealthDaemon[],
) {}
/**
* Returns an empty new Daemons class with the provided config.
@@ -150,7 +92,13 @@ export class Daemons<Manifest extends SDKManifest, Ids extends string> {
started: (onTerm: () => PromiseLike<void>) => PromiseLike<void>
healthReceipts: HealthReceipt[]
}) {
return new Daemons<Manifest, never>(config.effects, config.started)
return new Daemons<Manifest, never>(
config.effects,
config.started,
[],
[],
[],
)
}
/**
* Returns the complete list of daemons, including the one defined here
@@ -165,73 +113,60 @@ export class Daemons<Manifest extends SDKManifest, Ids extends string> {
ErrorDuplicateId<Id> extends Id ? never :
Id extends Ids ? ErrorDuplicateId<Id> :
Id,
newDaemon: Omit<Daemon<Manifest, Ids, Command, Id>, "id">,
options: DaemonsParams<Manifest, Ids, Command, Id>,
) {
const daemons = ((this?.daemons ?? []) as any[]).concat({
...newDaemon,
const daemonIndex = this.daemons.length
const daemon = Daemon.of()(
this.effects,
options.image,
options.command,
options,
)
const healthDaemon = new HealthDaemon(
daemon,
daemonIndex,
options.requires
.map((x) => this.ids.indexOf(id as any))
.filter((x) => x >= 0)
.map((id) => this.healthDaemons[id]),
id,
})
return new Daemons<Manifest, Ids | Id>(this.effects, this.started, daemons)
this.ids,
options.ready,
this.effects,
)
const daemons = this.daemons.concat(daemon)
const ids = [...this.ids, id] as (Ids | Id)[]
const healthDaemons = [...this.healthDaemons, healthDaemon]
return new Daemons<Manifest, Ids | Id>(
this.effects,
this.started,
daemons,
ids,
healthDaemons,
)
}
async build() {
const daemonsStarted = {} as Record<Ids, Promise<DaemonReturned>>
const { effects } = this
const daemons = this.daemons ?? []
for (const daemon of daemons) {
const requiredPromise = Promise.all(
daemon.requires?.map((id) => daemonsStarted[id]) ?? [],
)
daemonsStarted[daemon.id] = requiredPromise.then(async () => {
const { command, image } = daemon
const child = runDaemon<Manifest>()(effects, image, command, {
env: daemon.env,
mounts: daemon.mounts.build(),
})
let currentInput: TriggerInput = {}
const getCurrentInput = () => currentInput
const trigger = (daemon.ready.trigger ?? defaultTrigger)(
getCurrentInput,
)
return new Promise(async (resolve) => {
for (
let res = await trigger.next();
!res.done;
res = await trigger.next()
) {
const response = await Promise.resolve(daemon.ready.fn()).catch(
(err) =>
({
status: "failure",
message: "message" in err ? err.message : String(err),
}) as CheckResult,
)
currentInput.lastResult = response.status || null
if (!currentInput.hadSuccess && response.status === "success") {
currentInput.hadSuccess = true
resolve(child)
}
}
resolve(child)
})
})
}
this.updateMainHealth()
this.healthDaemons.forEach((x) =>
x.addWatcher(() => this.updateMainHealth()),
)
return {
async term(options?: { signal?: Signals; timeout?: number }) {
await Promise.all(
Object.values<Promise<DaemonReturned>>(daemonsStarted).map((x) =>
x.then((x) => x.term(options)),
),
)
},
async wait() {
await Promise.all(
Object.values<Promise<DaemonReturned>>(daemonsStarted).map((x) =>
x.then((x) => x.wait()),
),
)
term: async (options?: { signal?: Signals; timeout?: number }) => {
try {
await Promise.all(this.healthDaemons.map((x) => x.term(options)))
} finally {
this.effects.setMainStatus({ status: "stopped" })
}
},
}
}
private updateMainHealth() {
if (this.healthDaemons.every((x) => x.health.status === "success")) {
this.effects.setMainStatus({ status: "running" })
} else {
this.effects.setMainStatus({ status: "starting" })
}
}
}

View File

@@ -0,0 +1,152 @@
import { CheckResult } from "../health/checkFns"
import { defaultTrigger } from "../trigger/defaultTrigger"
import { Ready } from "./Daemons"
import { Daemon } from "./Daemon"
import { Effects } from "../types"
const oncePromise = <T>() => {
let resolve: (value: T) => void
const promise = new Promise<T>((res) => {
resolve = res
})
return { resolve: resolve!, promise }
}
/**
* Wanted a structure that deals with controlling daemons by their health status
* States:
* -- Waiting for dependencies to be success
* -- Running: Daemon is running and the status is in the health
*
*/
export class HealthDaemon {
#health: CheckResult = { status: "starting", message: null }
#healthWatchers: Array<() => unknown> = []
#running = false
#hadSuccess = false
constructor(
readonly daemon: Promise<Daemon>,
readonly daemonIndex: number,
readonly dependencies: HealthDaemon[],
readonly id: string,
readonly ids: string[],
readonly ready: Ready,
readonly effects: Effects,
) {
this.updateStatus()
this.dependencies.forEach((d) => d.addWatcher(() => this.updateStatus()))
}
/** Run after we want to do cleanup */
async term(termOptions?: {
signal?: NodeJS.Signals | undefined
timeout?: number | undefined
}) {
this.#healthWatchers = []
this.#running = false
this.#healthCheckCleanup?.()
await this.daemon.then((d) => d.stop(termOptions))
}
/** Want to add another notifier that the health might have changed */
addWatcher(watcher: () => unknown) {
this.#healthWatchers.push(watcher)
}
get health() {
return Object.freeze(this.#health)
}
private async changeRunning(newStatus: boolean) {
if (this.#running === newStatus) return
this.#running = newStatus
if (newStatus) {
;(await this.daemon).start()
this.setupHealthCheck()
} else {
;(await this.daemon).stop()
this.turnOffHealthCheck()
this.setHealth({ status: "starting", message: null })
}
}
#healthCheckCleanup: (() => void) | null = null
private turnOffHealthCheck() {
this.#healthCheckCleanup?.()
}
private async setupHealthCheck() {
if (this.#healthCheckCleanup) return
const trigger = (this.ready.trigger ?? defaultTrigger)(() => ({
hadSuccess: this.#hadSuccess,
lastResult: this.#health.status,
}))
const { promise: status, resolve: setStatus } = oncePromise<{
done: true
}>()
new Promise(async () => {
for (
let res = await Promise.race([status, trigger.next()]);
!res.done;
res = await Promise.race([status, trigger.next()])
) {
const response: CheckResult = await Promise.resolve(
this.ready.fn(),
).catch((err) => {
console.error(err)
return {
status: "failure",
message: "message" in err ? err.message : String(err),
}
})
this.setHealth(response)
if (response.status === "success") {
this.#hadSuccess = true
}
}
}).catch((err) => console.error(`Daemon ${this.id} failed: ${err}`))
this.#healthCheckCleanup = () => {
setStatus({ done: true })
this.#healthCheckCleanup = null
}
}
private setHealth(health: CheckResult) {
this.#health = health
this.#healthWatchers.forEach((watcher) => watcher())
const display = this.ready.display
const status = health.status
if (!display) {
return
}
if (
status === "success" ||
status === "disabled" ||
status === "starting"
) {
this.effects.setHealth({
result: status,
message: health.message,
id: display,
name: display,
})
} else {
this.effects.setHealth({
result: health.status,
message: health.message || "",
id: display,
name: display,
})
}
}
private async updateStatus() {
const healths = this.dependencies.map((d) => d.#health)
this.changeRunning(healths.every((x) => x.status === "success"))
}
}

View File

@@ -1,4 +1,4 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { Status } from "./Status"
export type SetMainStatus = { status: Status }
export type SetMainStatus = { status: "running" | "stopped" | "starting" }

View File

@@ -1,5 +1,9 @@
import { Effects } from "../types"
import { CheckDependenciesParam, ExecuteAction } from ".././osBindings"
import {
CheckDependenciesParam,
ExecuteAction,
SetMainStatus,
} from ".././osBindings"
import { CreateOverlayedImageParams } from ".././osBindings"
import { DestroyOverlayedImageParams } from ".././osBindings"
import { BindParams } from ".././osBindings"
@@ -66,6 +70,7 @@ describe("startosTypeValidation ", () => {
mount: {} as MountParams,
checkDependencies: {} as CheckDependenciesParam,
getDependencies: undefined,
setMainStatus: {} as SetMainStatus,
})
typeEquality<Parameters<Effects["executeAction"]>[0]>(
testInput as ExecuteAction,

View File

@@ -4,6 +4,7 @@ import {
DependencyRequirement,
SetHealth,
HealthCheckResult,
SetMainStatus,
} from "./osBindings"
import { MainEffects, ServiceInterfaceType, Signals } from "./StartSdk"
@@ -163,7 +164,7 @@ export type CommandType<A extends string> =
| [string, ...string[]]
export type DaemonReturned = {
wait(): Promise<null>
wait(): Promise<unknown>
term(options?: { signal?: Signals; timeout?: number }): Promise<void>
}
@@ -380,6 +381,8 @@ export type Effects = {
}): Promise<void>
}
setMainStatus(o: SetMainStatus): Promise<void>
getSystemSmtp(input: {
callback: (config: unknown, previousConfig: unknown) => void
}): Promise<SmtpValue>

View File

@@ -1,6 +1,6 @@
{
"name": "@start9labs/start-sdk",
"version": "0.4.0-rev0.lib0.rc8.beta10",
"version": "0.3.6-alpha1",
"description": "Software development kit to facilitate packaging services for StartOS",
"main": "./cjs/lib/index.js",
"types": "./cjs/lib/index.d.ts",