import { NO_TIMEOUT, SIGKILL, SIGTERM, Signals } from "../StartSdk"
import { HealthReceipt } from "../health/HealthReceipt"
import { CheckResult } from "../health/checkFns"
import { SDKManifest } from "../manifest/ManifestTypes"
import { Trigger } from "../trigger"
import { TriggerInput } from "../trigger/TriggerInput"
import { defaultTrigger } from "../trigger/defaultTrigger"
import { DaemonReturned, Effects, ValidIfNoStupidEscape } from "../types"
import { Mounts } from "./Mounts"
import { CommandOptions, MountOptions, Overlay } from "../util/Overlay"
import { splitCommand } from "../util/splitCommand"
import { promisify } from "node:util"
import * as CP from "node:child_process"

const cpExec = promisify(CP.exec)
const cpExecFile = promisify(CP.execFile)

/**
 * Turns any thrown value into a printable message.
 *
 * The `in` operator throws a TypeError when its right-hand side is a
 * primitive, so the value is checked to be a non-null object before looking
 * for a `message` property.
 */
const describeError = (err: unknown): string =>
  typeof err === "object" && err !== null && "message" in err
    ? String((err as { message: unknown }).message)
    : String(err)

/**
 * Description of a single daemon as stored by {@link Daemons}.
 *
 * - `id`: non-empty identifier of the daemon
 * - `command`: command line, either as one string or as an argv tuple
 * - `image`: image the command is run in
 * - `mounts`: mounts to attach before the command is spawned
 * - `env`: optional environment passed to the spawned process
 * - `ready`: readiness check (`fn`), its display name and an optional trigger
 *   deciding when the check is re-run
 * - `requires`: ids of daemons that must have started before this one
 */
type Daemon<
  Manifest extends SDKManifest,
  Ids extends string,
  Command extends string,
  Id extends string,
> = {
  id: "" extends Id ? never : Id
  command: ValidIfNoStupidEscape<Command> | [string, ...string[]]
  image: { id: Manifest["images"][number]; sharedRun?: boolean }
  mounts: Mounts<Manifest>
  env?: Record<string, string>
  ready: {
    display: string | null
    fn: () => Promise<CheckResult> | CheckResult
    trigger?: Trigger
  }
  requires: Exclude<Ids, Id>[]
}

/** Type-level error message shown when a daemon id is registered twice. */
type ErrorDuplicateId<Id extends string> = `The id '${Id}' is already used`

/**
 * Spawns `command` inside an overlay of `image` and returns a handle to it.
 *
 * The overlay is taken from `options.overlay` when given, otherwise a new one
 * is created with `Overlay.of`. Every entry of `options.mounts` is mounted
 * (in order) before the process is spawned. The child's stdout and stderr are
 * forwarded to `console.log` / `console.error`.
 *
 * @returns an object with
 *  - `wait()`: resolves with `null` when the process exits with code 0 and
 *    rejects for any other exit; afterwards the process session is killed
 *    with signal 9 (best effort).
 *  - `term({ signal, timeout })`: sends `signal` (default `SIGTERM`) to the
 *    process session, waits for the exit (escalating to signal 9 once
 *    `timeout` elapses, when a timeout greater than `NO_TIMEOUT` is given)
 *    and finally destroys the overlay.
 */
export const runDaemon =
  <Manifest extends SDKManifest>() =>
  async <A extends string>(
    effects: Effects,
    image: { id: Manifest["images"][number]; sharedRun?: boolean },
    command: ValidIfNoStupidEscape<A> | [string, ...string[]],
    options: CommandOptions & {
      mounts?: { path: string; options: MountOptions }[]
      overlay?: Overlay
    },
  ): Promise<DaemonReturned> => {
    const commands = splitCommand(command)
    const overlay = options.overlay || (await Overlay.of(effects, image))
    // Mounts are applied one after another, in the order they were given.
    for (const mount of options.mounts || []) {
      await overlay.mount(mount.options, mount.path)
    }
    const childProcess = await overlay.spawn(commands, {
      env: options.env,
    })
    const answer = new Promise<null>((resolve, reject) => {
      childProcess.stdout.on("data", (data: any) => {
        console.log(data.toString())
      })
      childProcess.stderr.on("data", (data: any) => {
        console.error(data.toString())
      })
      childProcess.on("exit", (code: any) => {
        if (code === 0) {
          return resolve(null)
        }
        return reject(new Error(`${commands[0]} exited with code ${code}`))
      })
    })
    // The process may exit before anybody calls wait()/term(). Attaching a
    // passive handler keeps that from being reported as an unhandled
    // rejection; callers awaiting `answer` still observe the failure.
    answer.catch(() => {})

    const pid = childProcess.pid
    return {
      async wait() {
        try {
          return await answer
        } finally {
          // Best effort: make sure nothing of the session survives.
          await cpExecFile("pkill", ["-9", "-s", String(pid)]).catch((_) => {})
        }
      },
      async term({ signal = SIGTERM, timeout = NO_TIMEOUT } = {}) {
        try {
          await cpExecFile("pkill", [`-${signal}`, "-s", String(pid)])
          // The process is being stopped on purpose, so a non-zero or
          // signal-induced exit (reported with a null exit code) is the
          // expected outcome here and must not make term() fail.
          const exited = answer.then(
            () => undefined,
            () => undefined,
          )
          if (timeout > NO_TIMEOUT) {
            const didTimeout = await new Promise<boolean>((resolve) => {
              const timer = setTimeout(() => resolve(true), timeout)
              void exited.then(() => {
                // Do not leave the timer armed once the process is gone.
                clearTimeout(timer)
                resolve(false)
              })
            })
            if (didTimeout) {
              await cpExecFile("pkill", [`-9`, "-s", String(pid)]).catch(
                (_) => {},
              )
            }
          } else {
            await exited
          }
        } finally {
          await overlay.destroy()
        }
      },
    }
  }

