diff --git a/sdk/base/lib/util/GetContainerIp.ts b/sdk/base/lib/util/GetContainerIp.ts index dfec071ad..7fb2cee2f 100644 --- a/sdk/base/lib/util/GetContainerIp.ts +++ b/sdk/base/lib/util/GetContainerIp.ts @@ -12,7 +12,7 @@ export class GetContainerIp extends Watchable { super(effects) } - protected call(callback?: () => void) { + protected fetch(callback?: () => void) { return this.effects.getContainerIp({ ...this.opts, callback }) } } diff --git a/sdk/base/lib/util/GetHostInfo.ts b/sdk/base/lib/util/GetHostInfo.ts index ef67f03fc..bd822781c 100644 --- a/sdk/base/lib/util/GetHostInfo.ts +++ b/sdk/base/lib/util/GetHostInfo.ts @@ -12,7 +12,7 @@ export class GetHostInfo extends Watchable { super(effects) } - protected call(callback?: () => void) { + protected fetch(callback?: () => void) { return this.effects.getHostInfo({ ...this.opts, callback }) } } diff --git a/sdk/base/lib/util/GetOutboundGateway.ts b/sdk/base/lib/util/GetOutboundGateway.ts index 5a4cecb50..7d8d70880 100644 --- a/sdk/base/lib/util/GetOutboundGateway.ts +++ b/sdk/base/lib/util/GetOutboundGateway.ts @@ -8,7 +8,7 @@ export class GetOutboundGateway extends Watchable { super(effects) } - protected call(callback?: () => void) { + protected fetch(callback?: () => void) { return this.effects.getOutboundGateway({ callback }) } } diff --git a/sdk/base/lib/util/GetServiceManifest.ts b/sdk/base/lib/util/GetServiceManifest.ts index 7a99e5aa0..2afa60fbb 100644 --- a/sdk/base/lib/util/GetServiceManifest.ts +++ b/sdk/base/lib/util/GetServiceManifest.ts @@ -1,18 +1,47 @@ import { Effects } from '../Effects' import { Manifest, PackageId } from '../osBindings' +import { deepEqual } from './deepEqual' import { Watchable } from './Watchable' -export class GetServiceManifest extends Watchable { +export class GetServiceManifest< + Mapped = Manifest | null, +> extends Watchable { protected readonly label = 'GetServiceManifest' constructor( effects: Effects, readonly opts: { packageId: PackageId }, + options?: { + map?: (value: Manifest | null) => Mapped + eq?: (a: Mapped, b: Mapped) => boolean + }, ) { - super(effects) + super(effects, options) } - protected call(callback?: () => void) { + protected fetch(callback?: () => void) { return this.effects.getServiceManifest({ ...this.opts, callback }) } } + +export function getServiceManifest( + effects: Effects, + packageId: PackageId, +): GetServiceManifest +export function getServiceManifest( + effects: Effects, + packageId: PackageId, + map: (manifest: Manifest | null) => Mapped, + eq?: (a: Mapped, b: Mapped) => boolean, +): GetServiceManifest +export function getServiceManifest( + effects: Effects, + packageId: PackageId, + map?: (manifest: Manifest | null) => Mapped, + eq?: (a: Mapped, b: Mapped) => boolean, +): GetServiceManifest { + return new GetServiceManifest(effects, { packageId }, { + map: map ?? ((a) => a as Mapped), + eq: eq ?? ((a, b) => deepEqual(a, b)), + }) +} diff --git a/sdk/base/lib/util/GetSslCertificate.ts b/sdk/base/lib/util/GetSslCertificate.ts index 08d5b10c0..72daee306 100644 --- a/sdk/base/lib/util/GetSslCertificate.ts +++ b/sdk/base/lib/util/GetSslCertificate.ts @@ -14,7 +14,7 @@ export class GetSslCertificate extends Watchable<[string, string, string]> { super(effects) } - protected call(callback?: () => void) { + protected fetch(callback?: () => void) { return this.effects.getSslCertificate({ ...this.opts, callback }) } } diff --git a/sdk/base/lib/util/GetStatus.ts b/sdk/base/lib/util/GetStatus.ts index 365217977..c1d3df38a 100644 --- a/sdk/base/lib/util/GetStatus.ts +++ b/sdk/base/lib/util/GetStatus.ts @@ -12,7 +12,7 @@ export class GetStatus extends Watchable { super(effects) } - protected call(callback?: () => void) { + protected fetch(callback?: () => void) { return this.effects.getStatus({ ...this.opts, callback }) } } diff --git a/sdk/base/lib/util/GetSystemSmtp.ts b/sdk/base/lib/util/GetSystemSmtp.ts index b45263549..2da804437 100644 --- a/sdk/base/lib/util/GetSystemSmtp.ts +++ b/sdk/base/lib/util/GetSystemSmtp.ts @@ -9,7 +9,7 @@ export class GetSystemSmtp extends Watchable { super(effects) } - protected call(callback?: () => void) { + protected fetch(callback?: () => void) { return this.effects.getSystemSmtp({ callback }) } } diff --git a/sdk/base/lib/util/Watchable.ts b/sdk/base/lib/util/Watchable.ts index e6e5fc427..22c2d3581 100644 --- a/sdk/base/lib/util/Watchable.ts +++ b/sdk/base/lib/util/Watchable.ts @@ -1,55 +1,124 @@ import { Effects } from '../Effects' import { AbortedError } from './AbortedError' +import { deepEqual } from './deepEqual' import { DropGenerator, DropPromise } from './Drop' -export abstract class Watchable { - constructor(readonly effects: Effects) {} +export abstract class Watchable { + protected readonly mapFn: (value: Raw) => Mapped + protected readonly eqFn: (a: Mapped, b: Mapped) => boolean - protected abstract call(callback?: () => void): Promise + constructor( + readonly effects: Effects, + options?: { + map?: (value: Raw) => Mapped + eq?: (a: Mapped, b: Mapped) => boolean + }, + ) { + this.mapFn = options?.map ?? ((a) => a as unknown as Mapped) + this.eqFn = options?.eq ?? ((a, b) => deepEqual(a, b)) + } + + /** + * Fetch the current value, optionally registering a callback for change notification. + * The callback should be invoked when the underlying data changes. + */ + protected abstract fetch(callback?: () => void): Promise protected abstract readonly label: string /** - * Returns the value. Reruns the context from which it has been called if the underlying value changes + * Produce a stream of raw values. Default implementation uses fetch() with + * effects callback in a loop. Override for custom subscription mechanisms + * (e.g. fs.watch). */ - const(): Promise { - return this.call( - this.effects.constRetry && - (() => this.effects.constRetry && this.effects.constRetry()), - ) - } - - /** - * Returns the value. Does nothing if the value changes - */ - once(): Promise { - return this.call() - } - - private async *watchGen(abort?: AbortSignal) { + protected async *produce(abort: AbortSignal): AsyncGenerator { const resolveCell = { resolve: () => {} } this.effects.onLeaveContext(() => { resolveCell.resolve() }) - abort?.addEventListener('abort', () => resolveCell.resolve()) - while (this.effects.isInContext && !abort?.aborted) { + abort.addEventListener('abort', () => resolveCell.resolve()) + while (this.effects.isInContext && !abort.aborted) { let callback: () => void = () => {} const waitForNext = new Promise((resolve) => { callback = resolve resolveCell.resolve = resolve }) - yield await this.call(() => callback()) + yield await this.fetch(() => callback()) await waitForNext } - return new Promise((_, rej) => rej(new AbortedError())) + } + + /** + * Lifecycle hook called when const() registers a subscription. + * Return a cleanup function to be called when the subscription ends. + * Override for side effects like FileHelper's consts tracking. + */ + protected onConstRegistered(_value: Mapped): (() => void) | void {} + + /** + * Internal generator that maps raw values and deduplicates using eq. + */ + private async *watchGen( + abort: AbortSignal, + ): AsyncGenerator { + let prev: { value: Mapped } | null = null + for await (const raw of this.produce(abort)) { + if (abort.aborted) return + const mapped = this.mapFn(raw) + if (!prev || !this.eqFn(prev.value, mapped)) { + prev = { value: mapped } + yield mapped + } + } + } + + /** + * Returns the value. Reruns the context from which it has been called if the underlying value changes + */ + async const(): Promise { + const abort = new AbortController() + const gen = this.watchGen(abort.signal) + const res = await gen.next() + const value = res.value as Mapped + if (this.effects.constRetry) { + const constRetry = this.effects.constRetry + const cleanup = this.onConstRegistered(value) + gen.next().then( + () => { + abort.abort() + cleanup?.() + constRetry() + }, + () => { + abort.abort() + cleanup?.() + }, + ) + } else { + abort.abort() + } + return value + } + + /** + * Returns the value. Does nothing if the value changes + */ + async once(): Promise { + return this.mapFn(await this.fetch()) } /** * Watches the value. Returns an async iterator that yields whenever the value changes */ - watch(abort?: AbortSignal): AsyncGenerator { + watch(abort?: AbortSignal): AsyncGenerator { const ctrl = new AbortController() abort?.addEventListener('abort', () => ctrl.abort()) - return DropGenerator.of(this.watchGen(ctrl.signal), () => ctrl.abort()) + return DropGenerator.of( + (async function* (gen): AsyncGenerator { + yield* gen + throw new AbortedError() + })(this.watchGen(ctrl.signal)), + () => ctrl.abort(), + ) } /** @@ -57,13 +126,13 @@ export abstract class Watchable { */ onChange( callback: ( - value: T | undefined, + value: Mapped | undefined, error?: Error, ) => { cancel: boolean } | Promise<{ cancel: boolean }>, ) { ;(async () => { const ctrl = new AbortController() - for await (const value of this.watch(ctrl.signal)) { + for await (const value of this.watchGen(ctrl.signal)) { try { const res = await callback(value) if (res.cancel) { @@ -90,7 +159,7 @@ export abstract class Watchable { /** * Watches the value. Returns when the predicate is true */ - waitFor(pred: (value: T) => boolean): Promise { + waitFor(pred: (value: Mapped) => boolean): Promise { const ctrl = new AbortController() return DropPromise.of( Promise.resolve().then(async () => { diff --git a/sdk/base/lib/util/getServiceInterface.ts b/sdk/base/lib/util/getServiceInterface.ts index 944e1c6b6..527dc09dc 100644 --- a/sdk/base/lib/util/getServiceInterface.ts +++ b/sdk/base/lib/util/getServiceInterface.ts @@ -8,11 +8,10 @@ import { HostnameInfo, } from '../types' import { Effects } from '../Effects' -import { AbortedError } from './AbortedError' -import { DropGenerator, DropPromise } from './Drop' import { IpAddress, IPV6_LINK_LOCAL } from './ip' import { deepEqual } from './deepEqual' import { once } from './once' +import { Watchable } from './Watchable' export type UrlString = string export type HostId = string @@ -440,136 +439,29 @@ const makeInterfaceFilled = async ({ return interfaceFilled } -export class GetServiceInterface { +export class GetServiceInterface< + Mapped = ServiceInterfaceFilled | null, +> extends Watchable { + protected readonly label = 'GetServiceInterface' + constructor( - readonly effects: Effects, + effects: Effects, readonly opts: { id: string; packageId?: string }, - readonly map: (interfaces: ServiceInterfaceFilled | null) => Mapped, - readonly eq: (a: Mapped, b: Mapped) => boolean, - ) {} - - /** - * Returns the requested service interface. Reruns the context from which it has been called if the underlying value changes - */ - async const() { - let abort = new AbortController() - const watch = this.watch(abort.signal) - const res = await watch.next() - if (this.effects.constRetry) { - watch - .next() - .then(() => { - abort.abort() - this.effects.constRetry && this.effects.constRetry() - }) - .catch() - } - return res.value - } - /** - * Returns the requested service interface. Does nothing if the value changes - */ - async once() { - const { id, packageId } = this.opts - const interfaceFilled = await makeInterfaceFilled({ - effects: this.effects, - id, - packageId, - }) - - return this.map(interfaceFilled) - } - - private async *watchGen(abort?: AbortSignal) { - let prev = null as { value: Mapped } | null - const { id, packageId } = this.opts - const resolveCell = { resolve: () => {} } - this.effects.onLeaveContext(() => { - resolveCell.resolve() - }) - abort?.addEventListener('abort', () => resolveCell.resolve()) - while (this.effects.isInContext && !abort?.aborted) { - let callback: () => void = () => {} - const waitForNext = new Promise((resolve) => { - callback = resolve - resolveCell.resolve = resolve - }) - const next = this.map( - await makeInterfaceFilled({ - effects: this.effects, - id, - packageId, - callback, - }), - ) - if (!prev || !this.eq(prev.value, next)) { - yield next - } - await waitForNext - } - return new Promise((_, rej) => rej(new AbortedError())) - } - - /** - * Watches the requested service interface. Returns an async iterator that yields whenever the value changes - */ - watch(abort?: AbortSignal): AsyncGenerator { - const ctrl = new AbortController() - abort?.addEventListener('abort', () => ctrl.abort()) - return DropGenerator.of(this.watchGen(ctrl.signal), () => ctrl.abort()) - } - - /** - * Watches the requested service interface. Takes a custom callback function to run whenever the value changes - */ - onChange( - callback: ( - value: Mapped | null, - error?: Error, - ) => { cancel: boolean } | Promise<{ cancel: boolean }>, + options?: { + map?: (value: ServiceInterfaceFilled | null) => Mapped + eq?: (a: Mapped, b: Mapped) => boolean + }, ) { - ;(async () => { - const ctrl = new AbortController() - for await (const value of this.watch(ctrl.signal)) { - try { - const res = await callback(value) - if (res.cancel) { - ctrl.abort() - break - } - } catch (e) { - console.error( - 'callback function threw an error @ GetServiceInterface.onChange', - e, - ) - } - } - })() - .catch((e) => callback(null, e)) - .catch((e) => - console.error( - 'callback function threw an error @ GetServiceInterface.onChange', - e, - ), - ) + super(effects, options) } - /** - * Watches the requested service interface. Returns when the predicate is true - */ - waitFor(pred: (value: Mapped) => boolean): Promise { - const ctrl = new AbortController() - return DropPromise.of( - Promise.resolve().then(async () => { - for await (const next of this.watchGen(ctrl.signal)) { - if (pred(next)) { - return next - } - } - throw new Error('context left before predicate passed') - }), - () => ctrl.abort(), - ) + protected fetch(callback?: () => void) { + return makeInterfaceFilled({ + effects: this.effects, + id: this.opts.id, + packageId: this.opts.packageId, + callback, + }) } } @@ -589,12 +481,10 @@ export function getOwnServiceInterface( map?: (interfaces: ServiceInterfaceFilled | null) => Mapped, eq?: (a: Mapped, b: Mapped) => boolean, ): GetServiceInterface { - return new GetServiceInterface( - effects, - { id }, - map ?? ((a) => a as Mapped), - eq ?? ((a, b) => deepEqual(a, b)), - ) + return new GetServiceInterface(effects, { id }, { + map: map ?? ((a) => a as Mapped), + eq: eq ?? ((a, b) => deepEqual(a, b)), + }) } export function getServiceInterface( @@ -613,10 +503,8 @@ export function getServiceInterface( map?: (interfaces: ServiceInterfaceFilled | null) => Mapped, eq?: (a: Mapped, b: Mapped) => boolean, ): GetServiceInterface { - return new GetServiceInterface( - effects, - opts, - map ?? ((a) => a as Mapped), - eq ?? ((a, b) => deepEqual(a, b)), - ) + return new GetServiceInterface(effects, opts, { + map: map ?? ((a) => a as Mapped), + eq: eq ?? ((a, b) => deepEqual(a, b)), + }) } diff --git a/sdk/base/lib/util/getServiceInterfaces.ts b/sdk/base/lib/util/getServiceInterfaces.ts index e6a745d56..73a176808 100644 --- a/sdk/base/lib/util/getServiceInterfaces.ts +++ b/sdk/base/lib/util/getServiceInterfaces.ts @@ -1,9 +1,8 @@ import { Effects } from '../Effects' import { PackageId } from '../osBindings' -import { AbortedError } from './AbortedError' import { deepEqual } from './deepEqual' -import { DropGenerator, DropPromise } from './Drop' import { ServiceInterfaceFilled, filledAddress } from './getServiceInterface' +import { Watchable } from './Watchable' const makeManyInterfaceFilled = async ({ effects, @@ -40,139 +39,34 @@ const makeManyInterfaceFilled = async ({ return serviceInterfacesFilled } -export class GetServiceInterfaces { +export class GetServiceInterfaces< + Mapped = ServiceInterfaceFilled[], +> extends Watchable { + protected readonly label = 'GetServiceInterfaces' + constructor( - readonly effects: Effects, + effects: Effects, readonly opts: { packageId?: string }, - readonly map: (interfaces: ServiceInterfaceFilled[]) => Mapped, - readonly eq: (a: Mapped, b: Mapped) => boolean, - ) {} - - /** - * Returns the service interfaces for the package. Reruns the context from which it has been called if the underlying value changes - */ - async const() { - let abort = new AbortController() - const watch = this.watch(abort.signal) - const res = await watch.next() - if (this.effects.constRetry) { - watch - .next() - .then(() => { - abort.abort() - this.effects.constRetry && this.effects.constRetry() - }) - .catch() - } - return res.value - } - /** - * Returns the service interfaces for the package. Does nothing if the value changes - */ - async once() { - const { packageId } = this.opts - const interfaceFilled: ServiceInterfaceFilled[] = - await makeManyInterfaceFilled({ - effects: this.effects, - packageId, - }) - - return this.map(interfaceFilled) - } - - private async *watchGen(abort?: AbortSignal) { - let prev = null as { value: Mapped } | null - const { packageId } = this.opts - const resolveCell = { resolve: () => {} } - this.effects.onLeaveContext(() => { - resolveCell.resolve() - }) - abort?.addEventListener('abort', () => resolveCell.resolve()) - while (this.effects.isInContext && !abort?.aborted) { - let callback: () => void = () => {} - const waitForNext = new Promise((resolve) => { - callback = resolve - resolveCell.resolve = resolve - }) - const next = this.map( - await makeManyInterfaceFilled({ - effects: this.effects, - packageId, - callback, - }), - ) - if (!prev || !this.eq(prev.value, next)) { - yield next - } - await waitForNext - } - return new Promise((_, rej) => rej(new AbortedError())) - } - - /** - * Watches the service interfaces for the package. Returns an async iterator that yields whenever the value changes - */ - watch(abort?: AbortSignal): AsyncGenerator { - const ctrl = new AbortController() - abort?.addEventListener('abort', () => ctrl.abort()) - return DropGenerator.of(this.watchGen(ctrl.signal), () => ctrl.abort()) - } - - /** - * Watches the service interfaces for the package. Takes a custom callback function to run whenever the value changes - */ - onChange( - callback: ( - value: Mapped | null, - error?: Error, - ) => { cancel: boolean } | Promise<{ cancel: boolean }>, + options?: { + map?: (value: ServiceInterfaceFilled[]) => Mapped + eq?: (a: Mapped, b: Mapped) => boolean + }, ) { - ;(async () => { - const ctrl = new AbortController() - for await (const value of this.watch(ctrl.signal)) { - try { - const res = await callback(value) - if (res.cancel) { - ctrl.abort() - break - } - } catch (e) { - console.error( - 'callback function threw an error @ GetServiceInterfaces.onChange', - e, - ) - } - } - })() - .catch((e) => callback(null, e)) - .catch((e) => - console.error( - 'callback function threw an error @ GetServiceInterfaces.onChange', - e, - ), - ) + super(effects, options) } - /** - * Watches the service interfaces for the package. Returns when the predicate is true - */ - waitFor(pred: (value: Mapped) => boolean): Promise { - const ctrl = new AbortController() - return DropPromise.of( - Promise.resolve().then(async () => { - for await (const next of this.watchGen(ctrl.signal)) { - if (pred(next)) { - return next - } - } - throw new Error('context left before predicate passed') - }), - () => ctrl.abort(), - ) + protected fetch(callback?: () => void) { + return makeManyInterfaceFilled({ + effects: this.effects, + packageId: this.opts.packageId, + callback, + }) } } -export function getOwnServiceInterfaces(effects: Effects): GetServiceInterfaces +export function getOwnServiceInterfaces( + effects: Effects, +): GetServiceInterfaces export function getOwnServiceInterfaces( effects: Effects, map: (interfaces: ServiceInterfaceFilled[]) => Mapped, @@ -183,12 +77,10 @@ export function getOwnServiceInterfaces( map?: (interfaces: ServiceInterfaceFilled[]) => Mapped, eq?: (a: Mapped, b: Mapped) => boolean, ): GetServiceInterfaces { - return new GetServiceInterfaces( - effects, - {}, - map ?? ((a) => a as Mapped), - eq ?? ((a, b) => deepEqual(a, b)), - ) + return new GetServiceInterfaces(effects, {}, { + map: map ?? ((a) => a as Mapped), + eq: eq ?? ((a, b) => deepEqual(a, b)), + }) } export function getServiceInterfaces( @@ -207,10 +99,8 @@ export function getServiceInterfaces( map?: (interfaces: ServiceInterfaceFilled[]) => Mapped, eq?: (a: Mapped, b: Mapped) => boolean, ): GetServiceInterfaces { - return new GetServiceInterfaces( - effects, - opts, - map ?? ((a) => a as Mapped), - eq ?? ((a, b) => deepEqual(a, b)), - ) + return new GetServiceInterfaces(effects, opts, { + map: map ?? ((a) => a as Mapped), + eq: eq ?? ((a, b) => deepEqual(a, b)), + }) } diff --git a/sdk/base/lib/util/index.ts b/sdk/base/lib/util/index.ts index 3c4606f14..22cf037ba 100644 --- a/sdk/base/lib/util/index.ts +++ b/sdk/base/lib/util/index.ts @@ -19,7 +19,7 @@ export { Watchable } from './Watchable' export { GetContainerIp } from './GetContainerIp' export { GetHostInfo } from './GetHostInfo' export { GetOutboundGateway } from './GetOutboundGateway' -export { GetServiceManifest } from './GetServiceManifest' +export { GetServiceManifest, getServiceManifest } from './GetServiceManifest' export { GetSslCertificate } from './GetSslCertificate' export { GetStatus } from './GetStatus' export { GetSystemSmtp } from './GetSystemSmtp' diff --git a/sdk/package/lib/StartSdk.ts b/sdk/package/lib/StartSdk.ts index 9450e6b8e..4566b2666 100644 --- a/sdk/package/lib/StartSdk.ts +++ b/sdk/package/lib/StartSdk.ts @@ -59,7 +59,8 @@ import { setupOnInit, setupOnUninit, } from '../../base/lib/inits' -import { DropGenerator } from '../../base/lib/util/Drop' +import { GetContainerIp } from '../../base/lib/util/GetContainerIp' +import { GetStatus } from '../../base/lib/util/GetStatus' import { getOwnServiceInterface, ServiceInterfaceFilled, @@ -257,90 +258,7 @@ export class StartSdk { Parameters[0], 'callback' > = {}, - ) => { - async function* watch(abort?: AbortSignal) { - const resolveCell = { resolve: () => {} } - effects.onLeaveContext(() => { - resolveCell.resolve() - }) - abort?.addEventListener('abort', () => resolveCell.resolve()) - while (effects.isInContext && !abort?.aborted) { - let callback: () => void = () => {} - const waitForNext = new Promise((resolve) => { - callback = resolve - resolveCell.resolve = resolve - }) - yield await effects.getContainerIp({ ...options, callback }) - await waitForNext - } - } - return { - const: () => - effects.getContainerIp({ - ...options, - callback: - effects.constRetry && - (() => effects.constRetry && effects.constRetry()), - }), - once: () => effects.getContainerIp(options), - watch: (abort?: AbortSignal) => { - const ctrl = new AbortController() - abort?.addEventListener('abort', () => ctrl.abort()) - return DropGenerator.of(watch(ctrl.signal), () => ctrl.abort()) - }, - onChange: ( - callback: ( - value: string | null, - error?: Error, - ) => { cancel: boolean } | Promise<{ cancel: boolean }>, - ) => { - ;(async () => { - const ctrl = new AbortController() - for await (const value of watch(ctrl.signal)) { - try { - const res = await callback(value) - if (res.cancel) { - ctrl.abort() - break - } - } catch (e) { - console.error( - 'callback function threw an error @ getContainerIp.onChange', - e, - ) - } - } - })() - .catch((e) => callback(null, e)) - .catch((e) => - console.error( - 'callback function threw an error @ getContainerIp.onChange', - e, - ), - ) - }, - waitFor: async (pred: (value: string | null) => boolean) => { - const resolveCell = { resolve: () => {} } - effects.onLeaveContext(() => { - resolveCell.resolve() - }) - while (effects.isInContext) { - let callback: () => void = () => {} - const waitForNext = new Promise((resolve) => { - callback = resolve - resolveCell.resolve = resolve - }) - const res = await effects.getContainerIp({ ...options, callback }) - if (pred(res)) { - resolveCell.resolve() - return res - } - await waitForNext - } - return null - }, - } - }, + ) => new GetContainerIp(effects, options), /** * Get the service's current status with reactive subscription support. @@ -355,90 +273,7 @@ export class StartSdk { getStatus: ( effects: T.Effects, options: Omit[0], 'callback'> = {}, - ) => { - async function* watch(abort?: AbortSignal) { - const resolveCell = { resolve: () => {} } - effects.onLeaveContext(() => { - resolveCell.resolve() - }) - abort?.addEventListener('abort', () => resolveCell.resolve()) - while (effects.isInContext && !abort?.aborted) { - let callback: () => void = () => {} - const waitForNext = new Promise((resolve) => { - callback = resolve - resolveCell.resolve = resolve - }) - yield await effects.getStatus({ ...options, callback }) - await waitForNext - } - } - return { - const: () => - effects.getStatus({ - ...options, - callback: - effects.constRetry && - (() => effects.constRetry && effects.constRetry()), - }), - once: () => effects.getStatus(options), - watch: (abort?: AbortSignal) => { - const ctrl = new AbortController() - abort?.addEventListener('abort', () => ctrl.abort()) - return DropGenerator.of(watch(ctrl.signal), () => ctrl.abort()) - }, - onChange: ( - callback: ( - value: T.StatusInfo | null, - error?: Error, - ) => { cancel: boolean } | Promise<{ cancel: boolean }>, - ) => { - ;(async () => { - const ctrl = new AbortController() - for await (const value of watch(ctrl.signal)) { - try { - const res = await callback(value) - if (res.cancel) { - ctrl.abort() - break - } - } catch (e) { - console.error( - 'callback function threw an error @ getStatus.onChange', - e, - ) - } - } - })() - .catch((e) => callback(null, e)) - .catch((e) => - console.error( - 'callback function threw an error @ getStatus.onChange', - e, - ), - ) - }, - waitFor: async (pred: (value: T.StatusInfo | null) => boolean) => { - const resolveCell = { resolve: () => {} } - effects.onLeaveContext(() => { - resolveCell.resolve() - }) - while (effects.isInContext) { - let callback: () => void = () => {} - const waitForNext = new Promise((resolve) => { - callback = resolve - resolveCell.resolve = resolve - }) - const res = await effects.getStatus({ ...options, callback }) - if (pred(res)) { - resolveCell.resolve() - return res - } - await waitForNext - } - return null - }, - } - }, + ) => new GetStatus(effects, options), MultiHost: { /** @@ -646,7 +481,7 @@ export class StartSdk { effects: E, hostnames: string[], algorithm?: T.Algorithm, - ) => new GetSslCertificate(effects, hostnames, algorithm), + ) => new GetSslCertificate(effects, { hostnames, algorithm }), /** Retrieve the manifest of any installed service package by its ID */ getServiceManifest, healthCheck: { diff --git a/sdk/package/lib/util/GetServiceManifest.ts b/sdk/package/lib/util/GetServiceManifest.ts deleted file mode 100644 index 9f85570d2..000000000 --- a/sdk/package/lib/util/GetServiceManifest.ts +++ /dev/null @@ -1,156 +0,0 @@ -import { Effects } from '../../../base/lib/Effects' -import { Manifest, PackageId } from '../../../base/lib/osBindings' -import { AbortedError } from '../../../base/lib/util/AbortedError' -import { DropGenerator, DropPromise } from '../../../base/lib/util/Drop' -import { deepEqual } from '../../../base/lib/util/deepEqual' - -export class GetServiceManifest { - constructor( - readonly effects: Effects, - readonly packageId: PackageId, - readonly map: (manifest: Manifest | null) => Mapped, - readonly eq: (a: Mapped, b: Mapped) => boolean, - ) {} - - /** - * Returns the manifest of a service. Reruns the context from which it has been called if the underlying value changes - */ - async const() { - let abort = new AbortController() - const watch = this.watch(abort.signal) - const res = await watch.next() - if (this.effects.constRetry) { - watch - .next() - .then(() => { - abort.abort() - this.effects.constRetry && this.effects.constRetry() - }) - .catch() - } - return res.value - } - /** - * Returns the manifest of a service. Does nothing if it changes - */ - async once() { - const manifest = await this.effects.getServiceManifest({ - packageId: this.packageId, - }) - return this.map(manifest) - } - - private async *watchGen(abort?: AbortSignal) { - let prev = null as { value: Mapped } | null - const resolveCell = { resolve: () => {} } - this.effects.onLeaveContext(() => { - resolveCell.resolve() - }) - abort?.addEventListener('abort', () => resolveCell.resolve()) - while (this.effects.isInContext && !abort?.aborted) { - let callback: () => void = () => {} - const waitForNext = new Promise((resolve) => { - callback = resolve - resolveCell.resolve = resolve - }) - const next = this.map( - await this.effects.getServiceManifest({ - packageId: this.packageId, - callback: () => callback(), - }), - ) - if (!prev || !this.eq(prev.value, next)) { - prev = { value: next } - yield next - } - await waitForNext - } - return new Promise((_, rej) => rej(new AbortedError())) - } - - /** - * Watches the manifest of a service. Returns an async iterator that yields whenever the value changes - */ - watch(abort?: AbortSignal): AsyncGenerator { - const ctrl = new AbortController() - abort?.addEventListener('abort', () => ctrl.abort()) - return DropGenerator.of(this.watchGen(ctrl.signal), () => ctrl.abort()) - } - - /** - * Watches the manifest of a service. Takes a custom callback function to run whenever it changes - */ - onChange( - callback: ( - value: Mapped | null, - error?: Error, - ) => { cancel: boolean } | Promise<{ cancel: boolean }>, - ) { - ;(async () => { - const ctrl = new AbortController() - for await (const value of this.watch(ctrl.signal)) { - try { - const res = await callback(value) - if (res.cancel) { - ctrl.abort() - break - } - } catch (e) { - console.error( - 'callback function threw an error @ GetServiceManifest.onChange', - e, - ) - } - } - })() - .catch((e) => callback(null, e)) - .catch((e) => - console.error( - 'callback function threw an error @ GetServiceManifest.onChange', - e, - ), - ) - } - - /** - * Watches the manifest of a service. Returns when the predicate is true - */ - waitFor(pred: (value: Mapped) => boolean): Promise { - const ctrl = new AbortController() - return DropPromise.of( - Promise.resolve().then(async () => { - for await (const next of this.watchGen(ctrl.signal)) { - if (pred(next)) { - return next - } - } - throw new Error('context left before predicate passed') - }), - () => ctrl.abort(), - ) - } -} - -export function getServiceManifest( - effects: Effects, - packageId: PackageId, -): GetServiceManifest -export function getServiceManifest( - effects: Effects, - packageId: PackageId, - map: (manifest: Manifest | null) => Mapped, - eq?: (a: Mapped, b: Mapped) => boolean, -): GetServiceManifest -export function getServiceManifest( - effects: Effects, - packageId: PackageId, - map?: (manifest: Manifest | null) => Mapped, - eq?: (a: Mapped, b: Mapped) => boolean, -): GetServiceManifest { - return new GetServiceManifest( - effects, - packageId, - map ?? ((a) => a as Mapped), - eq ?? ((a, b) => deepEqual(a, b)), - ) -} diff --git a/sdk/package/lib/util/GetSslCertificate.ts b/sdk/package/lib/util/GetSslCertificate.ts deleted file mode 100644 index b9967bf22..000000000 --- a/sdk/package/lib/util/GetSslCertificate.ts +++ /dev/null @@ -1,122 +0,0 @@ -import { T } from '..' -import { Effects } from '../../../base/lib/Effects' -import { AbortedError } from '../../../base/lib/util/AbortedError' -import { DropGenerator, DropPromise } from '../../../base/lib/util/Drop' - -export class GetSslCertificate { - constructor( - readonly effects: Effects, - readonly hostnames: string[], - readonly algorithm?: T.Algorithm, - ) {} - - /** - * Returns the an SSL Certificate for the given hostnames if permitted. Restarts the service if it changes - */ - const() { - return this.effects.getSslCertificate({ - hostnames: this.hostnames, - algorithm: this.algorithm, - callback: - this.effects.constRetry && - (() => this.effects.constRetry && this.effects.constRetry()), - }) - } - /** - * Returns the an SSL Certificate for the given hostnames if permitted. Does nothing if it changes - */ - once() { - return this.effects.getSslCertificate({ - hostnames: this.hostnames, - algorithm: this.algorithm, - }) - } - - private async *watchGen(abort?: AbortSignal) { - const resolveCell = { resolve: () => {} } - this.effects.onLeaveContext(() => { - resolveCell.resolve() - }) - abort?.addEventListener('abort', () => resolveCell.resolve()) - while (this.effects.isInContext && !abort?.aborted) { - let callback: () => void = () => {} - const waitForNext = new Promise((resolve) => { - callback = resolve - resolveCell.resolve = resolve - }) - yield await this.effects.getSslCertificate({ - hostnames: this.hostnames, - algorithm: this.algorithm, - callback: () => callback(), - }) - await waitForNext - } - return new Promise((_, rej) => rej(new AbortedError())) - } - - /** - * Watches the SSL Certificate for the given hostnames if permitted. Returns an async iterator that yields whenever the value changes - */ - watch( - abort?: AbortSignal, - ): AsyncGenerator<[string, string, string], never, unknown> { - const ctrl = new AbortController() - abort?.addEventListener('abort', () => ctrl.abort()) - return DropGenerator.of(this.watchGen(ctrl.signal), () => ctrl.abort()) - } - - /** - * Watches the SSL Certificate for the given hostnames if permitted. Takes a custom callback function to run whenever it changes - */ - onChange( - callback: ( - value: [string, string, string] | null, - error?: Error, - ) => { cancel: boolean } | Promise<{ cancel: boolean }>, - ) { - ;(async () => { - const ctrl = new AbortController() - for await (const value of this.watch(ctrl.signal)) { - try { - const res = await callback(value) - if (res.cancel) { - ctrl.abort() - break - } - } catch (e) { - console.error( - 'callback function threw an error @ GetSslCertificate.onChange', - e, - ) - } - } - })() - .catch((e) => callback(null, e)) - .catch((e) => - console.error( - 'callback function threw an error @ GetSslCertificate.onChange', - e, - ), - ) - } - - /** - * Watches the SSL Certificate for the given hostnames if permitted. Returns when the predicate is true - */ - waitFor( - pred: (value: [string, string, string] | null) => boolean, - ): Promise<[string, string, string] | null> { - const ctrl = new AbortController() - return DropPromise.of( - Promise.resolve().then(async () => { - for await (const next of this.watchGen(ctrl.signal)) { - if (pred(next)) { - return next - } - } - return null - }), - () => ctrl.abort(), - ) - } -} diff --git a/sdk/package/lib/util/fileHelper.ts b/sdk/package/lib/util/fileHelper.ts index 6b428edfd..1bcde0c90 100644 --- a/sdk/package/lib/util/fileHelper.ts +++ b/sdk/package/lib/util/fileHelper.ts @@ -4,8 +4,8 @@ import * as TOML from '@iarna/toml' import * as INI from 'ini' import * as T from '../../../base/lib/types' import * as fs from 'node:fs/promises' -import { AbortedError, asError, deepEqual } from '../../../base/lib/util' -import { DropGenerator, DropPromise } from '../../../base/lib/util/Drop' +import { asError, deepEqual } from '../../../base/lib/util' +import { Watchable } from '../../../base/lib/util/Watchable' import { PathBase } from './Volume' const previousPath = /(.+?)\/([^/]*)$/ @@ -228,132 +228,72 @@ export class FileHelper { return map(this.validate(data)) } - private async readConst( + private createFileWatchable( effects: T.Effects, map: (value: A) => B, - eq: (left: B | null | undefined, right: B | null) => boolean, - ): Promise { - const watch = this.readWatch(effects, map, eq) - const res = await watch.next() - if (effects.constRetry) { - const record: (typeof this.consts)[number] = [ - effects.constRetry, - res.value, - map, - eq, - ] - this.consts.push(record) - watch - .next() - .then(() => { - this.consts = this.consts.filter((r) => r !== record) - effects.constRetry && effects.constRetry() - }) - .catch() - } - return res.value - } - - private async *readWatch( - effects: T.Effects, - map: (value: A) => B, - eq: (left: B | null | undefined, right: B | null) => boolean, - abort?: AbortSignal, + eq: (left: B | null, right: B | null) => boolean, ) { - let prev: { value: B | null } | null = null - while (effects.isInContext && !abort?.aborted) { - if (await exists(this.path)) { - const ctrl = new AbortController() - abort?.addEventListener('abort', () => ctrl.abort()) - const watch = fs.watch(this.path, { - persistent: false, - signal: ctrl.signal, - }) - const newRes = await this.readOnce(map) - const listen = Promise.resolve() - .then(async () => { - for await (const _ of watch) { - ctrl.abort() - return null - } - }) - .catch((e) => console.error(asError(e))) - if (!prev || !eq(prev.value, newRes)) { - console.error('yielding', JSON.stringify({ prev: prev, newRes })) - yield newRes - } - prev = { value: newRes } - await listen - } else { - yield null - await onCreated(this.path).catch((e) => console.error(asError(e))) - } + const doRead = async (): Promise => { + const data = await this.readFile() + if (!data) return null + return this.validate(data) } - return new Promise((_, rej) => rej(new AbortedError())) - } + const filePath = this.path + const fileHelper = this - private readOnChange( - effects: T.Effects, - callback: ( - value: B | null, - error?: Error, - ) => { cancel: boolean } | Promise<{ cancel: boolean }>, - map: (value: A) => B, - eq: (left: B | null | undefined, right: B | null) => boolean, - ) { - ;(async () => { - const ctrl = new AbortController() - for await (const value of this.readWatch(effects, map, eq, ctrl.signal)) { - try { - const res = await callback(value) - if (res.cancel) ctrl.abort() - } catch (e) { - console.error( - 'callback function threw an error @ FileHelper.read.onChange', - e, - ) - } + const wrappedMap = (raw: A | null): B | null => { + if (raw === null) return null + return map(raw) + } + + return new (class extends Watchable { + protected readonly label = 'FileHelper' + + protected async fetch() { + return doRead() } - })() - .catch((e) => callback(null, e)) - .catch((e) => - console.error( - 'callback function threw an error @ FileHelper.read.onChange', - e, - ), - ) - } - private readWaitFor( - effects: T.Effects, - pred: (value: B | null, error?: Error) => boolean, - map: (value: A) => B, - ): Promise { - const ctrl = new AbortController() - return DropPromise.of( - Promise.resolve().then(async () => { - const watch = this.readWatch(effects, map, (_) => false, ctrl.signal) - while (true) { - try { - const res = await watch.next() - if (pred(res.value)) { - ctrl.abort() - return res.value - } - if (res.done) { - break - } - } catch (e) { - if (pred(null, e as Error)) { - break - } + protected async *produce( + abort: AbortSignal, + ): AsyncGenerator { + while (this.effects.isInContext && !abort.aborted) { + if (await exists(filePath)) { + const ctrl = new AbortController() + abort.addEventListener('abort', () => ctrl.abort()) + const watch = fs.watch(filePath, { + persistent: false, + signal: ctrl.signal, + }) + yield await doRead() + await Promise.resolve() + .then(async () => { + for await (const _ of watch) { + ctrl.abort() + return null + } + }) + .catch((e) => console.error(asError(e))) + } else { + yield null + await onCreated(filePath).catch((e) => console.error(asError(e))) } } - ctrl.abort() - return null - }), - () => ctrl.abort(), - ) + } + + protected onConstRegistered(value: B | null): (() => void) | void { + if (!this.effects.constRetry) return + const record: (typeof fileHelper.consts)[number] = [ + this.effects.constRetry, + value, + wrappedMap, + eq, + ] + fileHelper.consts.push(record) + return () => { + fileHelper.consts = fileHelper.consts.filter((r) => r !== record) + } + } + })(effects, { map: wrappedMap, eq }) } /** @@ -372,7 +312,7 @@ export class FileHelper { read(): ReadType read( map: (value: A) => B, - eq?: (left: B | null | undefined, right: B | null) => boolean, + eq?: (left: B | null, right: B | null) => boolean, ): ReadType read( map?: (value: A) => any, @@ -382,24 +322,19 @@ export class FileHelper { eq = eq ?? deepEqual return { once: () => this.readOnce(map), - const: (effects: T.Effects) => this.readConst(effects, map, eq), - watch: (effects: T.Effects, abort?: AbortSignal) => { - const ctrl = new AbortController() - abort?.addEventListener('abort', () => ctrl.abort()) - return DropGenerator.of( - this.readWatch(effects, map, eq, ctrl.signal), - () => ctrl.abort(), - ) - }, + const: (effects: T.Effects) => + this.createFileWatchable(effects, map, eq).const(), + watch: (effects: T.Effects, abort?: AbortSignal) => + this.createFileWatchable(effects, map, eq).watch(abort), onChange: ( effects: T.Effects, callback: ( value: A | null, error?: Error, ) => { cancel: boolean } | Promise<{ cancel: boolean }>, - ) => this.readOnChange(effects, callback, map, eq), + ) => this.createFileWatchable(effects, map, eq).onChange(callback), waitFor: (effects: T.Effects, pred: (value: A | null) => boolean) => - this.readWaitFor(effects, pred, map), + this.createFileWatchable(effects, map, eq).waitFor(pred), } } diff --git a/sdk/package/lib/util/index.ts b/sdk/package/lib/util/index.ts index 5facab2f8..cdb59d470 100644 --- a/sdk/package/lib/util/index.ts +++ b/sdk/package/lib/util/index.ts @@ -1,6 +1,4 @@ export * from '../../../base/lib/util' -export { GetSslCertificate } from './GetSslCertificate' -export { GetServiceManifest, getServiceManifest } from './GetServiceManifest' export { Drop } from '../../../base/lib/util/Drop' export { Volume, Volumes } from './Volume'