reduce task leaking (#2868)

* reduce task leaking

* fix onLeaveContext
This commit is contained in:
Aiden McClelland
2025-04-16 11:00:46 -06:00
committed by GitHub
parent 03f8b73627
commit 89f3fdc05f
18 changed files with 159 additions and 98 deletions

View File

@@ -104,7 +104,18 @@ const rpcRoundFor =
export function makeEffects(context: EffectContext): Effects { export function makeEffects(context: EffectContext): Effects {
const rpcRound = rpcRoundFor(context.procedureId) const rpcRound = rpcRoundFor(context.procedureId)
const self: Effects = { const self: Effects = {
child: (name) =>
makeEffects({ ...context, callbacks: context.callbacks?.child(name) }),
constRetry: context.constRetry, constRetry: context.constRetry,
isInContext: !!context.callbacks,
onLeaveContext:
context.callbacks?.onLeaveContext?.bind(context.callbacks) ||
(() => {
console.warn(
"no context for this effects object",
new Error().stack?.replace(/^Error/, ""),
)
}),
clearCallbacks(...[options]: Parameters<T.Effects["clearCallbacks"]>) { clearCallbacks(...[options]: Parameters<T.Effects["clearCallbacks"]>) {
return rpcRound("clear-callbacks", { return rpcRound("clear-callbacks", {
...options, ...options,
@@ -313,5 +324,14 @@ export function makeEffects(context: EffectContext): Effects {
> >
}, },
} }
self.onLeaveContext(() => {
self.isInContext = false
self.onLeaveContext = () => {
console.warn(
"this effects object is already out of context",
new Error().stack?.replace(/^Error/, ""),
)
}
})
return self return self
} }

View File

@@ -238,21 +238,6 @@ export class RpcListener {
return this._system return this._system
} }
private callbackHolders: Map<string, CallbackHolder> = new Map()
private removeCallbackHolderFor(procedure: string) {
const prev = this.callbackHolders.get(procedure)
if (prev) {
this.callbackHolders.delete(procedure)
this.callbacks?.removeChild(prev)
}
}
private callbackHolderFor(procedure: string): CallbackHolder {
this.removeCallbackHolderFor(procedure)
const callbackHolder = this.callbacks!.child()
this.callbackHolders.set(procedure, callbackHolder)
return callbackHolder
}
callCallback(callback: number, args: any[]): void { callCallback(callback: number, args: any[]): void {
if (this.callbacks) { if (this.callbacks) {
this.callbacks this.callbacks
@@ -302,7 +287,7 @@ export class RpcListener {
return null return null
}) })
.when(startType, async ({ id }) => { .when(startType, async ({ id }) => {
const callbacks = this.callbackHolderFor("main") const callbacks = this.callbacks?.child("main")
const effects = makeEffects({ const effects = makeEffects({
procedureId: null, procedureId: null,
callbacks, callbacks,
@@ -313,7 +298,7 @@ export class RpcListener {
) )
}) })
.when(stopType, async ({ id }) => { .when(stopType, async ({ id }) => {
this.removeCallbackHolderFor("main") this.callbacks?.removeChild("main")
return handleRpc( return handleRpc(
id, id,
this.system.stop().then((result) => ({ result })), this.system.stop().then((result) => ({ result })),
@@ -338,7 +323,7 @@ export class RpcListener {
procedureId: null, procedureId: null,
}), }),
) )
const callbacks = this.callbackHolderFor("containerInit") const callbacks = this.callbacks.child("containerInit")
await system.containerInit( await system.containerInit(
makeEffects({ makeEffects({
procedureId: null, procedureId: null,
@@ -420,7 +405,7 @@ export class RpcListener {
): { result: any } => { ): { result: any } => {
return { result } return { result }
} }
const callbacks = this.callbackHolderFor(procedure) const callbacks = this.callbacks?.child(procedure)
const effects = makeEffects({ const effects = makeEffects({
procedureId, procedureId,
callbacks, callbacks,

View File

@@ -14,7 +14,8 @@ export class CallbackHolder {
constructor(private effects?: T.Effects) {} constructor(private effects?: T.Effects) {}
private callbacks = new Map<number, Function>() private callbacks = new Map<number, Function>()
private children: WeakRef<CallbackHolder>[] = [] private onLeaveContextCallbacks: Function[] = []
private children: Map<string, CallbackHolder> = new Map()
private newId() { private newId() {
return CallbackIdCell.inc++ return CallbackIdCell.inc++
} }
@@ -32,23 +33,25 @@ export class CallbackHolder {
}) })
return id return id
} }
child(): CallbackHolder { child(name: string): CallbackHolder {
this.removeChild(name)
const child = new CallbackHolder() const child = new CallbackHolder()
this.children.push(new WeakRef(child)) this.children.set(name, child)
return child return child
} }
removeChild(child: CallbackHolder) { removeChild(name: string) {
this.children = this.children.filter((c) => { const child = this.children.get(name)
const ref = c.deref() if (child) {
return ref && ref !== child child.leaveContext()
}) this.children.delete(name)
}
} }
private getCallback(index: number): Function | undefined { private getCallback(index: number): Function | undefined {
let callback = this.callbacks.get(index) let callback = this.callbacks.get(index)
if (callback) this.callbacks.delete(index) if (callback) this.callbacks.delete(index)
else { else {
for (let i = 0; i < this.children.length; i++) { for (let [_, child] of this.children) {
callback = this.children[i].deref()?.getCallback(index) callback = child.getCallback(index)
if (callback) return callback if (callback) return callback
} }
} }
@@ -59,4 +62,21 @@ export class CallbackHolder {
if (!callback) return Promise.resolve() if (!callback) return Promise.resolve()
return Promise.resolve().then(() => callback(...args)) return Promise.resolve().then(() => callback(...args))
} }
onLeaveContext(fn: Function) {
this.onLeaveContextCallbacks.push(fn)
}
leaveContext() {
for (let [_, child] of this.children) {
child.leaveContext()
}
this.children = new Map()
for (let fn of this.onLeaveContextCallbacks) {
try {
fn()
} catch (e) {
console.warn(e)
}
}
this.onLeaveContextCallbacks = []
}
} }

View File

@@ -28,7 +28,10 @@ import { UrlString } from "./util/getServiceInterface"
/** Used to reach out from the pure js runtime */ /** Used to reach out from the pure js runtime */
export type Effects = { export type Effects = {
child: (name: string) => Effects
constRetry?: () => void constRetry?: () => void
isInContext: boolean
onLeaveContext: (fn: () => void | null | undefined) => void
clearCallbacks: ( clearCallbacks: (
options: { only: number[] } | { except: number[] }, options: { only: number[] } | { except: number[] },
) => Promise<null> ) => Promise<null>

View File

@@ -99,7 +99,13 @@ export class Action<
async exportMetadata(options: { async exportMetadata(options: {
effects: T.Effects effects: T.Effects
}): Promise<T.ActionMetadata> { }): Promise<T.ActionMetadata> {
const metadata = await callMaybeFn(this.metadataFn, options) const childEffects = options.effects.child(`setupActions/${this.id}`)
childEffects.constRetry = once(() => {
this.exportMetadata(options)
})
const metadata = await callMaybeFn(this.metadataFn, {
effects: childEffects,
})
await options.effects.action.export({ id: this.id, metadata }) await options.effects.action.export({ id: this.id, metadata })
return metadata return metadata
} }
@@ -131,12 +137,6 @@ export class Actions<
return new Actions({ ...this.actions, [action.id]: action }) return new Actions({ ...this.actions, [action.id]: action })
} }
async update(options: { effects: T.Effects }): Promise<null> { async update(options: { effects: T.Effects }): Promise<null> {
options.effects = {
...options.effects,
constRetry: once(() => {
this.update(options) // yes, this reuses the options object, but the const retry function will be overwritten each time, so the once-ness is not a problem
}),
}
for (let action of Object.values(this.actions)) { for (let action of Object.values(this.actions)) {
await action.exportMetadata(options) await action.exportMetadata(options)
} }

View File

@@ -40,13 +40,12 @@ export function setupDependencies<Manifest extends T.SDKManifest>(
): (options: { effects: T.Effects }) => Promise<null> { ): (options: { effects: T.Effects }) => Promise<null> {
const cell = { updater: async (_: { effects: T.Effects }) => null } const cell = { updater: async (_: { effects: T.Effects }) => null }
cell.updater = async (options: { effects: T.Effects }) => { cell.updater = async (options: { effects: T.Effects }) => {
options.effects = { const childEffects = options.effects.child("setupDependencies")
...options.effects, childEffects.constRetry = once(() => {
constRetry: once(() => { cell.updater({ effects: options.effects })
cell.updater(options) })
}),
} const dependencyType = await fn({ effects: childEffects })
const dependencyType = await fn(options)
return await options.effects.setDependencies({ return await options.effects.setDependencies({
dependencies: Object.entries(dependencyType) dependencies: Object.entries(dependencyType)
.map(([k, v]) => [k, v as DependencyRequirement] as const) .map(([k, v]) => [k, v as DependencyRequirement] as const)

View File

@@ -28,24 +28,22 @@ export const setupServiceInterfaces: SetupServiceInterfaces = <
[] as any as Output) as UpdateServiceInterfaces<Output>, [] as any as Output) as UpdateServiceInterfaces<Output>,
} }
cell.updater = (async (options: { effects: T.Effects }) => { cell.updater = (async (options: { effects: T.Effects }) => {
options.effects = { const childEffects = options.effects.child("setupInterfaces")
...options.effects, childEffects.constRetry = once(() => {
constRetry: once(() => { cell.updater({ effects: options.effects })
cell.updater(options) })
}),
}
const bindings: T.BindId[] = [] const bindings: T.BindId[] = []
const interfaces: T.ServiceInterfaceId[] = [] const interfaces: T.ServiceInterfaceId[] = []
const res = await fn({ const res = await fn({
effects: { effects: {
...options.effects, ...childEffects,
bind: (params: T.BindParams) => { bind: (params: T.BindParams) => {
bindings.push({ id: params.id, internalPort: params.internalPort }) bindings.push({ id: params.id, internalPort: params.internalPort })
return options.effects.bind(params) return childEffects.bind(params)
}, },
exportServiceInterface: (params: T.ExportServiceInterfaceParams) => { exportServiceInterface: (params: T.ExportServiceInterfaceParams) => {
interfaces.push(params.id) interfaces.push(params.id)
return options.effects.exportServiceInterface(params) return childEffects.exportServiceInterface(params)
}, },
}, },
}) })

View File

@@ -46,6 +46,9 @@ type EffectsTypeChecker<T extends StringObject = Effects> = {
describe("startosTypeValidation ", () => { describe("startosTypeValidation ", () => {
test(`checking the params match`, () => { test(`checking the params match`, () => {
typeEquality<EffectsTypeChecker>({ typeEquality<EffectsTypeChecker>({
child: "",
isInContext: {} as never,
onLeaveContext: () => {},
clearCallbacks: {} as ClearCallbacksParams, clearCallbacks: {} as ClearCallbacksParams,
action: { action: {
clear: {} as ClearActionsParams, clear: {} as ClearActionsParams,

View File

@@ -25,10 +25,15 @@ export class GetSystemSmtp {
* Watches the system SMTP credentials. Returns an async iterator that yields whenever the value changes * Watches the system SMTP credentials. Returns an async iterator that yields whenever the value changes
*/ */
async *watch() { async *watch() {
while (true) { const resolveCell = { resolve: () => {} }
let callback: () => void this.effects.onLeaveContext(() => {
resolveCell.resolve()
})
while (this.effects.isInContext) {
let callback: () => void = () => {}
const waitForNext = new Promise<void>((resolve) => { const waitForNext = new Promise<void>((resolve) => {
callback = resolve callback = resolve
resolveCell.resolve = resolve
}) })
yield await this.effects.getSystemSmtp({ yield await this.effects.getSystemSmtp({
callback: () => callback(), callback: () => callback(),

View File

@@ -248,10 +248,15 @@ export class GetServiceInterface {
*/ */
async *watch() { async *watch() {
const { id, packageId } = this.opts const { id, packageId } = this.opts
while (true) { const resolveCell = { resolve: () => {} }
this.effects.onLeaveContext(() => {
resolveCell.resolve()
})
while (this.effects.isInContext) {
let callback: () => void = () => {} let callback: () => void = () => {}
const waitForNext = new Promise<void>((resolve) => { const waitForNext = new Promise<void>((resolve) => {
callback = resolve callback = resolve
resolveCell.resolve = resolve
}) })
yield await makeInterfaceFilled({ yield await makeInterfaceFilled({
effects: this.effects, effects: this.effects,

View File

@@ -82,10 +82,15 @@ export class GetServiceInterfaces {
*/ */
async *watch() { async *watch() {
const { packageId } = this.opts const { packageId } = this.opts
while (true) { const resolveCell = { resolve: () => {} }
this.effects.onLeaveContext(() => {
resolveCell.resolve()
})
while (this.effects.isInContext) {
let callback: () => void = () => {} let callback: () => void = () => {}
const waitForNext = new Promise<void>((resolve) => { const waitForNext = new Promise<void>((resolve) => {
callback = resolve callback = resolve
resolveCell.resolve = resolve
}) })
yield await makeManyInterfaceFilled({ yield await makeManyInterfaceFilled({
effects: this.effects, effects: this.effects,

View File

@@ -113,7 +113,12 @@ export class StartSdk<Manifest extends T.SDKManifest, Store> {
| "bind" | "bind"
| "getHostInfo" | "getHostInfo"
type MainUsedEffects = "setMainStatus" | "setHealth" type MainUsedEffects = "setMainStatus" | "setHealth"
type CallbackEffects = "constRetry" | "clearCallbacks" type CallbackEffects =
| "child"
| "constRetry"
| "isInContext"
| "onLeaveContext"
| "clearCallbacks"
type AlreadyExposed = type AlreadyExposed =
| "getSslCertificate" | "getSslCertificate"
| "getSystemSmtp" | "getSystemSmtp"
@@ -211,10 +216,15 @@ export class StartSdk<Manifest extends T.SDKManifest, Store> {
> = {}, > = {},
) => { ) => {
async function* watch() { async function* watch() {
while (true) { const resolveCell = { resolve: () => {} }
effects.onLeaveContext(() => {
resolveCell.resolve()
})
while (effects.isInContext) {
let callback: () => void = () => {} let callback: () => void = () => {}
const waitForNext = new Promise<void>((resolve) => { const waitForNext = new Promise<void>((resolve) => {
callback = resolve callback = resolve
resolveCell.resolve = resolve
}) })
yield await effects.getContainerIp({ ...options, callback }) yield await effects.getContainerIp({ ...options, callback })
await waitForNext await waitForNext

View File

@@ -130,39 +130,37 @@ export class CommandController extends Drop {
return new SubContainerHandle(this.subcontainer) return new SubContainerHandle(this.subcontainer)
} }
async wait({ timeout = NO_TIMEOUT } = {}) { async wait({ timeout = NO_TIMEOUT } = {}) {
const self = this.weak()
if (timeout > 0) if (timeout > 0)
setTimeout(() => { setTimeout(() => {
self.term() this.term()
}, timeout) }, timeout)
try { try {
return await self.runningAnswer return await this.runningAnswer
} finally { } finally {
if (!self.state.exited) { if (!this.state.exited) {
self.process.kill("SIGKILL") this.process.kill("SIGKILL")
} }
await self.subcontainer.destroy().catch((_) => {}) await this.subcontainer.destroy().catch((_) => {})
} }
} }
async term({ signal = SIGTERM, timeout = this.sigtermTimeout } = {}) { async term({ signal = SIGTERM, timeout = this.sigtermTimeout } = {}) {
const self = this.weak()
try { try {
if (!self.state.exited) { if (!this.state.exited) {
if (signal !== "SIGKILL") { if (signal !== "SIGKILL") {
setTimeout(() => { setTimeout(() => {
if (!self.state.exited) self.process.kill("SIGKILL") if (!this.state.exited) this.process.kill("SIGKILL")
}, timeout) }, timeout)
} }
if (!self.process.kill(signal)) { if (!this.process.kill(signal)) {
console.error( console.error(
`failed to send signal ${signal} to pid ${this.process.pid}`, `failed to send signal ${signal} to pid ${this.process.pid}`,
) )
} }
} }
await self.runningAnswer await this.runningAnswer
} finally { } finally {
await self.subcontainer.destroy() await this.subcontainer.destroy()
} }
} }
onDrop(): void { onDrop(): void {

View File

@@ -37,10 +37,15 @@ export class GetStore<Store, StoreValue> {
* Watches the value of Store at the provided path. Returns an async iterator that yields whenever the value changes * Watches the value of Store at the provided path. Returns an async iterator that yields whenever the value changes
*/ */
async *watch() { async *watch() {
while (true) { const resolveCell = { resolve: () => {} }
let callback: () => void this.effects.onLeaveContext(() => {
resolveCell.resolve()
})
while (this.effects.isInContext) {
let callback: () => void = () => {}
const waitForNext = new Promise<void>((resolve) => { const waitForNext = new Promise<void>((resolve) => {
callback = resolve callback = resolve
resolveCell.resolve = resolve
}) })
yield await this.effects.store.get<Store, StoreValue>({ yield await this.effects.store.get<Store, StoreValue>({
...this.options, ...this.options,

View File

@@ -1,7 +1,8 @@
export abstract class Drop { export abstract class Drop {
private static weak: { [id: number]: Drop } = {} private static weak: { [id: number]: Drop } = {}
private static registry = new FinalizationRegistry((id: number) => { private static registry = new FinalizationRegistry((id: number) => {
Drop.weak[id].drop() const weak = Drop.weak[id]
if (weak) weak.drop()
}) })
private static idCtr: number = 0 private static idCtr: number = 0
private id: number private id: number

View File

@@ -34,10 +34,15 @@ export class GetSslCertificate {
* Watches the SSL Certificate for the given hostnames if permitted. Returns an async iterator that yields whenever the value changes * Watches the SSL Certificate for the given hostnames if permitted. Returns an async iterator that yields whenever the value changes
*/ */
async *watch() { async *watch() {
while (true) { const resolveCell = { resolve: () => {} }
let callback: () => void this.effects.onLeaveContext(() => {
resolveCell.resolve()
})
while (this.effects.isInContext) {
let callback: () => void = () => {}
const waitForNext = new Promise<void>((resolve) => { const waitForNext = new Promise<void>((resolve) => {
callback = resolve callback = resolve
resolveCell.resolve = resolve
}) })
yield await this.effects.getSslCertificate({ yield await this.effects.getSslCertificate({
hostnames: this.hostnames, hostnames: this.hostnames,

View File

@@ -4,6 +4,7 @@ import * as cp from "child_process"
import { promisify } from "util" import { promisify } from "util"
import { Buffer } from "node:buffer" import { Buffer } from "node:buffer"
import { once } from "../../../base/lib/util/once" import { once } from "../../../base/lib/util/once"
import { Drop } from "./Drop"
export const execFile = promisify(cp.execFile) export const execFile = promisify(cp.execFile)
const False = () => false const False = () => false
@@ -45,16 +46,8 @@ export interface ExecSpawnable {
* Implements: * Implements:
* @see {@link ExecSpawnable} * @see {@link ExecSpawnable}
*/ */
export class SubContainer implements ExecSpawnable { export class SubContainer extends Drop implements ExecSpawnable {
private static finalizationEffects: { effects?: T.Effects } = {} private destroyed = false
private static registry = new FinalizationRegistry((guid: string) => {
if (this.finalizationEffects.effects) {
this.finalizationEffects.effects.subcontainer
.destroyFs({ guid })
.catch((e) => console.error("failed to cleanup SubContainer", guid, e))
}
})
private leader: cp.ChildProcess private leader: cp.ChildProcess
private leaderExited: boolean = false private leaderExited: boolean = false
private waitProc: () => Promise<null> private waitProc: () => Promise<null>
@@ -64,8 +57,7 @@ export class SubContainer implements ExecSpawnable {
readonly rootfs: string, readonly rootfs: string,
readonly guid: T.Guid, readonly guid: T.Guid,
) { ) {
if (!SubContainer.finalizationEffects.effects) super()
SubContainer.finalizationEffects.effects = effects
this.leaderExited = false this.leaderExited = false
this.leader = cp.spawn("start-cli", ["subcontainer", "launch", rootfs], { this.leader = cp.spawn("start-cli", ["subcontainer", "launch", rootfs], {
killSignal: "SIGKILL", killSignal: "SIGKILL",
@@ -106,7 +98,6 @@ export class SubContainer implements ExecSpawnable {
name, name,
}) })
const res = new SubContainer(effects, imageId, rootfs, guid) const res = new SubContainer(effects, imageId, rootfs, guid)
SubContainer.registry.register(res, guid, res)
const shared = ["dev", "sys"] const shared = ["dev", "sys"]
if (!!sharedRun) { if (!!sharedRun) {
@@ -212,14 +203,20 @@ export class SubContainer implements ExecSpawnable {
get destroy() { get destroy() {
return async () => { return async () => {
const guid = this.guid if (!this.destroyed) {
await this.killLeader() const guid = this.guid
await this.effects.subcontainer.destroyFs({ guid }) await this.killLeader()
SubContainer.registry.unregister(this) await this.effects.subcontainer.destroyFs({ guid })
this.destroyed = true
}
return null return null
} }
} }
onDrop(): void {
this.destroy()
}
async exec( async exec(
command: string[], command: string[],
options?: CommandOptions & ExecOptions, options?: CommandOptions & ExecOptions,

View File

@@ -152,7 +152,7 @@ export class FileHelper<A> {
} }
private async readConst(effects: T.Effects): Promise<A | null> { private async readConst(effects: T.Effects): Promise<A | null> {
const watch = this.readWatch() const watch = this.readWatch(effects)
const res = await watch.next() const res = await watch.next()
if (effects.constRetry) { if (effects.constRetry) {
if (!this.consts.includes(effects.constRetry)) if (!this.consts.includes(effects.constRetry))
@@ -165,9 +165,9 @@ export class FileHelper<A> {
return res.value return res.value
} }
private async *readWatch() { private async *readWatch(effects: T.Effects) {
let res let res
while (true) { while (effects.isInContext) {
if (await exists(this.path)) { if (await exists(this.path)) {
const ctrl = new AbortController() const ctrl = new AbortController()
const watch = fs.watch(this.path, { const watch = fs.watch(this.path, {
@@ -194,10 +194,11 @@ export class FileHelper<A> {
} }
private readOnChange( private readOnChange(
effects: T.Effects,
callback: (value: A | null, error?: Error) => void | Promise<void>, callback: (value: A | null, error?: Error) => void | Promise<void>,
) { ) {
;(async () => { ;(async () => {
for await (const value of this.readWatch()) { for await (const value of this.readWatch(effects)) {
try { try {
await callback(value) await callback(value)
} catch (e) { } catch (e) {
@@ -221,10 +222,11 @@ export class FileHelper<A> {
return { return {
once: () => this.readOnce(), once: () => this.readOnce(),
const: (effects: T.Effects) => this.readConst(effects), const: (effects: T.Effects) => this.readConst(effects),
watch: () => this.readWatch(), watch: (effects: T.Effects) => this.readWatch(effects),
onChange: ( onChange: (
effects: T.Effects,
callback: (value: A | null, error?: Error) => void | Promise<void>, callback: (value: A | null, error?: Error) => void | Promise<void>,
) => this.readOnChange(callback), ) => this.readOnChange(effects, callback),
} }
} }