diff --git a/container-runtime/src/Adapters/EffectCreator.ts b/container-runtime/src/Adapters/EffectCreator.ts index e11cd9d43..7bcf8f08b 100644 --- a/container-runtime/src/Adapters/EffectCreator.ts +++ b/container-runtime/src/Adapters/EffectCreator.ts @@ -104,7 +104,18 @@ const rpcRoundFor = export function makeEffects(context: EffectContext): Effects { const rpcRound = rpcRoundFor(context.procedureId) const self: Effects = { + child: (name) => + makeEffects({ ...context, callbacks: context.callbacks?.child(name) }), constRetry: context.constRetry, + isInContext: !!context.callbacks, + onLeaveContext: + context.callbacks?.onLeaveContext?.bind(context.callbacks) || + (() => { + console.warn( + "no context for this effects object", + new Error().stack?.replace(/^Error/, ""), + ) + }), clearCallbacks(...[options]: Parameters) { return rpcRound("clear-callbacks", { ...options, @@ -313,5 +324,14 @@ export function makeEffects(context: EffectContext): Effects { > }, } + self.onLeaveContext(() => { + self.isInContext = false + self.onLeaveContext = () => { + console.warn( + "this effects object is already out of context", + new Error().stack?.replace(/^Error/, ""), + ) + } + }) return self } diff --git a/container-runtime/src/Adapters/RpcListener.ts b/container-runtime/src/Adapters/RpcListener.ts index dade89ad2..510110b24 100644 --- a/container-runtime/src/Adapters/RpcListener.ts +++ b/container-runtime/src/Adapters/RpcListener.ts @@ -238,21 +238,6 @@ export class RpcListener { return this._system } - private callbackHolders: Map = new Map() - private removeCallbackHolderFor(procedure: string) { - const prev = this.callbackHolders.get(procedure) - if (prev) { - this.callbackHolders.delete(procedure) - this.callbacks?.removeChild(prev) - } - } - private callbackHolderFor(procedure: string): CallbackHolder { - this.removeCallbackHolderFor(procedure) - const callbackHolder = this.callbacks!.child() - this.callbackHolders.set(procedure, callbackHolder) - return callbackHolder - } - callCallback(callback: number, args: any[]): void { if (this.callbacks) { this.callbacks @@ -302,7 +287,7 @@ export class RpcListener { return null }) .when(startType, async ({ id }) => { - const callbacks = this.callbackHolderFor("main") + const callbacks = this.callbacks?.child("main") const effects = makeEffects({ procedureId: null, callbacks, @@ -313,7 +298,7 @@ export class RpcListener { ) }) .when(stopType, async ({ id }) => { - this.removeCallbackHolderFor("main") + this.callbacks?.removeChild("main") return handleRpc( id, this.system.stop().then((result) => ({ result })), @@ -338,7 +323,7 @@ export class RpcListener { procedureId: null, }), ) - const callbacks = this.callbackHolderFor("containerInit") + const callbacks = this.callbacks.child("containerInit") await system.containerInit( makeEffects({ procedureId: null, @@ -420,7 +405,7 @@ export class RpcListener { ): { result: any } => { return { result } } - const callbacks = this.callbackHolderFor(procedure) + const callbacks = this.callbacks?.child(procedure) const effects = makeEffects({ procedureId, callbacks, diff --git a/container-runtime/src/Models/CallbackHolder.ts b/container-runtime/src/Models/CallbackHolder.ts index ce474268a..e33398f53 100644 --- a/container-runtime/src/Models/CallbackHolder.ts +++ b/container-runtime/src/Models/CallbackHolder.ts @@ -14,7 +14,8 @@ export class CallbackHolder { constructor(private effects?: T.Effects) {} private callbacks = new Map() - private children: WeakRef[] = [] + private onLeaveContextCallbacks: Function[] = [] + private children: Map = new Map() private newId() { return CallbackIdCell.inc++ } @@ -32,23 +33,25 @@ export class CallbackHolder { }) return id } - child(): CallbackHolder { + child(name: string): CallbackHolder { + this.removeChild(name) const child = new CallbackHolder() - this.children.push(new WeakRef(child)) + this.children.set(name, child) return child } - removeChild(child: CallbackHolder) { - this.children = this.children.filter((c) => { - const ref = c.deref() - return ref && ref !== child - }) + removeChild(name: string) { + const child = this.children.get(name) + if (child) { + child.leaveContext() + this.children.delete(name) + } } private getCallback(index: number): Function | undefined { let callback = this.callbacks.get(index) if (callback) this.callbacks.delete(index) else { - for (let i = 0; i < this.children.length; i++) { - callback = this.children[i].deref()?.getCallback(index) + for (let [_, child] of this.children) { + callback = child.getCallback(index) if (callback) return callback } } @@ -59,4 +62,21 @@ export class CallbackHolder { if (!callback) return Promise.resolve() return Promise.resolve().then(() => callback(...args)) } + onLeaveContext(fn: Function) { + this.onLeaveContextCallbacks.push(fn) + } + leaveContext() { + for (let [_, child] of this.children) { + child.leaveContext() + } + this.children = new Map() + for (let fn of this.onLeaveContextCallbacks) { + try { + fn() + } catch (e) { + console.warn(e) + } + } + this.onLeaveContextCallbacks = [] + } } diff --git a/sdk/base/lib/Effects.ts b/sdk/base/lib/Effects.ts index 941e57cf9..8ebb59ab5 100644 --- a/sdk/base/lib/Effects.ts +++ b/sdk/base/lib/Effects.ts @@ -28,7 +28,10 @@ import { UrlString } from "./util/getServiceInterface" /** Used to reach out from the pure js runtime */ export type Effects = { + child: (name: string) => Effects constRetry?: () => void + isInContext: boolean + onLeaveContext: (fn: () => void | null | undefined) => void clearCallbacks: ( options: { only: number[] } | { except: number[] }, ) => Promise diff --git a/sdk/base/lib/actions/setupActions.ts b/sdk/base/lib/actions/setupActions.ts index 602530c3d..290a1c0f0 100644 --- a/sdk/base/lib/actions/setupActions.ts +++ b/sdk/base/lib/actions/setupActions.ts @@ -99,7 +99,13 @@ export class Action< async exportMetadata(options: { effects: T.Effects }): Promise { - const metadata = await callMaybeFn(this.metadataFn, options) + const childEffects = options.effects.child(`setupActions/${this.id}`) + childEffects.constRetry = once(() => { + this.exportMetadata(options) + }) + const metadata = await callMaybeFn(this.metadataFn, { + effects: childEffects, + }) await options.effects.action.export({ id: this.id, metadata }) return metadata } @@ -131,12 +137,6 @@ export class Actions< return new Actions({ ...this.actions, [action.id]: action }) } async update(options: { effects: T.Effects }): Promise { - options.effects = { - ...options.effects, - constRetry: once(() => { - this.update(options) // yes, this reuses the options object, but the const retry function will be overwritten each time, so the once-ness is not a problem - }), - } for (let action of Object.values(this.actions)) { await action.exportMetadata(options) } diff --git a/sdk/base/lib/dependencies/setupDependencies.ts b/sdk/base/lib/dependencies/setupDependencies.ts index 4583ae749..13d4adba7 100644 --- a/sdk/base/lib/dependencies/setupDependencies.ts +++ b/sdk/base/lib/dependencies/setupDependencies.ts @@ -40,13 +40,12 @@ export function setupDependencies( ): (options: { effects: T.Effects }) => Promise { const cell = { updater: async (_: { effects: T.Effects }) => null } cell.updater = async (options: { effects: T.Effects }) => { - options.effects = { - ...options.effects, - constRetry: once(() => { - cell.updater(options) - }), - } - const dependencyType = await fn(options) + const childEffects = options.effects.child("setupDependencies") + childEffects.constRetry = once(() => { + cell.updater({ effects: options.effects }) + }) + + const dependencyType = await fn({ effects: childEffects }) return await options.effects.setDependencies({ dependencies: Object.entries(dependencyType) .map(([k, v]) => [k, v as DependencyRequirement] as const) diff --git a/sdk/base/lib/interfaces/setupInterfaces.ts b/sdk/base/lib/interfaces/setupInterfaces.ts index ba284bcb3..df90f7fa7 100644 --- a/sdk/base/lib/interfaces/setupInterfaces.ts +++ b/sdk/base/lib/interfaces/setupInterfaces.ts @@ -28,24 +28,22 @@ export const setupServiceInterfaces: SetupServiceInterfaces = < [] as any as Output) as UpdateServiceInterfaces, } cell.updater = (async (options: { effects: T.Effects }) => { - options.effects = { - ...options.effects, - constRetry: once(() => { - cell.updater(options) - }), - } + const childEffects = options.effects.child("setupInterfaces") + childEffects.constRetry = once(() => { + cell.updater({ effects: options.effects }) + }) const bindings: T.BindId[] = [] const interfaces: T.ServiceInterfaceId[] = [] const res = await fn({ effects: { - ...options.effects, + ...childEffects, bind: (params: T.BindParams) => { bindings.push({ id: params.id, internalPort: params.internalPort }) - return options.effects.bind(params) + return childEffects.bind(params) }, exportServiceInterface: (params: T.ExportServiceInterfaceParams) => { interfaces.push(params.id) - return options.effects.exportServiceInterface(params) + return childEffects.exportServiceInterface(params) }, }, }) diff --git a/sdk/base/lib/test/startosTypeValidation.test.ts b/sdk/base/lib/test/startosTypeValidation.test.ts index 3a66de1e1..58918ec2b 100644 --- a/sdk/base/lib/test/startosTypeValidation.test.ts +++ b/sdk/base/lib/test/startosTypeValidation.test.ts @@ -46,6 +46,9 @@ type EffectsTypeChecker = { describe("startosTypeValidation ", () => { test(`checking the params match`, () => { typeEquality({ + child: "", + isInContext: {} as never, + onLeaveContext: () => {}, clearCallbacks: {} as ClearCallbacksParams, action: { clear: {} as ClearActionsParams, diff --git a/sdk/base/lib/util/GetSystemSmtp.ts b/sdk/base/lib/util/GetSystemSmtp.ts index 2e560f509..457c49d2c 100644 --- a/sdk/base/lib/util/GetSystemSmtp.ts +++ b/sdk/base/lib/util/GetSystemSmtp.ts @@ -25,10 +25,15 @@ export class GetSystemSmtp { * Watches the system SMTP credentials. Returns an async iterator that yields whenever the value changes */ async *watch() { - while (true) { - let callback: () => void + const resolveCell = { resolve: () => {} } + this.effects.onLeaveContext(() => { + resolveCell.resolve() + }) + while (this.effects.isInContext) { + let callback: () => void = () => {} const waitForNext = new Promise((resolve) => { callback = resolve + resolveCell.resolve = resolve }) yield await this.effects.getSystemSmtp({ callback: () => callback(), diff --git a/sdk/base/lib/util/getServiceInterface.ts b/sdk/base/lib/util/getServiceInterface.ts index 58698a5f9..3aa3782d6 100644 --- a/sdk/base/lib/util/getServiceInterface.ts +++ b/sdk/base/lib/util/getServiceInterface.ts @@ -248,10 +248,15 @@ export class GetServiceInterface { */ async *watch() { const { id, packageId } = this.opts - while (true) { + const resolveCell = { resolve: () => {} } + this.effects.onLeaveContext(() => { + resolveCell.resolve() + }) + while (this.effects.isInContext) { let callback: () => void = () => {} const waitForNext = new Promise((resolve) => { callback = resolve + resolveCell.resolve = resolve }) yield await makeInterfaceFilled({ effects: this.effects, diff --git a/sdk/base/lib/util/getServiceInterfaces.ts b/sdk/base/lib/util/getServiceInterfaces.ts index afb87c6d0..13d3f282d 100644 --- a/sdk/base/lib/util/getServiceInterfaces.ts +++ b/sdk/base/lib/util/getServiceInterfaces.ts @@ -82,10 +82,15 @@ export class GetServiceInterfaces { */ async *watch() { const { packageId } = this.opts - while (true) { + const resolveCell = { resolve: () => {} } + this.effects.onLeaveContext(() => { + resolveCell.resolve() + }) + while (this.effects.isInContext) { let callback: () => void = () => {} const waitForNext = new Promise((resolve) => { callback = resolve + resolveCell.resolve = resolve }) yield await makeManyInterfaceFilled({ effects: this.effects, diff --git a/sdk/package/lib/StartSdk.ts b/sdk/package/lib/StartSdk.ts index bdabd0ae6..5c2d99e3a 100644 --- a/sdk/package/lib/StartSdk.ts +++ b/sdk/package/lib/StartSdk.ts @@ -113,7 +113,12 @@ export class StartSdk { | "bind" | "getHostInfo" type MainUsedEffects = "setMainStatus" | "setHealth" - type CallbackEffects = "constRetry" | "clearCallbacks" + type CallbackEffects = + | "child" + | "constRetry" + | "isInContext" + | "onLeaveContext" + | "clearCallbacks" type AlreadyExposed = | "getSslCertificate" | "getSystemSmtp" @@ -211,10 +216,15 @@ export class StartSdk { > = {}, ) => { async function* watch() { - while (true) { + const resolveCell = { resolve: () => {} } + effects.onLeaveContext(() => { + resolveCell.resolve() + }) + while (effects.isInContext) { let callback: () => void = () => {} const waitForNext = new Promise((resolve) => { callback = resolve + resolveCell.resolve = resolve }) yield await effects.getContainerIp({ ...options, callback }) await waitForNext diff --git a/sdk/package/lib/mainFn/CommandController.ts b/sdk/package/lib/mainFn/CommandController.ts index fbbeb9e63..19be60e77 100644 --- a/sdk/package/lib/mainFn/CommandController.ts +++ b/sdk/package/lib/mainFn/CommandController.ts @@ -130,39 +130,37 @@ export class CommandController extends Drop { return new SubContainerHandle(this.subcontainer) } async wait({ timeout = NO_TIMEOUT } = {}) { - const self = this.weak() if (timeout > 0) setTimeout(() => { - self.term() + this.term() }, timeout) try { - return await self.runningAnswer + return await this.runningAnswer } finally { - if (!self.state.exited) { - self.process.kill("SIGKILL") + if (!this.state.exited) { + this.process.kill("SIGKILL") } - await self.subcontainer.destroy().catch((_) => {}) + await this.subcontainer.destroy().catch((_) => {}) } } async term({ signal = SIGTERM, timeout = this.sigtermTimeout } = {}) { - const self = this.weak() try { - if (!self.state.exited) { + if (!this.state.exited) { if (signal !== "SIGKILL") { setTimeout(() => { - if (!self.state.exited) self.process.kill("SIGKILL") + if (!this.state.exited) this.process.kill("SIGKILL") }, timeout) } - if (!self.process.kill(signal)) { + if (!this.process.kill(signal)) { console.error( `failed to send signal ${signal} to pid ${this.process.pid}`, ) } } - await self.runningAnswer + await this.runningAnswer } finally { - await self.subcontainer.destroy() + await this.subcontainer.destroy() } } onDrop(): void { diff --git a/sdk/package/lib/store/getStore.ts b/sdk/package/lib/store/getStore.ts index cd086c979..c2901ee7a 100644 --- a/sdk/package/lib/store/getStore.ts +++ b/sdk/package/lib/store/getStore.ts @@ -37,10 +37,15 @@ export class GetStore { * Watches the value of Store at the provided path. Returns an async iterator that yields whenever the value changes */ async *watch() { - while (true) { - let callback: () => void + const resolveCell = { resolve: () => {} } + this.effects.onLeaveContext(() => { + resolveCell.resolve() + }) + while (this.effects.isInContext) { + let callback: () => void = () => {} const waitForNext = new Promise((resolve) => { callback = resolve + resolveCell.resolve = resolve }) yield await this.effects.store.get({ ...this.options, diff --git a/sdk/package/lib/util/Drop.ts b/sdk/package/lib/util/Drop.ts index e08883580..e0f57ace4 100644 --- a/sdk/package/lib/util/Drop.ts +++ b/sdk/package/lib/util/Drop.ts @@ -1,7 +1,8 @@ export abstract class Drop { private static weak: { [id: number]: Drop } = {} private static registry = new FinalizationRegistry((id: number) => { - Drop.weak[id].drop() + const weak = Drop.weak[id] + if (weak) weak.drop() }) private static idCtr: number = 0 private id: number diff --git a/sdk/package/lib/util/GetSslCertificate.ts b/sdk/package/lib/util/GetSslCertificate.ts index afc12e6b2..33a111b21 100644 --- a/sdk/package/lib/util/GetSslCertificate.ts +++ b/sdk/package/lib/util/GetSslCertificate.ts @@ -34,10 +34,15 @@ export class GetSslCertificate { * Watches the SSL Certificate for the given hostnames if permitted. Returns an async iterator that yields whenever the value changes */ async *watch() { - while (true) { - let callback: () => void + const resolveCell = { resolve: () => {} } + this.effects.onLeaveContext(() => { + resolveCell.resolve() + }) + while (this.effects.isInContext) { + let callback: () => void = () => {} const waitForNext = new Promise((resolve) => { callback = resolve + resolveCell.resolve = resolve }) yield await this.effects.getSslCertificate({ hostnames: this.hostnames, diff --git a/sdk/package/lib/util/SubContainer.ts b/sdk/package/lib/util/SubContainer.ts index 13d91535e..8630c7aa2 100644 --- a/sdk/package/lib/util/SubContainer.ts +++ b/sdk/package/lib/util/SubContainer.ts @@ -4,6 +4,7 @@ import * as cp from "child_process" import { promisify } from "util" import { Buffer } from "node:buffer" import { once } from "../../../base/lib/util/once" +import { Drop } from "./Drop" export const execFile = promisify(cp.execFile) const False = () => false @@ -45,16 +46,8 @@ export interface ExecSpawnable { * Implements: * @see {@link ExecSpawnable} */ -export class SubContainer implements ExecSpawnable { - private static finalizationEffects: { effects?: T.Effects } = {} - private static registry = new FinalizationRegistry((guid: string) => { - if (this.finalizationEffects.effects) { - this.finalizationEffects.effects.subcontainer - .destroyFs({ guid }) - .catch((e) => console.error("failed to cleanup SubContainer", guid, e)) - } - }) - +export class SubContainer extends Drop implements ExecSpawnable { + private destroyed = false private leader: cp.ChildProcess private leaderExited: boolean = false private waitProc: () => Promise @@ -64,8 +57,7 @@ export class SubContainer implements ExecSpawnable { readonly rootfs: string, readonly guid: T.Guid, ) { - if (!SubContainer.finalizationEffects.effects) - SubContainer.finalizationEffects.effects = effects + super() this.leaderExited = false this.leader = cp.spawn("start-cli", ["subcontainer", "launch", rootfs], { killSignal: "SIGKILL", @@ -106,7 +98,6 @@ export class SubContainer implements ExecSpawnable { name, }) const res = new SubContainer(effects, imageId, rootfs, guid) - SubContainer.registry.register(res, guid, res) const shared = ["dev", "sys"] if (!!sharedRun) { @@ -212,14 +203,20 @@ export class SubContainer implements ExecSpawnable { get destroy() { return async () => { - const guid = this.guid - await this.killLeader() - await this.effects.subcontainer.destroyFs({ guid }) - SubContainer.registry.unregister(this) + if (!this.destroyed) { + const guid = this.guid + await this.killLeader() + await this.effects.subcontainer.destroyFs({ guid }) + this.destroyed = true + } return null } } + onDrop(): void { + this.destroy() + } + async exec( command: string[], options?: CommandOptions & ExecOptions, diff --git a/sdk/package/lib/util/fileHelper.ts b/sdk/package/lib/util/fileHelper.ts index 83f46d94d..5df265223 100644 --- a/sdk/package/lib/util/fileHelper.ts +++ b/sdk/package/lib/util/fileHelper.ts @@ -152,7 +152,7 @@ export class FileHelper { } private async readConst(effects: T.Effects): Promise { - const watch = this.readWatch() + const watch = this.readWatch(effects) const res = await watch.next() if (effects.constRetry) { if (!this.consts.includes(effects.constRetry)) @@ -165,9 +165,9 @@ export class FileHelper { return res.value } - private async *readWatch() { + private async *readWatch(effects: T.Effects) { let res - while (true) { + while (effects.isInContext) { if (await exists(this.path)) { const ctrl = new AbortController() const watch = fs.watch(this.path, { @@ -194,10 +194,11 @@ export class FileHelper { } private readOnChange( + effects: T.Effects, callback: (value: A | null, error?: Error) => void | Promise, ) { ;(async () => { - for await (const value of this.readWatch()) { + for await (const value of this.readWatch(effects)) { try { await callback(value) } catch (e) { @@ -221,10 +222,11 @@ export class FileHelper { return { once: () => this.readOnce(), const: (effects: T.Effects) => this.readConst(effects), - watch: () => this.readWatch(), + watch: (effects: T.Effects) => this.readWatch(effects), onChange: ( + effects: T.Effects, callback: (value: A | null, error?: Error) => void | Promise, - ) => this.readOnChange(callback), + ) => this.readOnChange(effects, callback), } }