diff --git a/sdk/base/lib/util/GetContainerIp.ts b/sdk/base/lib/util/GetContainerIp.ts index 6b0cc1d8d..dfec071ad 100644 --- a/sdk/base/lib/util/GetContainerIp.ts +++ b/sdk/base/lib/util/GetContainerIp.ts @@ -1,112 +1,18 @@ import { Effects } from '../Effects' import { PackageId } from '../osBindings' -import { AbortedError } from './AbortedError' -import { DropGenerator, DropPromise } from './Drop' +import { Watchable } from './Watchable' + +export class GetContainerIp extends Watchable { + protected readonly label = 'GetContainerIp' -export class GetContainerIp { constructor( - readonly effects: Effects, + effects: Effects, readonly opts: { packageId?: PackageId } = {}, - ) {} - - /** - * Returns the container IP. Reruns the context from which it has been called if the underlying value changes - */ - const() { - return this.effects.getContainerIp({ - ...this.opts, - callback: - this.effects.constRetry && - (() => this.effects.constRetry && this.effects.constRetry()), - }) - } - /** - * Returns the container IP. Does nothing if the value changes - */ - once() { - return this.effects.getContainerIp(this.opts) - } - - private async *watchGen(abort?: AbortSignal) { - const resolveCell = { resolve: () => {} } - this.effects.onLeaveContext(() => { - resolveCell.resolve() - }) - abort?.addEventListener('abort', () => resolveCell.resolve()) - while (this.effects.isInContext && !abort?.aborted) { - let callback: () => void = () => {} - const waitForNext = new Promise((resolve) => { - callback = resolve - resolveCell.resolve = resolve - }) - yield await this.effects.getContainerIp({ - ...this.opts, - callback: () => callback(), - }) - await waitForNext - } - return new Promise((_, rej) => rej(new AbortedError())) - } - - /** - * Watches the container IP. Returns an async iterator that yields whenever the value changes - */ - watch(abort?: AbortSignal): AsyncGenerator { - const ctrl = new AbortController() - abort?.addEventListener('abort', () => ctrl.abort()) - return DropGenerator.of(this.watchGen(ctrl.signal), () => ctrl.abort()) - } - - /** - * Watches the container IP. Takes a custom callback function to run whenever the value changes - */ - onChange( - callback: ( - value: string, - error?: Error, - ) => { cancel: boolean } | Promise<{ cancel: boolean }>, ) { - ;(async () => { - const ctrl = new AbortController() - for await (const value of this.watch(ctrl.signal)) { - try { - const res = await callback(value) - if (res.cancel) { - ctrl.abort() - break - } - } catch (e) { - console.error( - 'callback function threw an error @ GetContainerIp.onChange', - e, - ) - } - } - })() - .catch((e) => callback('', e)) - .catch((e) => - console.error( - 'callback function threw an error @ GetContainerIp.onChange', - e, - ), - ) + super(effects) } - /** - * Watches the container IP. Returns when the predicate is true - */ - waitFor(pred: (value: string) => boolean): Promise { - const ctrl = new AbortController() - return DropPromise.of( - Promise.resolve().then(async () => { - for await (const next of this.watchGen(ctrl.signal)) { - if (pred(next)) { - return next - } - } - return '' - }), - () => ctrl.abort(), - ) + protected call(callback?: () => void) { + return this.effects.getContainerIp({ ...this.opts, callback }) } } diff --git a/sdk/base/lib/util/GetHostInfo.ts b/sdk/base/lib/util/GetHostInfo.ts index 81784412a..ef67f03fc 100644 --- a/sdk/base/lib/util/GetHostInfo.ts +++ b/sdk/base/lib/util/GetHostInfo.ts @@ -1,112 +1,18 @@ import { Effects } from '../Effects' import { Host, HostId, PackageId } from '../osBindings' -import { AbortedError } from './AbortedError' -import { DropGenerator, DropPromise } from './Drop' +import { Watchable } from './Watchable' + +export class GetHostInfo extends Watchable { + protected readonly label = 'GetHostInfo' -export class GetHostInfo { constructor( - readonly effects: Effects, + effects: Effects, readonly opts: { hostId: HostId; packageId?: PackageId }, - ) {} - - /** - * Returns host info. Reruns the context from which it has been called if the underlying value changes - */ - const() { - return this.effects.getHostInfo({ - ...this.opts, - callback: - this.effects.constRetry && - (() => this.effects.constRetry && this.effects.constRetry()), - }) - } - /** - * Returns host info. Does nothing if the value changes - */ - once() { - return this.effects.getHostInfo(this.opts) - } - - private async *watchGen(abort?: AbortSignal) { - const resolveCell = { resolve: () => {} } - this.effects.onLeaveContext(() => { - resolveCell.resolve() - }) - abort?.addEventListener('abort', () => resolveCell.resolve()) - while (this.effects.isInContext && !abort?.aborted) { - let callback: () => void = () => {} - const waitForNext = new Promise((resolve) => { - callback = resolve - resolveCell.resolve = resolve - }) - yield await this.effects.getHostInfo({ - ...this.opts, - callback: () => callback(), - }) - await waitForNext - } - return new Promise((_, rej) => rej(new AbortedError())) - } - - /** - * Watches host info. Returns an async iterator that yields whenever the value changes - */ - watch(abort?: AbortSignal): AsyncGenerator { - const ctrl = new AbortController() - abort?.addEventListener('abort', () => ctrl.abort()) - return DropGenerator.of(this.watchGen(ctrl.signal), () => ctrl.abort()) - } - - /** - * Watches host info. Takes a custom callback function to run whenever the value changes - */ - onChange( - callback: ( - value: Host | null, - error?: Error, - ) => { cancel: boolean } | Promise<{ cancel: boolean }>, ) { - ;(async () => { - const ctrl = new AbortController() - for await (const value of this.watch(ctrl.signal)) { - try { - const res = await callback(value) - if (res.cancel) { - ctrl.abort() - break - } - } catch (e) { - console.error( - 'callback function threw an error @ GetHostInfo.onChange', - e, - ) - } - } - })() - .catch((e) => callback(null, e)) - .catch((e) => - console.error( - 'callback function threw an error @ GetHostInfo.onChange', - e, - ), - ) + super(effects) } - /** - * Watches host info. Returns when the predicate is true - */ - waitFor(pred: (value: Host | null) => boolean): Promise { - const ctrl = new AbortController() - return DropPromise.of( - Promise.resolve().then(async () => { - for await (const next of this.watchGen(ctrl.signal)) { - if (pred(next)) { - return next - } - } - return null - }), - () => ctrl.abort(), - ) + protected call(callback?: () => void) { + return this.effects.getHostInfo({ ...this.opts, callback }) } } diff --git a/sdk/base/lib/util/GetOutboundGateway.ts b/sdk/base/lib/util/GetOutboundGateway.ts index 460bb8b90..5a4cecb50 100644 --- a/sdk/base/lib/util/GetOutboundGateway.ts +++ b/sdk/base/lib/util/GetOutboundGateway.ts @@ -1,106 +1,14 @@ import { Effects } from '../Effects' -import { AbortedError } from './AbortedError' -import { DropGenerator, DropPromise } from './Drop' +import { Watchable } from './Watchable' -export class GetOutboundGateway { - constructor(readonly effects: Effects) {} +export class GetOutboundGateway extends Watchable { + protected readonly label = 'GetOutboundGateway' - /** - * Returns the effective outbound gateway. Reruns the context from which it has been called if the underlying value changes - */ - const() { - return this.effects.getOutboundGateway({ - callback: - this.effects.constRetry && - (() => this.effects.constRetry && this.effects.constRetry()), - }) - } - /** - * Returns the effective outbound gateway. Does nothing if the value changes - */ - once() { - return this.effects.getOutboundGateway({}) + constructor(effects: Effects) { + super(effects) } - private async *watchGen(abort?: AbortSignal) { - const resolveCell = { resolve: () => {} } - this.effects.onLeaveContext(() => { - resolveCell.resolve() - }) - abort?.addEventListener('abort', () => resolveCell.resolve()) - while (this.effects.isInContext && !abort?.aborted) { - let callback: () => void = () => {} - const waitForNext = new Promise((resolve) => { - callback = resolve - resolveCell.resolve = resolve - }) - yield await this.effects.getOutboundGateway({ - callback: () => callback(), - }) - await waitForNext - } - return new Promise((_, rej) => rej(new AbortedError())) - } - - /** - * Watches the effective outbound gateway. Returns an async iterator that yields whenever the value changes - */ - watch(abort?: AbortSignal): AsyncGenerator { - const ctrl = new AbortController() - abort?.addEventListener('abort', () => ctrl.abort()) - return DropGenerator.of(this.watchGen(ctrl.signal), () => ctrl.abort()) - } - - /** - * Watches the effective outbound gateway. Takes a custom callback function to run whenever the value changes - */ - onChange( - callback: ( - value: string, - error?: Error, - ) => { cancel: boolean } | Promise<{ cancel: boolean }>, - ) { - ;(async () => { - const ctrl = new AbortController() - for await (const value of this.watch(ctrl.signal)) { - try { - const res = await callback(value) - if (res.cancel) { - ctrl.abort() - break - } - } catch (e) { - console.error( - 'callback function threw an error @ GetOutboundGateway.onChange', - e, - ) - } - } - })() - .catch((e) => callback('', e)) - .catch((e) => - console.error( - 'callback function threw an error @ GetOutboundGateway.onChange', - e, - ), - ) - } - - /** - * Watches the effective outbound gateway. Returns when the predicate is true - */ - waitFor(pred: (value: string) => boolean): Promise { - const ctrl = new AbortController() - return DropPromise.of( - Promise.resolve().then(async () => { - for await (const next of this.watchGen(ctrl.signal)) { - if (pred(next)) { - return next - } - } - return '' - }), - () => ctrl.abort(), - ) + protected call(callback?: () => void) { + return this.effects.getOutboundGateway({ callback }) } } diff --git a/sdk/base/lib/util/GetServiceManifest.ts b/sdk/base/lib/util/GetServiceManifest.ts index d7e13d69a..7a99e5aa0 100644 --- a/sdk/base/lib/util/GetServiceManifest.ts +++ b/sdk/base/lib/util/GetServiceManifest.ts @@ -1,112 +1,18 @@ import { Effects } from '../Effects' import { Manifest, PackageId } from '../osBindings' -import { AbortedError } from './AbortedError' -import { DropGenerator, DropPromise } from './Drop' +import { Watchable } from './Watchable' + +export class GetServiceManifest extends Watchable { + protected readonly label = 'GetServiceManifest' -export class GetServiceManifest { constructor( - readonly effects: Effects, + effects: Effects, readonly opts: { packageId: PackageId }, - ) {} - - /** - * Returns the service manifest. Reruns the context from which it has been called if the underlying value changes - */ - const() { - return this.effects.getServiceManifest({ - ...this.opts, - callback: - this.effects.constRetry && - (() => this.effects.constRetry && this.effects.constRetry()), - }) - } - /** - * Returns the service manifest. Does nothing if the value changes - */ - once() { - return this.effects.getServiceManifest(this.opts) - } - - private async *watchGen(abort?: AbortSignal) { - const resolveCell = { resolve: () => {} } - this.effects.onLeaveContext(() => { - resolveCell.resolve() - }) - abort?.addEventListener('abort', () => resolveCell.resolve()) - while (this.effects.isInContext && !abort?.aborted) { - let callback: () => void = () => {} - const waitForNext = new Promise((resolve) => { - callback = resolve - resolveCell.resolve = resolve - }) - yield await this.effects.getServiceManifest({ - ...this.opts, - callback: () => callback(), - }) - await waitForNext - } - return new Promise((_, rej) => rej(new AbortedError())) - } - - /** - * Watches the service manifest. Returns an async iterator that yields whenever the value changes - */ - watch(abort?: AbortSignal): AsyncGenerator { - const ctrl = new AbortController() - abort?.addEventListener('abort', () => ctrl.abort()) - return DropGenerator.of(this.watchGen(ctrl.signal), () => ctrl.abort()) - } - - /** - * Watches the service manifest. Takes a custom callback function to run whenever the value changes - */ - onChange( - callback: ( - value: Manifest | null, - error?: Error, - ) => { cancel: boolean } | Promise<{ cancel: boolean }>, ) { - ;(async () => { - const ctrl = new AbortController() - for await (const value of this.watch(ctrl.signal)) { - try { - const res = await callback(value) - if (res.cancel) { - ctrl.abort() - break - } - } catch (e) { - console.error( - 'callback function threw an error @ GetServiceManifest.onChange', - e, - ) - } - } - })() - .catch((e) => callback(null, e)) - .catch((e) => - console.error( - 'callback function threw an error @ GetServiceManifest.onChange', - e, - ), - ) + super(effects) } - /** - * Watches the service manifest. Returns when the predicate is true - */ - waitFor(pred: (value: Manifest) => boolean): Promise { - const ctrl = new AbortController() - return DropPromise.of( - Promise.resolve().then(async () => { - for await (const next of this.watchGen(ctrl.signal)) { - if (pred(next)) { - return next - } - } - throw new Error('context left before predicate passed') - }), - () => ctrl.abort(), - ) + protected call(callback?: () => void) { + return this.effects.getServiceManifest({ ...this.opts, callback }) } } diff --git a/sdk/base/lib/util/GetSslCertificate.ts b/sdk/base/lib/util/GetSslCertificate.ts index 296c2695f..08d5b10c0 100644 --- a/sdk/base/lib/util/GetSslCertificate.ts +++ b/sdk/base/lib/util/GetSslCertificate.ts @@ -1,118 +1,20 @@ import { Effects } from '../Effects' -import { AbortedError } from './AbortedError' -import { DropGenerator, DropPromise } from './Drop' +import { Watchable } from './Watchable' + +export class GetSslCertificate extends Watchable<[string, string, string]> { + protected readonly label = 'GetSslCertificate' -export class GetSslCertificate { constructor( - readonly effects: Effects, + effects: Effects, readonly opts: { hostnames: string[] algorithm?: 'ecdsa' | 'ed25519' }, - ) {} - - /** - * Returns the SSL certificate. Reruns the context from which it has been called if the underlying value changes - */ - const() { - return this.effects.getSslCertificate({ - ...this.opts, - callback: - this.effects.constRetry && - (() => this.effects.constRetry && this.effects.constRetry()), - }) - } - /** - * Returns the SSL certificate. Does nothing if the value changes - */ - once() { - return this.effects.getSslCertificate(this.opts) - } - - private async *watchGen(abort?: AbortSignal) { - const resolveCell = { resolve: () => {} } - this.effects.onLeaveContext(() => { - resolveCell.resolve() - }) - abort?.addEventListener('abort', () => resolveCell.resolve()) - while (this.effects.isInContext && !abort?.aborted) { - let callback: () => void = () => {} - const waitForNext = new Promise((resolve) => { - callback = resolve - resolveCell.resolve = resolve - }) - yield await this.effects.getSslCertificate({ - ...this.opts, - callback: () => callback(), - }) - await waitForNext - } - return new Promise((_, rej) => rej(new AbortedError())) - } - - /** - * Watches the SSL certificate. Returns an async iterator that yields whenever the value changes - */ - watch( - abort?: AbortSignal, - ): AsyncGenerator<[string, string, string], never, unknown> { - const ctrl = new AbortController() - abort?.addEventListener('abort', () => ctrl.abort()) - return DropGenerator.of(this.watchGen(ctrl.signal), () => ctrl.abort()) - } - - /** - * Watches the SSL certificate. Takes a custom callback function to run whenever the value changes - */ - onChange( - callback: ( - value: [string, string, string] | null, - error?: Error, - ) => { cancel: boolean } | Promise<{ cancel: boolean }>, ) { - ;(async () => { - const ctrl = new AbortController() - for await (const value of this.watch(ctrl.signal)) { - try { - const res = await callback(value) - if (res.cancel) { - ctrl.abort() - break - } - } catch (e) { - console.error( - 'callback function threw an error @ GetSslCertificate.onChange', - e, - ) - } - } - })() - .catch((e) => callback(null, e)) - .catch((e) => - console.error( - 'callback function threw an error @ GetSslCertificate.onChange', - e, - ), - ) + super(effects) } - /** - * Watches the SSL certificate. Returns when the predicate is true - */ - waitFor( - pred: (value: [string, string, string]) => boolean, - ): Promise<[string, string, string]> { - const ctrl = new AbortController() - return DropPromise.of( - Promise.resolve().then(async () => { - for await (const next of this.watchGen(ctrl.signal)) { - if (pred(next)) { - return next - } - } - throw new Error('context left before predicate passed') - }), - () => ctrl.abort(), - ) + protected call(callback?: () => void) { + return this.effects.getSslCertificate({ ...this.opts, callback }) } } diff --git a/sdk/base/lib/util/GetStatus.ts b/sdk/base/lib/util/GetStatus.ts index 8804cbadf..365217977 100644 --- a/sdk/base/lib/util/GetStatus.ts +++ b/sdk/base/lib/util/GetStatus.ts @@ -1,116 +1,18 @@ import { Effects } from '../Effects' import { PackageId, StatusInfo } from '../osBindings' -import { AbortedError } from './AbortedError' -import { DropGenerator, DropPromise } from './Drop' +import { Watchable } from './Watchable' + +export class GetStatus extends Watchable { + protected readonly label = 'GetStatus' -export class GetStatus { constructor( - readonly effects: Effects, + effects: Effects, readonly opts: { packageId?: PackageId } = {}, - ) {} - - /** - * Returns the service status. Reruns the context from which it has been called if the underlying value changes - */ - const() { - return this.effects.getStatus({ - ...this.opts, - callback: - this.effects.constRetry && - (() => this.effects.constRetry && this.effects.constRetry()), - }) - } - /** - * Returns the service status. Does nothing if the value changes - */ - once() { - return this.effects.getStatus(this.opts) - } - - private async *watchGen(abort?: AbortSignal) { - const resolveCell = { resolve: () => {} } - this.effects.onLeaveContext(() => { - resolveCell.resolve() - }) - abort?.addEventListener('abort', () => resolveCell.resolve()) - while (this.effects.isInContext && !abort?.aborted) { - let callback: () => void = () => {} - const waitForNext = new Promise((resolve) => { - callback = resolve - resolveCell.resolve = resolve - }) - yield await this.effects.getStatus({ - ...this.opts, - callback: () => callback(), - }) - await waitForNext - } - return new Promise((_, rej) => rej(new AbortedError())) - } - - /** - * Watches the service status. Returns an async iterator that yields whenever the value changes - */ - watch( - abort?: AbortSignal, - ): AsyncGenerator { - const ctrl = new AbortController() - abort?.addEventListener('abort', () => ctrl.abort()) - return DropGenerator.of(this.watchGen(ctrl.signal), () => ctrl.abort()) - } - - /** - * Watches the service status. Takes a custom callback function to run whenever the value changes - */ - onChange( - callback: ( - value: StatusInfo | null, - error?: Error, - ) => { cancel: boolean } | Promise<{ cancel: boolean }>, ) { - ;(async () => { - const ctrl = new AbortController() - for await (const value of this.watch(ctrl.signal)) { - try { - const res = await callback(value) - if (res.cancel) { - ctrl.abort() - break - } - } catch (e) { - console.error( - 'callback function threw an error @ GetStatus.onChange', - e, - ) - } - } - })() - .catch((e) => callback(null, e)) - .catch((e) => - console.error( - 'callback function threw an error @ GetStatus.onChange', - e, - ), - ) + super(effects) } - /** - * Watches the service status. Returns when the predicate is true - */ - waitFor( - pred: (value: StatusInfo | null) => boolean, - ): Promise { - const ctrl = new AbortController() - return DropPromise.of( - Promise.resolve().then(async () => { - for await (const next of this.watchGen(ctrl.signal)) { - if (pred(next)) { - return next - } - } - return null - }), - () => ctrl.abort(), - ) + protected call(callback?: () => void) { + return this.effects.getStatus({ ...this.opts, callback }) } } diff --git a/sdk/base/lib/util/GetSystemSmtp.ts b/sdk/base/lib/util/GetSystemSmtp.ts index 03cedba6f..b45263549 100644 --- a/sdk/base/lib/util/GetSystemSmtp.ts +++ b/sdk/base/lib/util/GetSystemSmtp.ts @@ -1,111 +1,15 @@ import { Effects } from '../Effects' import * as T from '../types' -import { AbortedError } from './AbortedError' -import { DropGenerator, DropPromise } from './Drop' +import { Watchable } from './Watchable' -export class GetSystemSmtp { - constructor(readonly effects: Effects) {} +export class GetSystemSmtp extends Watchable { + protected readonly label = 'GetSystemSmtp' - /** - * Returns the system SMTP credentials. Reruns the context from which it has been called if the underlying value changes - */ - const() { - return this.effects.getSystemSmtp({ - callback: - this.effects.constRetry && - (() => this.effects.constRetry && this.effects.constRetry()), - }) - } - /** - * Returns the system SMTP credentials. Does nothing if the credentials change - */ - once() { - return this.effects.getSystemSmtp({}) + constructor(effects: Effects) { + super(effects) } - private async *watchGen(abort?: AbortSignal) { - const resolveCell = { resolve: () => {} } - this.effects.onLeaveContext(() => { - resolveCell.resolve() - }) - abort?.addEventListener('abort', () => resolveCell.resolve()) - while (this.effects.isInContext && !abort?.aborted) { - let callback: () => void = () => {} - const waitForNext = new Promise((resolve) => { - callback = resolve - resolveCell.resolve = resolve - }) - yield await this.effects.getSystemSmtp({ - callback: () => callback(), - }) - await waitForNext - } - return new Promise((_, rej) => rej(new AbortedError())) - } - - /** - * Watches the system SMTP credentials. Returns an async iterator that yields whenever the value changes - */ - watch( - abort?: AbortSignal, - ): AsyncGenerator { - const ctrl = new AbortController() - abort?.addEventListener('abort', () => ctrl.abort()) - return DropGenerator.of(this.watchGen(ctrl.signal), () => ctrl.abort()) - } - - /** - * Watches the system SMTP credentials. Takes a custom callback function to run whenever the credentials change - */ - onChange( - callback: ( - value: T.SmtpValue | null, - error?: Error, - ) => { cancel: boolean } | Promise<{ cancel: boolean }>, - ) { - ;(async () => { - const ctrl = new AbortController() - for await (const value of this.watch(ctrl.signal)) { - try { - const res = await callback(value) - if (res.cancel) { - ctrl.abort() - break - } - } catch (e) { - console.error( - 'callback function threw an error @ GetSystemSmtp.onChange', - e, - ) - } - } - })() - .catch((e) => callback(null, e)) - .catch((e) => - console.error( - 'callback function threw an error @ GetSystemSmtp.onChange', - e, - ), - ) - } - - /** - * Watches the system SMTP credentials. Returns when the predicate is true - */ - waitFor( - pred: (value: T.SmtpValue | null) => boolean, - ): Promise { - const ctrl = new AbortController() - return DropPromise.of( - Promise.resolve().then(async () => { - for await (const next of this.watchGen(ctrl.signal)) { - if (pred(next)) { - return next - } - } - return null - }), - () => ctrl.abort(), - ) + protected call(callback?: () => void) { + return this.effects.getSystemSmtp({ callback }) } } diff --git a/sdk/base/lib/util/Watchable.ts b/sdk/base/lib/util/Watchable.ts new file mode 100644 index 000000000..e6e5fc427 --- /dev/null +++ b/sdk/base/lib/util/Watchable.ts @@ -0,0 +1,107 @@ +import { Effects } from '../Effects' +import { AbortedError } from './AbortedError' +import { DropGenerator, DropPromise } from './Drop' + +export abstract class Watchable { + constructor(readonly effects: Effects) {} + + protected abstract call(callback?: () => void): Promise + protected abstract readonly label: string + + /** + * Returns the value. Reruns the context from which it has been called if the underlying value changes + */ + const(): Promise { + return this.call( + this.effects.constRetry && + (() => this.effects.constRetry && this.effects.constRetry()), + ) + } + + /** + * Returns the value. Does nothing if the value changes + */ + once(): Promise { + return this.call() + } + + private async *watchGen(abort?: AbortSignal) { + const resolveCell = { resolve: () => {} } + this.effects.onLeaveContext(() => { + resolveCell.resolve() + }) + abort?.addEventListener('abort', () => resolveCell.resolve()) + while (this.effects.isInContext && !abort?.aborted) { + let callback: () => void = () => {} + const waitForNext = new Promise((resolve) => { + callback = resolve + resolveCell.resolve = resolve + }) + yield await this.call(() => callback()) + await waitForNext + } + return new Promise((_, rej) => rej(new AbortedError())) + } + + /** + * Watches the value. Returns an async iterator that yields whenever the value changes + */ + watch(abort?: AbortSignal): AsyncGenerator { + const ctrl = new AbortController() + abort?.addEventListener('abort', () => ctrl.abort()) + return DropGenerator.of(this.watchGen(ctrl.signal), () => ctrl.abort()) + } + + /** + * Watches the value. Takes a custom callback function to run whenever the value changes + */ + onChange( + callback: ( + value: T | undefined, + error?: Error, + ) => { cancel: boolean } | Promise<{ cancel: boolean }>, + ) { + ;(async () => { + const ctrl = new AbortController() + for await (const value of this.watch(ctrl.signal)) { + try { + const res = await callback(value) + if (res.cancel) { + ctrl.abort() + break + } + } catch (e) { + console.error( + `callback function threw an error @ ${this.label}.onChange`, + e, + ) + } + } + })() + .catch((e) => callback(undefined, e)) + .catch((e) => + console.error( + `callback function threw an error @ ${this.label}.onChange`, + e, + ), + ) + } + + /** + * Watches the value. Returns when the predicate is true + */ + waitFor(pred: (value: T) => boolean): Promise { + const ctrl = new AbortController() + return DropPromise.of( + Promise.resolve().then(async () => { + for await (const next of this.watchGen(ctrl.signal)) { + if (pred(next)) { + return next + } + } + throw new AbortedError() + }), + () => ctrl.abort(), + ) + } +} diff --git a/sdk/base/lib/util/index.ts b/sdk/base/lib/util/index.ts index 7f0f9306e..3c4606f14 100644 --- a/sdk/base/lib/util/index.ts +++ b/sdk/base/lib/util/index.ts @@ -15,6 +15,7 @@ export { once } from './once' export { asError } from './asError' export * as Patterns from './patterns' export * from './typeHelpers' +export { Watchable } from './Watchable' export { GetContainerIp } from './GetContainerIp' export { GetHostInfo } from './GetHostInfo' export { GetOutboundGateway } from './GetOutboundGateway'