/**
 * A class for defining and controlling the service daemons
 * ```ts
 * Daemons.of({
 *   effects,
 *   started,
 *   healthReceipts, // Provide the healthReceipts or [] to prove they were at least considered
 * }).addDaemon('webui', {
 *   command: 'hello-world', // The command to start the daemon
 *   image: { id: 'main' }, // The image the command is run in
 *   mounts, // The Mounts to attach before the command is spawned
 *   ready: {
 *     display: 'Web Interface',
 *     // The function to run to determine the health status of the daemon
 *     fn: () =>
 *       checkPortListening(effects, 80, {
 *         successMessage: 'The web interface is ready',
 *         errorMessage: 'The web interface is not ready',
 *       }),
 *   },
 *   requires: [],
 * })
 * ```
 */
export class Daemons<Manifest extends SDKManifest, Ids extends string> {
  private constructor(
    readonly effects: Effects,
    readonly started: (onTerm: () => PromiseLike<void>) => PromiseLike<void>,
    readonly daemons?: Daemon<Manifest, Ids, "command", Ids>[],
  ) {}

  /**
   * Returns an empty new Daemons class with the provided config.
   *
   * Call .addDaemon() on the returned class to add a daemon.
   *
   * Daemons run in the order they are defined, with later daemons being
   * capable of depending on prior daemons
   * @param config effects and the `started` callback to keep; the
   *   `healthReceipts` are required by the signature but not stored
   * @returns a Daemons instance without any daemon
   */
  static of<Manifest extends SDKManifest>(config: {
    effects: Effects
    started: (onTerm: () => PromiseLike<void>) => PromiseLike<void>
    healthReceipts: HealthReceipt[]
  }) {
    return new Daemons<Manifest, never>(config.effects, config.started)
  }

  /**
   * Registers one more daemon. This instance is left untouched.
   * @param id non-empty id that has not been used by an earlier daemon
   *   (both conditions are enforced at the type level)
   * @param newDaemon the daemon definition, without its id
   * @returns a new Daemons holding all previous daemons plus the one defined
   *   here
   */
  addDaemon<Id extends string, Command extends string>(
    // prettier-ignore
    id:
      "" extends Id ? never :
      ErrorDuplicateId<Id> extends Id ? never :
      Id extends Ids ? ErrorDuplicateId<Id> :
      Id,
    newDaemon: Omit<Daemon<Manifest, Ids, Command, Id>, "id">,
  ) {
    // The element type changes with every added id, hence the cast.
    const daemons = ((this.daemons ?? []) as any[]).concat({
      ...newDaemon,
      id,
    })
    return new Daemons<Manifest, Ids | Id>(this.effects, this.started, daemons)
  }

  /**
   * Starts every registered daemon in definition order. A daemon is spawned
   * once all daemons listed in its `requires` have started, and counts as
   * started itself at the first successful readiness check (or when its
   * trigger is exhausted). Readiness checks keep running for as long as the
   * trigger yields.
   *
   * A failure to spawn a daemon, or an error thrown by its trigger, rejects
   * that daemon's start and is reported by `term()` / `wait()`.
   *
   * @returns an object with `term(options)` and `wait()`, each of which is
   *   forwarded to all daemons in parallel
   */
  async build() {
    const daemonsStarted = {} as Record<Ids, Promise<DaemonReturned>>
    const { effects } = this
    const daemons = this.daemons ?? []
    for (const daemon of daemons) {
      const requiredPromise = Promise.all(
        daemon.requires?.map((id) => daemonsStarted[id]) ?? [],
      )
      daemonsStarted[daemon.id] = requiredPromise.then(async () => {
        const { command, image } = daemon
        const child = runDaemon<Manifest>()(effects, image, command, {
          env: daemon.env,
          mounts: daemon.mounts.build(),
        })
        let currentInput: TriggerInput = {}
        const getCurrentInput = () => currentInput
        const trigger = (daemon.ready.trigger ?? defaultTrigger)(
          getCurrentInput,
        )
        return new Promise<DaemonReturned>((resolve, reject) => {
          // A daemon that could not be spawned will never become ready.
          child.catch(reject)

          const pollReadiness = async () => {
            for (
              let res = await trigger.next();
              !res.done;
              res = await trigger.next()
            ) {
              let response: CheckResult
              try {
                // `await` covers synchronous results, synchronous throws and
                // rejected promises alike.
                response = await daemon.ready.fn()
              } catch (err) {
                response = {
                  status: "failure",
                  message: describeError(err),
                } as CheckResult
              }
              currentInput.lastResult = response.status || null
              if (!currentInput.hadSuccess && response.status === "success") {
                currentInput.hadSuccess = true
                resolve(child)
              }
            }
            resolve(child)
          }
          // Errors of the trigger itself must not leave the promise pending.
          pollReadiness().catch(reject)
        })
      })
    }
    return {
      async term(options?: { signal?: Signals; timeout?: number }) {
        await Promise.all(
          Object.values<Promise<DaemonReturned>>(daemonsStarted).map((x) =>
            x.then((x) => x.term(options)),
          ),
        )
      },
      async wait() {
        await Promise.all(
          Object.values<Promise<DaemonReturned>>(daemonsStarted).map((x) =>
            x.then((x) => x.wait()),
          ),
        )
      },
    }
  }
}