From c52fcf50873862f9aa6fbdf9e9f4b0dcf9646666 Mon Sep 17 00:00:00 2001 From: Aiden McClelland Date: Mon, 9 Mar 2026 15:24:56 -0600 Subject: [PATCH] feat: add DbWatchedCallbacks abstraction, TypedDbWatch-based callbacks, and SDK watchable wrappers - Extract DbWatchedCallbacks abstraction in callbacks.rs using SyncMutex for the repeated patchdb subscribe-wait-fire-remove callback pattern - Move get_host_info and get_status callbacks to use TypedDbWatch instead of raw db.subscribe, eliminating race conditions between reading and watching - Make getStatus return Option to handle uninstalled packages - Add getStatus .const/.once/.watch/.onChange wrapper in container-runtime for legacy SystemForEmbassy adapter - Add SDK watchable wrapper classes for all callback-enabled effects: GetStatus, GetServiceManifest, GetHostInfo, GetContainerIp, GetSslCertificate --- .../Systems/SystemForEmbassy/index.ts | 75 ++++++++ core/src/service/effects/callbacks.rs | 177 ++++++++++-------- core/src/service/effects/control.rs | 25 ++- core/src/service/effects/net/host.rs | 24 ++- core/src/service/service_actor.rs | 8 - sdk/base/lib/Effects.ts | 2 +- sdk/base/lib/util/GetContainerIp.ts | 112 +++++++++++ sdk/base/lib/util/GetHostInfo.ts | 112 +++++++++++ sdk/base/lib/util/GetServiceManifest.ts | 112 +++++++++++ sdk/base/lib/util/GetSslCertificate.ts | 118 ++++++++++++ sdk/base/lib/util/GetStatus.ts | 116 ++++++++++++ sdk/base/lib/util/index.ts | 5 + 12 files changed, 784 insertions(+), 102 deletions(-) create mode 100644 sdk/base/lib/util/GetContainerIp.ts create mode 100644 sdk/base/lib/util/GetHostInfo.ts create mode 100644 sdk/base/lib/util/GetServiceManifest.ts create mode 100644 sdk/base/lib/util/GetSslCertificate.ts create mode 100644 sdk/base/lib/util/GetStatus.ts diff --git a/container-runtime/src/Adapters/Systems/SystemForEmbassy/index.ts b/container-runtime/src/Adapters/Systems/SystemForEmbassy/index.ts index 15a97178d..65a6c56e8 100644 --- a/container-runtime/src/Adapters/Systems/SystemForEmbassy/index.ts +++ b/container-runtime/src/Adapters/Systems/SystemForEmbassy/index.ts @@ -42,6 +42,74 @@ function todo(): never { throw new Error("Not implemented") } +function getStatus( + effects: Effects, + options: Omit[0], "callback"> = {}, +) { + async function* watch(abort?: AbortSignal) { + const resolveCell = { resolve: () => {} } + effects.onLeaveContext(() => { + resolveCell.resolve() + }) + abort?.addEventListener("abort", () => resolveCell.resolve()) + while (effects.isInContext && !abort?.aborted) { + let callback: () => void = () => {} + const waitForNext = new Promise((resolve) => { + callback = resolve + resolveCell.resolve = resolve + }) + yield await effects.getStatus({ ...options, callback }) + await waitForNext + } + } + return { + const: () => + effects.getStatus({ + ...options, + callback: + effects.constRetry && + (() => effects.constRetry && effects.constRetry()), + }), + once: () => effects.getStatus(options), + watch: (abort?: AbortSignal) => { + const ctrl = new AbortController() + abort?.addEventListener("abort", () => ctrl.abort()) + return watch(ctrl.signal) + }, + onChange: ( + callback: ( + value: T.StatusInfo | null, + error?: Error, + ) => { cancel: boolean } | Promise<{ cancel: boolean }>, + ) => { + ;(async () => { + const ctrl = new AbortController() + for await (const value of watch(ctrl.signal)) { + try { + const res = await callback(value) + if (res.cancel) { + ctrl.abort() + break + } + } catch (e) { + console.error( + "callback function threw an error @ getStatus.onChange", + e, + ) + } + } + })() + .catch((e) => callback(null, e as Error)) + .catch((e) => + console.error( + "callback function threw an error @ getStatus.onChange", + e, + ), + ) + }, + } +} + /** * Local type for procedure values from the manifest. * The manifest's zod schemas use ZodTypeAny casts that produce `unknown` in zod v4. @@ -1046,6 +1114,8 @@ export class SystemForEmbassy implements System { timeoutMs: number | null, ): Promise { // TODO: docker + const status = await getStatus(effects, { packageId: id }).const() + if (!status) return await effects.mount({ location: `/media/embassy/${id}`, target: { @@ -1204,6 +1274,11 @@ async function updateConfig( if (specValue.target === "config") { const jp = require("jsonpath") const depId = specValue["package-id"] + const depStatus = await getStatus(effects, { packageId: depId }).const() + if (!depStatus) { + mutConfigValue[key] = null + continue + } await effects.mount({ location: `/media/embassy/${depId}`, target: { diff --git a/core/src/service/effects/callbacks.rs b/core/src/service/effects/callbacks.rs index d30665c96..b50c526f8 100644 --- a/core/src/service/effects/callbacks.rs +++ b/core/src/service/effects/callbacks.rs @@ -1,6 +1,6 @@ use std::cmp::min; use std::collections::{BTreeMap, BTreeSet}; -use std::sync::{Arc, Mutex, Weak}; +use std::sync::{Arc, Weak}; use std::time::{Duration, SystemTime}; use clap::Parser; @@ -8,13 +8,12 @@ use futures::future::join_all; use imbl::{OrdMap, Vector, vector}; use imbl_value::InternedString; use patch_db::TypedDbWatch; -use patch_db::json_ptr::JsonPointer; use serde::{Deserialize, Serialize}; use tracing::warn; use ts_rs::TS; -use crate::db::model::Database; use crate::db::model::public::NetworkInterfaceInfo; +use crate::net::host::Host; use crate::net::ssl::FullchainCertData; use crate::prelude::*; use crate::service::effects::context::EffectContext; @@ -23,23 +22,104 @@ use crate::service::rpc::{CallbackHandle, CallbackId}; use crate::service::{Service, ServiceActorSeed}; use crate::util::collections::EqMap; use crate::util::future::NonDetachingJoinHandle; +use crate::util::sync::SyncMutex; +use crate::status::StatusInfo; use crate::{GatewayId, HostId, PackageId, ServiceInterfaceId}; -#[derive(Default)] -pub struct ServiceCallbacks(Mutex); +/// Abstraction for callbacks that are triggered by patchdb subscriptions. +/// +/// Handles the subscribe-wait-fire-remove pattern: when a callback is first +/// registered for a key, a patchdb subscription is spawned. When the subscription +/// fires, all handlers are consumed and invoked, then the subscription stops. +/// A new subscription is created if a handler is registered again. +pub struct DbWatchedCallbacks { + label: &'static str, + inner: SyncMutex, Vec)>>, +} + +impl DbWatchedCallbacks { + pub fn new(label: &'static str) -> Self { + Self { + label, + inner: SyncMutex::new(BTreeMap::new()), + } + } + + pub fn add( + self: &Arc, + key: K, + watch: TypedDbWatch, + handler: CallbackHandler, + ) { + self.inner.mutate(|map| { + map.entry(key.clone()) + .or_insert_with(|| { + let this = Arc::clone(self); + let k = key; + let label = self.label; + ( + tokio::spawn(async move { + let mut watch = watch.untyped(); + if watch.changed().await.is_ok() { + if let Some(cbs) = this.inner.mutate(|map| { + map.remove(&k) + .map(|(_, handlers)| CallbackHandlers(handlers)) + .filter(|cb| !cb.0.is_empty()) + }) { + let value = watch + .peek_and_mark_seen() + .unwrap_or_default(); + if let Err(e) = cbs.call(vector![value]).await { + tracing::error!("Error in {label} callback: {e}"); + tracing::debug!("{e:?}"); + } + } + } + }) + .into(), + Vec::new(), + ) + }) + .1 + .push(handler); + }) + } + + pub fn gc(&self) { + self.inner.mutate(|map| { + map.retain(|_, (_, v)| { + v.retain(|h| h.handle.is_active() && h.seed.strong_count() > 0); + !v.is_empty() + }); + }) + } +} + +pub struct ServiceCallbacks { + inner: SyncMutex, + get_host_info: Arc>, + get_status: Arc>, +} + +impl Default for ServiceCallbacks { + fn default() -> Self { + Self { + inner: SyncMutex::new(ServiceCallbackMap::default()), + get_host_info: Arc::new(DbWatchedCallbacks::new("host info")), + get_status: Arc::new(DbWatchedCallbacks::new("get_status")), + } + } +} #[derive(Default)] struct ServiceCallbackMap { get_service_interface: BTreeMap<(PackageId, ServiceInterfaceId), Vec>, list_service_interfaces: BTreeMap>, get_system_smtp: Vec, - get_host_info: - BTreeMap<(PackageId, HostId), (NonDetachingJoinHandle<()>, Vec)>, get_ssl_certificate: EqMap< (BTreeSet, FullchainCertData, Algorithm), (NonDetachingJoinHandle<()>, Vec), >, - get_status: BTreeMap>, get_container_ip: BTreeMap>, get_service_manifest: BTreeMap>, get_outbound_gateway: BTreeMap, Vec)>, @@ -47,8 +127,7 @@ struct ServiceCallbackMap { impl ServiceCallbacks { fn mutate(&self, f: impl FnOnce(&mut ServiceCallbackMap) -> T) -> T { - let mut this = self.0.lock().unwrap(); - f(&mut *this) + self.inner.mutate(f) } pub fn gc(&self) { @@ -63,18 +142,10 @@ impl ServiceCallbacks { }); this.get_system_smtp .retain(|h| h.handle.is_active() && h.seed.strong_count() > 0); - this.get_host_info.retain(|_, (_, v)| { - v.retain(|h| h.handle.is_active() && h.seed.strong_count() > 0); - !v.is_empty() - }); this.get_ssl_certificate.retain(|_, (_, v)| { v.retain(|h| h.handle.is_active() && h.seed.strong_count() > 0); !v.is_empty() }); - this.get_status.retain(|_, v| { - v.retain(|h| h.handle.is_active() && h.seed.strong_count() > 0); - !v.is_empty() - }); this.get_service_manifest.retain(|_, v| { v.retain(|h| h.handle.is_active() && h.seed.strong_count() > 0); !v.is_empty() @@ -83,7 +154,9 @@ impl ServiceCallbacks { v.retain(|h| h.handle.is_active() && h.seed.strong_count() > 0); !v.is_empty() }); - }) + }); + self.get_host_info.gc(); + self.get_status.gc(); } pub(super) fn add_get_service_interface( @@ -151,51 +224,14 @@ impl ServiceCallbacks { } pub(super) fn add_get_host_info( - self: &Arc, - db: &TypedPatchDb, + &self, package_id: PackageId, host_id: HostId, + watch: TypedDbWatch, handler: CallbackHandler, ) { - self.mutate(|this| { - this.get_host_info - .entry((package_id.clone(), host_id.clone())) - .or_insert_with(|| { - let ptr: JsonPointer = - format!("/public/packageData/{}/hosts/{}", package_id, host_id) - .parse() - .expect("valid json pointer"); - let db = db.clone(); - let callbacks = Arc::clone(self); - let key = (package_id, host_id); - ( - tokio::spawn(async move { - let mut sub = db.subscribe(ptr).await; - while sub.recv().await.is_some() { - if let Some(cbs) = callbacks.mutate(|this| { - this.get_host_info - .remove(&key) - .map(|(_, handlers)| CallbackHandlers(handlers)) - .filter(|cb| !cb.0.is_empty()) - }) { - if let Err(e) = cbs.call(vector![]).await { - tracing::error!("Error in host info callback: {e}"); - tracing::debug!("{e:?}"); - } - } - // entry was removed when we consumed handlers, - // so stop watching — a new subscription will be - // created if the service re-registers - break; - } - }) - .into(), - Vec::new(), - ) - }) - .1 - .push(handler); - }) + self.get_host_info + .add((package_id, host_id), watch, handler); } pub(super) fn add_get_ssl_certificate( @@ -256,19 +292,14 @@ impl ServiceCallbacks { .push(handler); }) } - pub(super) fn add_get_status(&self, package_id: PackageId, handler: CallbackHandler) { - self.mutate(|this| this.get_status.entry(package_id).or_default().push(handler)) - } - #[must_use] - pub fn get_status(&self, package_id: &PackageId) -> Option { - self.mutate(|this| { - if let Some(watched) = this.get_status.remove(package_id) { - Some(CallbackHandlers(watched)) - } else { - None - } - .filter(|cb| !cb.0.is_empty()) - }) + + pub(super) fn add_get_status( + &self, + package_id: PackageId, + watch: TypedDbWatch, + handler: CallbackHandler, + ) { + self.get_status.add(package_id, watch, handler); } pub(super) fn add_get_container_ip(&self, package_id: PackageId, handler: CallbackHandler) { diff --git a/core/src/service/effects/control.rs b/core/src/service/effects/control.rs index 88931812f..edaf998e8 100644 --- a/core/src/service/effects/control.rs +++ b/core/src/service/effects/control.rs @@ -80,27 +80,32 @@ pub async fn get_status( package_id, callback, }: GetStatusParams, -) -> Result { +) -> Result, Error> { let context = context.deref()?; let id = package_id.unwrap_or_else(|| context.seed.id.clone()); - let db = context.seed.ctx.db.peek().await; + + let ptr = format!("/public/packageData/{}/statusInfo", id) + .parse() + .expect("valid json pointer"); + let mut watch = context + .seed + .ctx + .db + .watch(ptr) + .await + .typed::(); + + let status = watch.peek_and_mark_seen()?.de().ok(); if let Some(callback) = callback { let callback = callback.register(&context.seed.persistent_container); context.seed.ctx.callbacks.add_get_status( id.clone(), + watch, super::callbacks::CallbackHandler::new(&context, callback), ); } - let status = db - .as_public() - .as_package_data() - .as_idx(&id) - .or_not_found(&id)? - .as_status_info() - .de()?; - Ok(status) } diff --git a/core/src/service/effects/net/host.rs b/core/src/service/effects/net/host.rs index a20fcf189..193826aac 100644 --- a/core/src/service/effects/net/host.rs +++ b/core/src/service/effects/net/host.rs @@ -23,26 +23,30 @@ pub async fn get_host_info( }: GetHostInfoParams, ) -> Result, Error> { let context = context.deref()?; - let db = context.seed.ctx.db.peek().await; let package_id = package_id.unwrap_or_else(|| context.seed.id.clone()); + let ptr = format!("/public/packageData/{}/hosts/{}", package_id, host_id) + .parse() + .expect("valid json pointer"); + let mut watch = context + .seed + .ctx + .db + .watch(ptr) + .await + .typed::(); + + let res = watch.peek_and_mark_seen()?.de().ok(); + if let Some(callback) = callback { let callback = callback.register(&context.seed.persistent_container); context.seed.ctx.callbacks.add_get_host_info( - &context.seed.ctx.db, package_id.clone(), host_id.clone(), + watch, CallbackHandler::new(&context, callback), ); } - let res = db - .as_public() - .as_package_data() - .as_idx(&package_id) - .and_then(|m| m.as_hosts().as_idx(&host_id)) - .map(|m| m.de()) - .transpose()?; - Ok(res) } diff --git a/core/src/service/service_actor.rs b/core/src/service/service_actor.rs index 4fec11a08..ed8feafdf 100644 --- a/core/src/service/service_actor.rs +++ b/core/src/service/service_actor.rs @@ -1,7 +1,6 @@ use std::sync::Arc; use std::time::Duration; -use imbl::vector; use patch_db::TypedDbWatch; use super::ServiceActorSeed; @@ -99,16 +98,9 @@ async fn service_actor_loop<'a>( seed: &'a Arc, transition: &mut Option>, ) -> Result<(), Error> { - let id = &seed.id; let status_model = watch.peek_and_mark_seen()?; let status = status_model.de()?; - if let Some(callbacks) = seed.ctx.callbacks.get_status(id) { - callbacks - .call(vector![patch_db::ModelExt::into_value(status_model)]) - .await?; - } - match status { StatusInfo { desired: DesiredStatus::Running | DesiredStatus::Restarting, diff --git a/sdk/base/lib/Effects.ts b/sdk/base/lib/Effects.ts index d3d0b8923..554390654 100644 --- a/sdk/base/lib/Effects.ts +++ b/sdk/base/lib/Effects.ts @@ -69,7 +69,7 @@ export type Effects = { getStatus(options: { packageId?: PackageId callback?: () => void - }): Promise + }): Promise /** DEPRECATED: indicate to the host os what runstate the service is in */ setMainStatus(options: SetMainStatus): Promise diff --git a/sdk/base/lib/util/GetContainerIp.ts b/sdk/base/lib/util/GetContainerIp.ts new file mode 100644 index 000000000..6b0cc1d8d --- /dev/null +++ b/sdk/base/lib/util/GetContainerIp.ts @@ -0,0 +1,112 @@ +import { Effects } from '../Effects' +import { PackageId } from '../osBindings' +import { AbortedError } from './AbortedError' +import { DropGenerator, DropPromise } from './Drop' + +export class GetContainerIp { + constructor( + readonly effects: Effects, + readonly opts: { packageId?: PackageId } = {}, + ) {} + + /** + * Returns the container IP. Reruns the context from which it has been called if the underlying value changes + */ + const() { + return this.effects.getContainerIp({ + ...this.opts, + callback: + this.effects.constRetry && + (() => this.effects.constRetry && this.effects.constRetry()), + }) + } + /** + * Returns the container IP. Does nothing if the value changes + */ + once() { + return this.effects.getContainerIp(this.opts) + } + + private async *watchGen(abort?: AbortSignal) { + const resolveCell = { resolve: () => {} } + this.effects.onLeaveContext(() => { + resolveCell.resolve() + }) + abort?.addEventListener('abort', () => resolveCell.resolve()) + while (this.effects.isInContext && !abort?.aborted) { + let callback: () => void = () => {} + const waitForNext = new Promise((resolve) => { + callback = resolve + resolveCell.resolve = resolve + }) + yield await this.effects.getContainerIp({ + ...this.opts, + callback: () => callback(), + }) + await waitForNext + } + return new Promise((_, rej) => rej(new AbortedError())) + } + + /** + * Watches the container IP. Returns an async iterator that yields whenever the value changes + */ + watch(abort?: AbortSignal): AsyncGenerator { + const ctrl = new AbortController() + abort?.addEventListener('abort', () => ctrl.abort()) + return DropGenerator.of(this.watchGen(ctrl.signal), () => ctrl.abort()) + } + + /** + * Watches the container IP. Takes a custom callback function to run whenever the value changes + */ + onChange( + callback: ( + value: string, + error?: Error, + ) => { cancel: boolean } | Promise<{ cancel: boolean }>, + ) { + ;(async () => { + const ctrl = new AbortController() + for await (const value of this.watch(ctrl.signal)) { + try { + const res = await callback(value) + if (res.cancel) { + ctrl.abort() + break + } + } catch (e) { + console.error( + 'callback function threw an error @ GetContainerIp.onChange', + e, + ) + } + } + })() + .catch((e) => callback('', e)) + .catch((e) => + console.error( + 'callback function threw an error @ GetContainerIp.onChange', + e, + ), + ) + } + + /** + * Watches the container IP. Returns when the predicate is true + */ + waitFor(pred: (value: string) => boolean): Promise { + const ctrl = new AbortController() + return DropPromise.of( + Promise.resolve().then(async () => { + for await (const next of this.watchGen(ctrl.signal)) { + if (pred(next)) { + return next + } + } + return '' + }), + () => ctrl.abort(), + ) + } +} diff --git a/sdk/base/lib/util/GetHostInfo.ts b/sdk/base/lib/util/GetHostInfo.ts new file mode 100644 index 000000000..81784412a --- /dev/null +++ b/sdk/base/lib/util/GetHostInfo.ts @@ -0,0 +1,112 @@ +import { Effects } from '../Effects' +import { Host, HostId, PackageId } from '../osBindings' +import { AbortedError } from './AbortedError' +import { DropGenerator, DropPromise } from './Drop' + +export class GetHostInfo { + constructor( + readonly effects: Effects, + readonly opts: { hostId: HostId; packageId?: PackageId }, + ) {} + + /** + * Returns host info. Reruns the context from which it has been called if the underlying value changes + */ + const() { + return this.effects.getHostInfo({ + ...this.opts, + callback: + this.effects.constRetry && + (() => this.effects.constRetry && this.effects.constRetry()), + }) + } + /** + * Returns host info. Does nothing if the value changes + */ + once() { + return this.effects.getHostInfo(this.opts) + } + + private async *watchGen(abort?: AbortSignal) { + const resolveCell = { resolve: () => {} } + this.effects.onLeaveContext(() => { + resolveCell.resolve() + }) + abort?.addEventListener('abort', () => resolveCell.resolve()) + while (this.effects.isInContext && !abort?.aborted) { + let callback: () => void = () => {} + const waitForNext = new Promise((resolve) => { + callback = resolve + resolveCell.resolve = resolve + }) + yield await this.effects.getHostInfo({ + ...this.opts, + callback: () => callback(), + }) + await waitForNext + } + return new Promise((_, rej) => rej(new AbortedError())) + } + + /** + * Watches host info. Returns an async iterator that yields whenever the value changes + */ + watch(abort?: AbortSignal): AsyncGenerator { + const ctrl = new AbortController() + abort?.addEventListener('abort', () => ctrl.abort()) + return DropGenerator.of(this.watchGen(ctrl.signal), () => ctrl.abort()) + } + + /** + * Watches host info. Takes a custom callback function to run whenever the value changes + */ + onChange( + callback: ( + value: Host | null, + error?: Error, + ) => { cancel: boolean } | Promise<{ cancel: boolean }>, + ) { + ;(async () => { + const ctrl = new AbortController() + for await (const value of this.watch(ctrl.signal)) { + try { + const res = await callback(value) + if (res.cancel) { + ctrl.abort() + break + } + } catch (e) { + console.error( + 'callback function threw an error @ GetHostInfo.onChange', + e, + ) + } + } + })() + .catch((e) => callback(null, e)) + .catch((e) => + console.error( + 'callback function threw an error @ GetHostInfo.onChange', + e, + ), + ) + } + + /** + * Watches host info. Returns when the predicate is true + */ + waitFor(pred: (value: Host | null) => boolean): Promise { + const ctrl = new AbortController() + return DropPromise.of( + Promise.resolve().then(async () => { + for await (const next of this.watchGen(ctrl.signal)) { + if (pred(next)) { + return next + } + } + return null + }), + () => ctrl.abort(), + ) + } +} diff --git a/sdk/base/lib/util/GetServiceManifest.ts b/sdk/base/lib/util/GetServiceManifest.ts new file mode 100644 index 000000000..d7e13d69a --- /dev/null +++ b/sdk/base/lib/util/GetServiceManifest.ts @@ -0,0 +1,112 @@ +import { Effects } from '../Effects' +import { Manifest, PackageId } from '../osBindings' +import { AbortedError } from './AbortedError' +import { DropGenerator, DropPromise } from './Drop' + +export class GetServiceManifest { + constructor( + readonly effects: Effects, + readonly opts: { packageId: PackageId }, + ) {} + + /** + * Returns the service manifest. Reruns the context from which it has been called if the underlying value changes + */ + const() { + return this.effects.getServiceManifest({ + ...this.opts, + callback: + this.effects.constRetry && + (() => this.effects.constRetry && this.effects.constRetry()), + }) + } + /** + * Returns the service manifest. Does nothing if the value changes + */ + once() { + return this.effects.getServiceManifest(this.opts) + } + + private async *watchGen(abort?: AbortSignal) { + const resolveCell = { resolve: () => {} } + this.effects.onLeaveContext(() => { + resolveCell.resolve() + }) + abort?.addEventListener('abort', () => resolveCell.resolve()) + while (this.effects.isInContext && !abort?.aborted) { + let callback: () => void = () => {} + const waitForNext = new Promise((resolve) => { + callback = resolve + resolveCell.resolve = resolve + }) + yield await this.effects.getServiceManifest({ + ...this.opts, + callback: () => callback(), + }) + await waitForNext + } + return new Promise((_, rej) => rej(new AbortedError())) + } + + /** + * Watches the service manifest. Returns an async iterator that yields whenever the value changes + */ + watch(abort?: AbortSignal): AsyncGenerator { + const ctrl = new AbortController() + abort?.addEventListener('abort', () => ctrl.abort()) + return DropGenerator.of(this.watchGen(ctrl.signal), () => ctrl.abort()) + } + + /** + * Watches the service manifest. Takes a custom callback function to run whenever the value changes + */ + onChange( + callback: ( + value: Manifest | null, + error?: Error, + ) => { cancel: boolean } | Promise<{ cancel: boolean }>, + ) { + ;(async () => { + const ctrl = new AbortController() + for await (const value of this.watch(ctrl.signal)) { + try { + const res = await callback(value) + if (res.cancel) { + ctrl.abort() + break + } + } catch (e) { + console.error( + 'callback function threw an error @ GetServiceManifest.onChange', + e, + ) + } + } + })() + .catch((e) => callback(null, e)) + .catch((e) => + console.error( + 'callback function threw an error @ GetServiceManifest.onChange', + e, + ), + ) + } + + /** + * Watches the service manifest. Returns when the predicate is true + */ + waitFor(pred: (value: Manifest) => boolean): Promise { + const ctrl = new AbortController() + return DropPromise.of( + Promise.resolve().then(async () => { + for await (const next of this.watchGen(ctrl.signal)) { + if (pred(next)) { + return next + } + } + throw new Error('context left before predicate passed') + }), + () => ctrl.abort(), + ) + } +} diff --git a/sdk/base/lib/util/GetSslCertificate.ts b/sdk/base/lib/util/GetSslCertificate.ts new file mode 100644 index 000000000..296c2695f --- /dev/null +++ b/sdk/base/lib/util/GetSslCertificate.ts @@ -0,0 +1,118 @@ +import { Effects } from '../Effects' +import { AbortedError } from './AbortedError' +import { DropGenerator, DropPromise } from './Drop' + +export class GetSslCertificate { + constructor( + readonly effects: Effects, + readonly opts: { + hostnames: string[] + algorithm?: 'ecdsa' | 'ed25519' + }, + ) {} + + /** + * Returns the SSL certificate. Reruns the context from which it has been called if the underlying value changes + */ + const() { + return this.effects.getSslCertificate({ + ...this.opts, + callback: + this.effects.constRetry && + (() => this.effects.constRetry && this.effects.constRetry()), + }) + } + /** + * Returns the SSL certificate. Does nothing if the value changes + */ + once() { + return this.effects.getSslCertificate(this.opts) + } + + private async *watchGen(abort?: AbortSignal) { + const resolveCell = { resolve: () => {} } + this.effects.onLeaveContext(() => { + resolveCell.resolve() + }) + abort?.addEventListener('abort', () => resolveCell.resolve()) + while (this.effects.isInContext && !abort?.aborted) { + let callback: () => void = () => {} + const waitForNext = new Promise((resolve) => { + callback = resolve + resolveCell.resolve = resolve + }) + yield await this.effects.getSslCertificate({ + ...this.opts, + callback: () => callback(), + }) + await waitForNext + } + return new Promise((_, rej) => rej(new AbortedError())) + } + + /** + * Watches the SSL certificate. Returns an async iterator that yields whenever the value changes + */ + watch( + abort?: AbortSignal, + ): AsyncGenerator<[string, string, string], never, unknown> { + const ctrl = new AbortController() + abort?.addEventListener('abort', () => ctrl.abort()) + return DropGenerator.of(this.watchGen(ctrl.signal), () => ctrl.abort()) + } + + /** + * Watches the SSL certificate. Takes a custom callback function to run whenever the value changes + */ + onChange( + callback: ( + value: [string, string, string] | null, + error?: Error, + ) => { cancel: boolean } | Promise<{ cancel: boolean }>, + ) { + ;(async () => { + const ctrl = new AbortController() + for await (const value of this.watch(ctrl.signal)) { + try { + const res = await callback(value) + if (res.cancel) { + ctrl.abort() + break + } + } catch (e) { + console.error( + 'callback function threw an error @ GetSslCertificate.onChange', + e, + ) + } + } + })() + .catch((e) => callback(null, e)) + .catch((e) => + console.error( + 'callback function threw an error @ GetSslCertificate.onChange', + e, + ), + ) + } + + /** + * Watches the SSL certificate. Returns when the predicate is true + */ + waitFor( + pred: (value: [string, string, string]) => boolean, + ): Promise<[string, string, string]> { + const ctrl = new AbortController() + return DropPromise.of( + Promise.resolve().then(async () => { + for await (const next of this.watchGen(ctrl.signal)) { + if (pred(next)) { + return next + } + } + throw new Error('context left before predicate passed') + }), + () => ctrl.abort(), + ) + } +} diff --git a/sdk/base/lib/util/GetStatus.ts b/sdk/base/lib/util/GetStatus.ts new file mode 100644 index 000000000..8804cbadf --- /dev/null +++ b/sdk/base/lib/util/GetStatus.ts @@ -0,0 +1,116 @@ +import { Effects } from '../Effects' +import { PackageId, StatusInfo } from '../osBindings' +import { AbortedError } from './AbortedError' +import { DropGenerator, DropPromise } from './Drop' + +export class GetStatus { + constructor( + readonly effects: Effects, + readonly opts: { packageId?: PackageId } = {}, + ) {} + + /** + * Returns the service status. Reruns the context from which it has been called if the underlying value changes + */ + const() { + return this.effects.getStatus({ + ...this.opts, + callback: + this.effects.constRetry && + (() => this.effects.constRetry && this.effects.constRetry()), + }) + } + /** + * Returns the service status. Does nothing if the value changes + */ + once() { + return this.effects.getStatus(this.opts) + } + + private async *watchGen(abort?: AbortSignal) { + const resolveCell = { resolve: () => {} } + this.effects.onLeaveContext(() => { + resolveCell.resolve() + }) + abort?.addEventListener('abort', () => resolveCell.resolve()) + while (this.effects.isInContext && !abort?.aborted) { + let callback: () => void = () => {} + const waitForNext = new Promise((resolve) => { + callback = resolve + resolveCell.resolve = resolve + }) + yield await this.effects.getStatus({ + ...this.opts, + callback: () => callback(), + }) + await waitForNext + } + return new Promise((_, rej) => rej(new AbortedError())) + } + + /** + * Watches the service status. Returns an async iterator that yields whenever the value changes + */ + watch( + abort?: AbortSignal, + ): AsyncGenerator { + const ctrl = new AbortController() + abort?.addEventListener('abort', () => ctrl.abort()) + return DropGenerator.of(this.watchGen(ctrl.signal), () => ctrl.abort()) + } + + /** + * Watches the service status. Takes a custom callback function to run whenever the value changes + */ + onChange( + callback: ( + value: StatusInfo | null, + error?: Error, + ) => { cancel: boolean } | Promise<{ cancel: boolean }>, + ) { + ;(async () => { + const ctrl = new AbortController() + for await (const value of this.watch(ctrl.signal)) { + try { + const res = await callback(value) + if (res.cancel) { + ctrl.abort() + break + } + } catch (e) { + console.error( + 'callback function threw an error @ GetStatus.onChange', + e, + ) + } + } + })() + .catch((e) => callback(null, e)) + .catch((e) => + console.error( + 'callback function threw an error @ GetStatus.onChange', + e, + ), + ) + } + + /** + * Watches the service status. Returns when the predicate is true + */ + waitFor( + pred: (value: StatusInfo | null) => boolean, + ): Promise { + const ctrl = new AbortController() + return DropPromise.of( + Promise.resolve().then(async () => { + for await (const next of this.watchGen(ctrl.signal)) { + if (pred(next)) { + return next + } + } + return null + }), + () => ctrl.abort(), + ) + } +} diff --git a/sdk/base/lib/util/index.ts b/sdk/base/lib/util/index.ts index e156cb97b..7f0f9306e 100644 --- a/sdk/base/lib/util/index.ts +++ b/sdk/base/lib/util/index.ts @@ -15,7 +15,12 @@ export { once } from './once' export { asError } from './asError' export * as Patterns from './patterns' export * from './typeHelpers' +export { GetContainerIp } from './GetContainerIp' +export { GetHostInfo } from './GetHostInfo' export { GetOutboundGateway } from './GetOutboundGateway' +export { GetServiceManifest } from './GetServiceManifest' +export { GetSslCertificate } from './GetSslCertificate' +export { GetStatus } from './GetStatus' export { GetSystemSmtp } from './GetSystemSmtp' export { Graph, Vertex } from './graph' export { inMs } from './inMs'