kill process by session, and add timeout (#2608)

This commit is contained in:
Aiden McClelland
2024-04-23 14:01:40 -06:00
committed by GitHub
parent 7b8a0114f5
commit 3a5ee4a296
6 changed files with 191 additions and 85 deletions

View File

@@ -84,10 +84,19 @@ export class DockerProcedureContainer {
} }
} }
async execSpawn(commands: string[]) { async execFail(commands: string[], timeoutMs: number | null) {
try { try {
const spawned = await this.overlay.spawn(commands) const res = await this.overlay.exec(commands, {}, timeoutMs)
return spawned if (res.exitCode !== 0) {
const codeOrSignal =
res.exitCode !== null
? `code ${res.exitCode}`
: `signal ${res.exitSignal}`
throw new Error(
`Process exited with ${codeOrSignal}: ${res.stderr.toString()}`,
)
}
return res
} finally { } finally {
await this.overlay.destroy() await this.overlay.destroy()
} }

View File

@@ -140,7 +140,7 @@ export class MainLoop {
actionProcedure, actionProcedure,
manifest.volumes, manifest.volumes,
) )
const executed = await container.execSpawn([ const executed = await container.exec([
actionProcedure.entrypoint, actionProcedure.entrypoint,
...actionProcedure.args, ...actionProcedure.args,
JSON.stringify(timeChanged), JSON.stringify(timeChanged),

View File

@@ -263,37 +263,65 @@ export class SystemForEmbassy implements System {
const input = options.input const input = options.input
switch (options.procedure) { switch (options.procedure) {
case "/backup/create": case "/backup/create":
return this.createBackup(effects) return this.createBackup(effects, options.timeout || null)
case "/backup/restore": case "/backup/restore":
return this.restoreBackup(effects) return this.restoreBackup(effects, options.timeout || null)
case "/config/get": case "/config/get":
return this.getConfig(effects) return this.getConfig(effects, options.timeout || null)
case "/config/set": case "/config/set":
return this.setConfig(effects, input) return this.setConfig(effects, input, options.timeout || null)
case "/properties": case "/properties":
return this.properties(effects) return this.properties(effects, options.timeout || null)
case "/actions/metadata": case "/actions/metadata":
return todo() return todo()
case "/init": case "/init":
return this.init(effects, string.optional().unsafeCast(input)) return this.init(
effects,
string.optional().unsafeCast(input),
options.timeout || null,
)
case "/uninit": case "/uninit":
return this.uninit(effects, string.optional().unsafeCast(input)) return this.uninit(
effects,
string.optional().unsafeCast(input),
options.timeout || null,
)
case "/main/start": case "/main/start":
return this.mainStart(effects) return this.mainStart(effects, options.timeout || null)
case "/main/stop": case "/main/stop":
return this.mainStop(effects) return this.mainStop(effects, options.timeout || null)
default: default:
const procedures = unNestPath(options.procedure) const procedures = unNestPath(options.procedure)
switch (true) { switch (true) {
case procedures[1] === "actions" && procedures[3] === "get": case procedures[1] === "actions" && procedures[3] === "get":
return this.action(effects, procedures[2], input) return this.action(
effects,
procedures[2],
input,
options.timeout || null,
)
case procedures[1] === "actions" && procedures[3] === "run": case procedures[1] === "actions" && procedures[3] === "run":
return this.action(effects, procedures[2], input) return this.action(
effects,
procedures[2],
input,
options.timeout || null,
)
case procedures[1] === "dependencies" && procedures[3] === "query": case procedures[1] === "dependencies" && procedures[3] === "query":
return this.dependenciesAutoconfig(effects, procedures[2], input) return this.dependenciesAutoconfig(
effects,
procedures[2],
input,
options.timeout || null,
)
case procedures[1] === "dependencies" && procedures[3] === "update": case procedures[1] === "dependencies" && procedures[3] === "update":
return this.dependenciesAutoconfig(effects, procedures[2], input) return this.dependenciesAutoconfig(
effects,
procedures[2],
input,
options.timeout || null,
)
} }
} }
throw new Error(`Could not find the path for ${options.procedure}`) throw new Error(`Could not find the path for ${options.procedure}`)
@@ -301,8 +329,10 @@ export class SystemForEmbassy implements System {
private async init( private async init(
effects: HostSystemStartOs, effects: HostSystemStartOs,
previousVersion: Optional<string>, previousVersion: Optional<string>,
timeoutMs: number | null,
): Promise<void> { ): Promise<void> {
if (previousVersion) await this.migration(effects, previousVersion) if (previousVersion)
await this.migration(effects, previousVersion, timeoutMs)
await effects.setMainStatus({ status: "stopped" }) await effects.setMainStatus({ status: "stopped" })
await this.exportActions(effects) await this.exportActions(effects)
} }
@@ -337,29 +367,36 @@ export class SystemForEmbassy implements System {
private async uninit( private async uninit(
effects: HostSystemStartOs, effects: HostSystemStartOs,
nextVersion: Optional<string>, nextVersion: Optional<string>,
timeoutMs: number | null,
): Promise<void> { ): Promise<void> {
// TODO Do a migration down if the version exists // TODO Do a migration down if the version exists
await effects.setMainStatus({ status: "stopped" }) await effects.setMainStatus({ status: "stopped" })
} }
private async mainStart(effects: HostSystemStartOs): Promise<void> { private async mainStart(
effects: HostSystemStartOs,
timeoutMs: number | null,
): Promise<void> {
if (!!this.currentRunning) return if (!!this.currentRunning) return
this.currentRunning = new MainLoop(this, effects) this.currentRunning = new MainLoop(this, effects)
} }
private async mainStop( private async mainStop(
effects: HostSystemStartOs, effects: HostSystemStartOs,
options?: { timeout?: number }, timeoutMs: number | null,
): Promise<Duration> { ): Promise<Duration> {
const { currentRunning } = this const { currentRunning } = this
delete this.currentRunning delete this.currentRunning
if (currentRunning) { if (currentRunning) {
await currentRunning.clean({ await currentRunning.clean({
timeout: options?.timeout || this.manifest.main["sigterm-timeout"], timeout: this.manifest.main["sigterm-timeout"],
}) })
} }
return duration(this.manifest.main["sigterm-timeout"], "s") return duration(this.manifest.main["sigterm-timeout"], "s")
} }
private async createBackup(effects: HostSystemStartOs): Promise<void> { private async createBackup(
effects: HostSystemStartOs,
timeoutMs: number | null,
): Promise<void> {
const backup = this.manifest.backup.create const backup = this.manifest.backup.create
if (backup.type === "docker") { if (backup.type === "docker") {
const container = await DockerProcedureContainer.of( const container = await DockerProcedureContainer.of(
@@ -367,7 +404,7 @@ export class SystemForEmbassy implements System {
backup, backup,
this.manifest.volumes, this.manifest.volumes,
) )
await container.exec([backup.entrypoint, ...backup.args]) await container.execFail([backup.entrypoint, ...backup.args], timeoutMs)
} else { } else {
const moduleCode = await this.moduleCode const moduleCode = await this.moduleCode
await moduleCode.createBackup?.( await moduleCode.createBackup?.(
@@ -375,7 +412,10 @@ export class SystemForEmbassy implements System {
) )
} }
} }
private async restoreBackup(effects: HostSystemStartOs): Promise<void> { private async restoreBackup(
effects: HostSystemStartOs,
timeoutMs: number | null,
): Promise<void> {
const restoreBackup = this.manifest.backup.restore const restoreBackup = this.manifest.backup.restore
if (restoreBackup.type === "docker") { if (restoreBackup.type === "docker") {
const container = await DockerProcedureContainer.of( const container = await DockerProcedureContainer.of(
@@ -383,7 +423,10 @@ export class SystemForEmbassy implements System {
restoreBackup, restoreBackup,
this.manifest.volumes, this.manifest.volumes,
) )
await container.exec([restoreBackup.entrypoint, ...restoreBackup.args]) await container.execFail(
[restoreBackup.entrypoint, ...restoreBackup.args],
timeoutMs,
)
} else { } else {
const moduleCode = await this.moduleCode const moduleCode = await this.moduleCode
await moduleCode.restoreBackup?.( await moduleCode.restoreBackup?.(
@@ -391,11 +434,15 @@ export class SystemForEmbassy implements System {
) )
} }
} }
private async getConfig(effects: HostSystemStartOs): Promise<T.ConfigRes> { private async getConfig(
return this.getConfigUncleaned(effects).then(removePointers) effects: HostSystemStartOs,
timeoutMs: number | null,
): Promise<T.ConfigRes> {
return this.getConfigUncleaned(effects, timeoutMs).then(removePointers)
} }
private async getConfigUncleaned( private async getConfigUncleaned(
effects: HostSystemStartOs, effects: HostSystemStartOs,
timeoutMs: number | null,
): Promise<T.ConfigRes> { ): Promise<T.ConfigRes> {
const config = this.manifest.config?.get const config = this.manifest.config?.get
if (!config) return { spec: {} } if (!config) return { spec: {} }
@@ -408,7 +455,10 @@ export class SystemForEmbassy implements System {
// TODO: yaml // TODO: yaml
return JSON.parse( return JSON.parse(
( (
await container.exec([config.entrypoint, ...config.args]) await container.execFail(
[config.entrypoint, ...config.args],
timeoutMs,
)
).stdout.toString(), ).stdout.toString(),
) )
} else { } else {
@@ -427,11 +477,12 @@ export class SystemForEmbassy implements System {
private async setConfig( private async setConfig(
effects: HostSystemStartOs, effects: HostSystemStartOs,
newConfigWithoutPointers: unknown, newConfigWithoutPointers: unknown,
timeoutMs: number | null,
): Promise<void> { ): Promise<void> {
const newConfig = structuredClone(newConfigWithoutPointers) const newConfig = structuredClone(newConfigWithoutPointers)
await updateConfig( await updateConfig(
effects, effects,
await this.getConfigUncleaned(effects).then((x) => x.spec), await this.getConfigUncleaned(effects, timeoutMs).then((x) => x.spec),
newConfig, newConfig,
) )
const setConfigValue = this.manifest.config?.set const setConfigValue = this.manifest.config?.set
@@ -445,11 +496,14 @@ export class SystemForEmbassy implements System {
const answer = matchSetResult.unsafeCast( const answer = matchSetResult.unsafeCast(
JSON.parse( JSON.parse(
( (
await container.exec([ await container.execFail(
setConfigValue.entrypoint, [
...setConfigValue.args, setConfigValue.entrypoint,
JSON.stringify(newConfig), ...setConfigValue.args,
]) JSON.stringify(newConfig),
],
timeoutMs,
)
).stdout.toString(), ).stdout.toString(),
), ),
) )
@@ -508,6 +562,7 @@ export class SystemForEmbassy implements System {
private async migration( private async migration(
effects: HostSystemStartOs, effects: HostSystemStartOs,
fromVersion: string, fromVersion: string,
timeoutMs: number | null,
): Promise<T.MigrationRes> { ): Promise<T.MigrationRes> {
const fromEmver = EmVer.from(fromVersion) const fromEmver = EmVer.from(fromVersion)
const currentEmver = EmVer.from(this.manifest.version) const currentEmver = EmVer.from(this.manifest.version)
@@ -542,11 +597,14 @@ export class SystemForEmbassy implements System {
) )
return JSON.parse( return JSON.parse(
( (
await container.exec([ await container.execFail(
procedure.entrypoint, [
...procedure.args, procedure.entrypoint,
JSON.stringify(fromVersion), ...procedure.args,
]) JSON.stringify(fromVersion),
],
timeoutMs,
)
).stdout.toString(), ).stdout.toString(),
) )
} else if (procedure.type === "script") { } else if (procedure.type === "script") {
@@ -568,6 +626,7 @@ export class SystemForEmbassy implements System {
} }
private async properties( private async properties(
effects: HostSystemStartOs, effects: HostSystemStartOs,
timeoutMs: number | null,
): Promise<ReturnType<T.ExpectedExports.Properties>> { ): Promise<ReturnType<T.ExpectedExports.Properties>> {
// TODO BLU-J set the properties ever so often // TODO BLU-J set the properties ever so often
const setConfigValue = this.manifest.properties const setConfigValue = this.manifest.properties
@@ -581,10 +640,10 @@ export class SystemForEmbassy implements System {
const properties = matchProperties.unsafeCast( const properties = matchProperties.unsafeCast(
JSON.parse( JSON.parse(
( (
await container.exec([ await container.execFail(
setConfigValue.entrypoint, [setConfigValue.entrypoint, ...setConfigValue.args],
...setConfigValue.args, timeoutMs,
]) )
).stdout.toString(), ).stdout.toString(),
), ),
) )
@@ -609,6 +668,7 @@ export class SystemForEmbassy implements System {
effects: HostSystemStartOs, effects: HostSystemStartOs,
healthId: string, healthId: string,
timeSinceStarted: unknown, timeSinceStarted: unknown,
timeoutMs: number | null,
): Promise<void> { ): Promise<void> {
const healthProcedure = this.manifest["health-checks"][healthId] const healthProcedure = this.manifest["health-checks"][healthId]
if (!healthProcedure) return if (!healthProcedure) return
@@ -620,11 +680,14 @@ export class SystemForEmbassy implements System {
) )
return JSON.parse( return JSON.parse(
( (
await container.exec([ await container.execFail(
healthProcedure.entrypoint, [
...healthProcedure.args, healthProcedure.entrypoint,
JSON.stringify(timeSinceStarted), ...healthProcedure.args,
]) JSON.stringify(timeSinceStarted),
],
timeoutMs,
)
).stdout.toString(), ).stdout.toString(),
) )
} else if (healthProcedure.type === "script") { } else if (healthProcedure.type === "script") {
@@ -645,6 +708,7 @@ export class SystemForEmbassy implements System {
effects: HostSystemStartOs, effects: HostSystemStartOs,
actionId: string, actionId: string,
formData: unknown, formData: unknown,
timeoutMs: number | null,
): Promise<T.ActionResult> { ): Promise<T.ActionResult> {
const actionProcedure = this.manifest.actions?.[actionId]?.implementation const actionProcedure = this.manifest.actions?.[actionId]?.implementation
if (!actionProcedure) return { message: "Action not found", value: null } if (!actionProcedure) return { message: "Action not found", value: null }
@@ -656,11 +720,14 @@ export class SystemForEmbassy implements System {
) )
return JSON.parse( return JSON.parse(
( (
await container.exec([ await container.execFail(
actionProcedure.entrypoint, [
...actionProcedure.args, actionProcedure.entrypoint,
JSON.stringify(formData), ...actionProcedure.args,
]) JSON.stringify(formData),
],
timeoutMs,
)
).stdout.toString(), ).stdout.toString(),
) )
} else { } else {
@@ -681,6 +748,7 @@ export class SystemForEmbassy implements System {
effects: HostSystemStartOs, effects: HostSystemStartOs,
id: string, id: string,
oldConfig: unknown, oldConfig: unknown,
timeoutMs: number | null,
): Promise<object> { ): Promise<object> {
const actionProcedure = this.manifest.dependencies?.[id]?.config?.check const actionProcedure = this.manifest.dependencies?.[id]?.config?.check
if (!actionProcedure) return { message: "Action not found", value: null } if (!actionProcedure) return { message: "Action not found", value: null }
@@ -692,11 +760,14 @@ export class SystemForEmbassy implements System {
) )
return JSON.parse( return JSON.parse(
( (
await container.exec([ await container.execFail(
actionProcedure.entrypoint, [
...actionProcedure.args, actionProcedure.entrypoint,
JSON.stringify(oldConfig), ...actionProcedure.args,
]) JSON.stringify(oldConfig),
],
timeoutMs,
)
).stdout.toString(), ).stdout.toString(),
) )
} else if (actionProcedure.type === "script") { } else if (actionProcedure.type === "script") {
@@ -722,7 +793,9 @@ export class SystemForEmbassy implements System {
effects: HostSystemStartOs, effects: HostSystemStartOs,
id: string, id: string,
oldConfig: unknown, oldConfig: unknown,
timeoutMs: number | null,
): Promise<void> { ): Promise<void> {
// TODO: docker
const moduleCode = await this.moduleCode const moduleCode = await this.moduleCode
const method = moduleCode.dependencies?.[id]?.autoConfigure const method = moduleCode.dependencies?.[id]?.autoConfigure
if (!method) if (!method)

View File

@@ -539,6 +539,7 @@ fn chroot(
cmd.env(k, v); cmd.env(k, v);
} }
} }
nix::unistd::setsid().with_kind(ErrorKind::Lxc)?; // TODO: error code
std::os::unix::fs::chroot(path)?; std::os::unix::fs::chroot(path)?;
if let Some(uid) = user.as_deref().and_then(|u| u.parse::<u32>().ok()) { if let Some(uid) = user.as_deref().and_then(|u| u.parse::<u32>().ok()) {
cmd.uid(uid); cmd.uid(uid);

View File

@@ -15,13 +15,6 @@ import * as CP from "node:child_process"
const cpExec = promisify(CP.exec) const cpExec = promisify(CP.exec)
const cpExecFile = promisify(CP.execFile) const cpExecFile = promisify(CP.execFile)
async function psTree(pid: number, overlay: Overlay): Promise<number[]> {
const { stdout } = await cpExec(`pstree -p ${pid}`)
const regex: RegExp = /\((\d+)\)/g
return [...stdout.toString().matchAll(regex)].map(([_all, pid]) =>
parseInt(pid),
)
}
type Daemon< type Daemon<
Manifest extends SDKManifest, Manifest extends SDKManifest,
Ids extends string, Ids extends string,
@@ -81,19 +74,15 @@ export const runDaemon =
const pid = childProcess.pid const pid = childProcess.pid
return { return {
async wait() { async wait() {
const pids = pid ? await psTree(pid, overlay) : []
try { try {
return await answer return await answer
} finally { } finally {
for (const process of pids) { await cpExecFile("pkill", ["-9", "-s", String(pid)]).catch((_) => {})
cpExecFile("kill", [`-9`, String(process)]).catch((_) => {})
}
} }
}, },
async term({ signal = SIGTERM, timeout = NO_TIMEOUT } = {}) { async term({ signal = SIGTERM, timeout = NO_TIMEOUT } = {}) {
const pids = pid ? await psTree(pid, overlay) : []
try { try {
childProcess.kill(signal) await cpExecFile("pkill", [`-${signal}`, "-s", String(pid)])
if (timeout > NO_TIMEOUT) { if (timeout > NO_TIMEOUT) {
const didTimeout = await Promise.race([ const didTimeout = await Promise.race([
@@ -103,7 +92,9 @@ export const runDaemon =
answer.then(() => false), answer.then(() => false),
]) ])
if (didTimeout) { if (didTimeout) {
childProcess.kill(SIGKILL) await cpExecFile("pkill", [`-9`, "-s", String(pid)]).catch(
(_) => {},
)
} }
} else { } else {
await answer await answer
@@ -111,16 +102,6 @@ export const runDaemon =
} finally { } finally {
await overlay.destroy() await overlay.destroy()
} }
try {
for (const process of pids) {
await cpExecFile("kill", [`-${signal}`, String(process)])
}
} finally {
for (const process of pids) {
cpExecFile("kill", [`-9`, String(process)]).catch((_) => {})
}
}
}, },
} }
} }

View File

@@ -70,7 +70,13 @@ export class Overlay {
async exec( async exec(
command: string[], command: string[],
options?: CommandOptions, options?: CommandOptions,
): Promise<{ stdout: string | Buffer; stderr: string | Buffer }> { timeoutMs: number | null = 30000,
): Promise<{
exitCode: number | null
exitSignal: NodeJS.Signals | null
stdout: string | Buffer
stderr: string | Buffer
}> {
const imageMeta = await fs const imageMeta = await fs
.readFile(`/media/startos/images/${this.imageId}.json`, { .readFile(`/media/startos/images/${this.imageId}.json`, {
encoding: "utf8", encoding: "utf8",
@@ -87,7 +93,7 @@ export class Overlay {
workdir = options.cwd workdir = options.cwd
delete options.cwd delete options.cwd
} }
return await execFile( const child = cp.spawn(
"start-cli", "start-cli",
[ [
"chroot", "chroot",
@@ -97,8 +103,44 @@ export class Overlay {
this.rootfs, this.rootfs,
...command, ...command,
], ],
options, options || {},
) )
const pid = child.pid
const stdout = { data: "" as string | Buffer }
const stderr = { data: "" as string | Buffer }
const appendData =
(appendTo: { data: string | Buffer }) =>
(chunk: string | Buffer | any) => {
if (typeof appendTo.data === "string" && typeof chunk === "string") {
appendTo.data += chunk
} else if (typeof chunk === "string" || chunk instanceof Buffer) {
appendTo.data = Buffer.concat([
Buffer.from(appendTo.data),
Buffer.from(chunk),
])
} else {
console.error("received unexpected chunk", chunk)
}
}
return new Promise((resolve, reject) => {
child.on("error", reject)
if (timeoutMs !== null && pid) {
setTimeout(
() => execFile("pkill", ["-9", "-s", String(pid)]).catch((_) => {}),
timeoutMs,
)
}
child.stdout.on("data", appendData(stdout))
child.stderr.on("data", appendData(stderr))
child.on("exit", (code, signal) =>
resolve({
exitCode: code,
exitSignal: signal,
stdout: stdout.data,
stderr: stderr.data,
}),
)
})
} }
async spawn( async spawn(