feat: add DbWatchedCallbacks abstraction, TypedDbWatch-based callbacks, and SDK watchable wrappers

- Extract DbWatchedCallbacks<K> abstraction in callbacks.rs using SyncMutex
  for the repeated patchdb subscribe-wait-fire-remove callback pattern
- Move get_host_info and get_status callbacks to use TypedDbWatch instead of
  raw db.subscribe, eliminating race conditions between reading and watching
- Make getStatus return Option<StatusInfo> to handle uninstalled packages
- Add getStatus .const/.once/.watch/.onChange wrapper in container-runtime
  for legacy SystemForEmbassy adapter
- Add SDK watchable wrapper classes for all callback-enabled effects:
  GetStatus, GetServiceManifest, GetHostInfo, GetContainerIp, GetSslCertificate
This commit is contained in:
Aiden McClelland
2026-03-09 15:24:56 -06:00
parent 43e514f9ee
commit c52fcf5087
12 changed files with 784 additions and 102 deletions

View File

@@ -42,6 +42,74 @@ function todo(): never {
throw new Error("Not implemented")
}
function getStatus(
effects: Effects,
options: Omit<Parameters<Effects["getStatus"]>[0], "callback"> = {},
) {
async function* watch(abort?: AbortSignal) {
const resolveCell = { resolve: () => {} }
effects.onLeaveContext(() => {
resolveCell.resolve()
})
abort?.addEventListener("abort", () => resolveCell.resolve())
while (effects.isInContext && !abort?.aborted) {
let callback: () => void = () => {}
const waitForNext = new Promise<void>((resolve) => {
callback = resolve
resolveCell.resolve = resolve
})
yield await effects.getStatus({ ...options, callback })
await waitForNext
}
}
return {
const: () =>
effects.getStatus({
...options,
callback:
effects.constRetry &&
(() => effects.constRetry && effects.constRetry()),
}),
once: () => effects.getStatus(options),
watch: (abort?: AbortSignal) => {
const ctrl = new AbortController()
abort?.addEventListener("abort", () => ctrl.abort())
return watch(ctrl.signal)
},
onChange: (
callback: (
value: T.StatusInfo | null,
error?: Error,
) => { cancel: boolean } | Promise<{ cancel: boolean }>,
) => {
;(async () => {
const ctrl = new AbortController()
for await (const value of watch(ctrl.signal)) {
try {
const res = await callback(value)
if (res.cancel) {
ctrl.abort()
break
}
} catch (e) {
console.error(
"callback function threw an error @ getStatus.onChange",
e,
)
}
}
})()
.catch((e) => callback(null, e as Error))
.catch((e) =>
console.error(
"callback function threw an error @ getStatus.onChange",
e,
),
)
},
}
}
/**
* Local type for procedure values from the manifest.
* The manifest's zod schemas use ZodTypeAny casts that produce `unknown` in zod v4.
@@ -1046,6 +1114,8 @@ export class SystemForEmbassy implements System {
timeoutMs: number | null,
): Promise<void> {
// TODO: docker
const status = await getStatus(effects, { packageId: id }).const()
if (!status) return
await effects.mount({
location: `/media/embassy/${id}`,
target: {
@@ -1204,6 +1274,11 @@ async function updateConfig(
if (specValue.target === "config") {
const jp = require("jsonpath")
const depId = specValue["package-id"]
const depStatus = await getStatus(effects, { packageId: depId }).const()
if (!depStatus) {
mutConfigValue[key] = null
continue
}
await effects.mount({
location: `/media/embassy/${depId}`,
target: {

View File

@@ -1,6 +1,6 @@
use std::cmp::min;
use std::collections::{BTreeMap, BTreeSet};
use std::sync::{Arc, Mutex, Weak};
use std::sync::{Arc, Weak};
use std::time::{Duration, SystemTime};
use clap::Parser;
@@ -8,13 +8,12 @@ use futures::future::join_all;
use imbl::{OrdMap, Vector, vector};
use imbl_value::InternedString;
use patch_db::TypedDbWatch;
use patch_db::json_ptr::JsonPointer;
use serde::{Deserialize, Serialize};
use tracing::warn;
use ts_rs::TS;
use crate::db::model::Database;
use crate::db::model::public::NetworkInterfaceInfo;
use crate::net::host::Host;
use crate::net::ssl::FullchainCertData;
use crate::prelude::*;
use crate::service::effects::context::EffectContext;
@@ -23,23 +22,104 @@ use crate::service::rpc::{CallbackHandle, CallbackId};
use crate::service::{Service, ServiceActorSeed};
use crate::util::collections::EqMap;
use crate::util::future::NonDetachingJoinHandle;
use crate::util::sync::SyncMutex;
use crate::status::StatusInfo;
use crate::{GatewayId, HostId, PackageId, ServiceInterfaceId};
#[derive(Default)]
pub struct ServiceCallbacks(Mutex<ServiceCallbackMap>);
/// Abstraction for callbacks that are triggered by patchdb subscriptions.
///
/// Handles the subscribe-wait-fire-remove pattern: when a callback is first
/// registered for a key, a patchdb subscription is spawned. When the subscription
/// fires, all handlers are consumed and invoked, then the subscription stops.
/// A new subscription is created if a handler is registered again.
pub struct DbWatchedCallbacks<K: Ord> {
label: &'static str,
inner: SyncMutex<BTreeMap<K, (NonDetachingJoinHandle<()>, Vec<CallbackHandler>)>>,
}
impl<K: Ord + Clone + Send + Sync + 'static> DbWatchedCallbacks<K> {
pub fn new(label: &'static str) -> Self {
Self {
label,
inner: SyncMutex::new(BTreeMap::new()),
}
}
pub fn add<T: Send + 'static>(
self: &Arc<Self>,
key: K,
watch: TypedDbWatch<T>,
handler: CallbackHandler,
) {
self.inner.mutate(|map| {
map.entry(key.clone())
.or_insert_with(|| {
let this = Arc::clone(self);
let k = key;
let label = self.label;
(
tokio::spawn(async move {
let mut watch = watch.untyped();
if watch.changed().await.is_ok() {
if let Some(cbs) = this.inner.mutate(|map| {
map.remove(&k)
.map(|(_, handlers)| CallbackHandlers(handlers))
.filter(|cb| !cb.0.is_empty())
}) {
let value = watch
.peek_and_mark_seen()
.unwrap_or_default();
if let Err(e) = cbs.call(vector![value]).await {
tracing::error!("Error in {label} callback: {e}");
tracing::debug!("{e:?}");
}
}
}
})
.into(),
Vec::new(),
)
})
.1
.push(handler);
})
}
pub fn gc(&self) {
self.inner.mutate(|map| {
map.retain(|_, (_, v)| {
v.retain(|h| h.handle.is_active() && h.seed.strong_count() > 0);
!v.is_empty()
});
})
}
}
pub struct ServiceCallbacks {
inner: SyncMutex<ServiceCallbackMap>,
get_host_info: Arc<DbWatchedCallbacks<(PackageId, HostId)>>,
get_status: Arc<DbWatchedCallbacks<PackageId>>,
}
impl Default for ServiceCallbacks {
fn default() -> Self {
Self {
inner: SyncMutex::new(ServiceCallbackMap::default()),
get_host_info: Arc::new(DbWatchedCallbacks::new("host info")),
get_status: Arc::new(DbWatchedCallbacks::new("get_status")),
}
}
}
#[derive(Default)]
struct ServiceCallbackMap {
get_service_interface: BTreeMap<(PackageId, ServiceInterfaceId), Vec<CallbackHandler>>,
list_service_interfaces: BTreeMap<PackageId, Vec<CallbackHandler>>,
get_system_smtp: Vec<CallbackHandler>,
get_host_info:
BTreeMap<(PackageId, HostId), (NonDetachingJoinHandle<()>, Vec<CallbackHandler>)>,
get_ssl_certificate: EqMap<
(BTreeSet<InternedString>, FullchainCertData, Algorithm),
(NonDetachingJoinHandle<()>, Vec<CallbackHandler>),
>,
get_status: BTreeMap<PackageId, Vec<CallbackHandler>>,
get_container_ip: BTreeMap<PackageId, Vec<CallbackHandler>>,
get_service_manifest: BTreeMap<PackageId, Vec<CallbackHandler>>,
get_outbound_gateway: BTreeMap<PackageId, (NonDetachingJoinHandle<()>, Vec<CallbackHandler>)>,
@@ -47,8 +127,7 @@ struct ServiceCallbackMap {
impl ServiceCallbacks {
fn mutate<T>(&self, f: impl FnOnce(&mut ServiceCallbackMap) -> T) -> T {
let mut this = self.0.lock().unwrap();
f(&mut *this)
self.inner.mutate(f)
}
pub fn gc(&self) {
@@ -63,18 +142,10 @@ impl ServiceCallbacks {
});
this.get_system_smtp
.retain(|h| h.handle.is_active() && h.seed.strong_count() > 0);
this.get_host_info.retain(|_, (_, v)| {
v.retain(|h| h.handle.is_active() && h.seed.strong_count() > 0);
!v.is_empty()
});
this.get_ssl_certificate.retain(|_, (_, v)| {
v.retain(|h| h.handle.is_active() && h.seed.strong_count() > 0);
!v.is_empty()
});
this.get_status.retain(|_, v| {
v.retain(|h| h.handle.is_active() && h.seed.strong_count() > 0);
!v.is_empty()
});
this.get_service_manifest.retain(|_, v| {
v.retain(|h| h.handle.is_active() && h.seed.strong_count() > 0);
!v.is_empty()
@@ -83,7 +154,9 @@ impl ServiceCallbacks {
v.retain(|h| h.handle.is_active() && h.seed.strong_count() > 0);
!v.is_empty()
});
})
});
self.get_host_info.gc();
self.get_status.gc();
}
pub(super) fn add_get_service_interface(
@@ -151,51 +224,14 @@ impl ServiceCallbacks {
}
pub(super) fn add_get_host_info(
self: &Arc<Self>,
db: &TypedPatchDb<Database>,
&self,
package_id: PackageId,
host_id: HostId,
watch: TypedDbWatch<Host>,
handler: CallbackHandler,
) {
self.mutate(|this| {
this.get_host_info
.entry((package_id.clone(), host_id.clone()))
.or_insert_with(|| {
let ptr: JsonPointer =
format!("/public/packageData/{}/hosts/{}", package_id, host_id)
.parse()
.expect("valid json pointer");
let db = db.clone();
let callbacks = Arc::clone(self);
let key = (package_id, host_id);
(
tokio::spawn(async move {
let mut sub = db.subscribe(ptr).await;
while sub.recv().await.is_some() {
if let Some(cbs) = callbacks.mutate(|this| {
this.get_host_info
.remove(&key)
.map(|(_, handlers)| CallbackHandlers(handlers))
.filter(|cb| !cb.0.is_empty())
}) {
if let Err(e) = cbs.call(vector![]).await {
tracing::error!("Error in host info callback: {e}");
tracing::debug!("{e:?}");
}
}
// entry was removed when we consumed handlers,
// so stop watching — a new subscription will be
// created if the service re-registers
break;
}
})
.into(),
Vec::new(),
)
})
.1
.push(handler);
})
self.get_host_info
.add((package_id, host_id), watch, handler);
}
pub(super) fn add_get_ssl_certificate(
@@ -256,19 +292,14 @@ impl ServiceCallbacks {
.push(handler);
})
}
pub(super) fn add_get_status(&self, package_id: PackageId, handler: CallbackHandler) {
self.mutate(|this| this.get_status.entry(package_id).or_default().push(handler))
}
#[must_use]
pub fn get_status(&self, package_id: &PackageId) -> Option<CallbackHandlers> {
self.mutate(|this| {
if let Some(watched) = this.get_status.remove(package_id) {
Some(CallbackHandlers(watched))
} else {
None
}
.filter(|cb| !cb.0.is_empty())
})
pub(super) fn add_get_status(
&self,
package_id: PackageId,
watch: TypedDbWatch<StatusInfo>,
handler: CallbackHandler,
) {
self.get_status.add(package_id, watch, handler);
}
pub(super) fn add_get_container_ip(&self, package_id: PackageId, handler: CallbackHandler) {

View File

@@ -80,27 +80,32 @@ pub async fn get_status(
package_id,
callback,
}: GetStatusParams,
) -> Result<StatusInfo, Error> {
) -> Result<Option<StatusInfo>, Error> {
let context = context.deref()?;
let id = package_id.unwrap_or_else(|| context.seed.id.clone());
let db = context.seed.ctx.db.peek().await;
let ptr = format!("/public/packageData/{}/statusInfo", id)
.parse()
.expect("valid json pointer");
let mut watch = context
.seed
.ctx
.db
.watch(ptr)
.await
.typed::<StatusInfo>();
let status = watch.peek_and_mark_seen()?.de().ok();
if let Some(callback) = callback {
let callback = callback.register(&context.seed.persistent_container);
context.seed.ctx.callbacks.add_get_status(
id.clone(),
watch,
super::callbacks::CallbackHandler::new(&context, callback),
);
}
let status = db
.as_public()
.as_package_data()
.as_idx(&id)
.or_not_found(&id)?
.as_status_info()
.de()?;
Ok(status)
}

View File

@@ -23,26 +23,30 @@ pub async fn get_host_info(
}: GetHostInfoParams,
) -> Result<Option<Host>, Error> {
let context = context.deref()?;
let db = context.seed.ctx.db.peek().await;
let package_id = package_id.unwrap_or_else(|| context.seed.id.clone());
let ptr = format!("/public/packageData/{}/hosts/{}", package_id, host_id)
.parse()
.expect("valid json pointer");
let mut watch = context
.seed
.ctx
.db
.watch(ptr)
.await
.typed::<Host>();
let res = watch.peek_and_mark_seen()?.de().ok();
if let Some(callback) = callback {
let callback = callback.register(&context.seed.persistent_container);
context.seed.ctx.callbacks.add_get_host_info(
&context.seed.ctx.db,
package_id.clone(),
host_id.clone(),
watch,
CallbackHandler::new(&context, callback),
);
}
let res = db
.as_public()
.as_package_data()
.as_idx(&package_id)
.and_then(|m| m.as_hosts().as_idx(&host_id))
.map(|m| m.de())
.transpose()?;
Ok(res)
}

View File

@@ -1,7 +1,6 @@
use std::sync::Arc;
use std::time::Duration;
use imbl::vector;
use patch_db::TypedDbWatch;
use super::ServiceActorSeed;
@@ -99,16 +98,9 @@ async fn service_actor_loop<'a>(
seed: &'a Arc<ServiceActorSeed>,
transition: &mut Option<Transition<'a>>,
) -> Result<(), Error> {
let id = &seed.id;
let status_model = watch.peek_and_mark_seen()?;
let status = status_model.de()?;
if let Some(callbacks) = seed.ctx.callbacks.get_status(id) {
callbacks
.call(vector![patch_db::ModelExt::into_value(status_model)])
.await?;
}
match status {
StatusInfo {
desired: DesiredStatus::Running | DesiredStatus::Restarting,

View File

@@ -69,7 +69,7 @@ export type Effects = {
getStatus(options: {
packageId?: PackageId
callback?: () => void
}): Promise<StatusInfo>
}): Promise<StatusInfo | null>
/** DEPRECATED: indicate to the host os what runstate the service is in */
setMainStatus(options: SetMainStatus): Promise<null>

View File

@@ -0,0 +1,112 @@
import { Effects } from '../Effects'
import { PackageId } from '../osBindings'
import { AbortedError } from './AbortedError'
import { DropGenerator, DropPromise } from './Drop'
export class GetContainerIp {
constructor(
readonly effects: Effects,
readonly opts: { packageId?: PackageId } = {},
) {}
/**
* Returns the container IP. Reruns the context from which it has been called if the underlying value changes
*/
const() {
return this.effects.getContainerIp({
...this.opts,
callback:
this.effects.constRetry &&
(() => this.effects.constRetry && this.effects.constRetry()),
})
}
/**
* Returns the container IP. Does nothing if the value changes
*/
once() {
return this.effects.getContainerIp(this.opts)
}
private async *watchGen(abort?: AbortSignal) {
const resolveCell = { resolve: () => {} }
this.effects.onLeaveContext(() => {
resolveCell.resolve()
})
abort?.addEventListener('abort', () => resolveCell.resolve())
while (this.effects.isInContext && !abort?.aborted) {
let callback: () => void = () => {}
const waitForNext = new Promise<void>((resolve) => {
callback = resolve
resolveCell.resolve = resolve
})
yield await this.effects.getContainerIp({
...this.opts,
callback: () => callback(),
})
await waitForNext
}
return new Promise<never>((_, rej) => rej(new AbortedError()))
}
/**
* Watches the container IP. Returns an async iterator that yields whenever the value changes
*/
watch(abort?: AbortSignal): AsyncGenerator<string, never, unknown> {
const ctrl = new AbortController()
abort?.addEventListener('abort', () => ctrl.abort())
return DropGenerator.of(this.watchGen(ctrl.signal), () => ctrl.abort())
}
/**
* Watches the container IP. Takes a custom callback function to run whenever the value changes
*/
onChange(
callback: (
value: string,
error?: Error,
) => { cancel: boolean } | Promise<{ cancel: boolean }>,
) {
;(async () => {
const ctrl = new AbortController()
for await (const value of this.watch(ctrl.signal)) {
try {
const res = await callback(value)
if (res.cancel) {
ctrl.abort()
break
}
} catch (e) {
console.error(
'callback function threw an error @ GetContainerIp.onChange',
e,
)
}
}
})()
.catch((e) => callback('', e))
.catch((e) =>
console.error(
'callback function threw an error @ GetContainerIp.onChange',
e,
),
)
}
/**
* Watches the container IP. Returns when the predicate is true
*/
waitFor(pred: (value: string) => boolean): Promise<string> {
const ctrl = new AbortController()
return DropPromise.of(
Promise.resolve().then(async () => {
for await (const next of this.watchGen(ctrl.signal)) {
if (pred(next)) {
return next
}
}
return ''
}),
() => ctrl.abort(),
)
}
}

View File

@@ -0,0 +1,112 @@
import { Effects } from '../Effects'
import { Host, HostId, PackageId } from '../osBindings'
import { AbortedError } from './AbortedError'
import { DropGenerator, DropPromise } from './Drop'
export class GetHostInfo {
constructor(
readonly effects: Effects,
readonly opts: { hostId: HostId; packageId?: PackageId },
) {}
/**
* Returns host info. Reruns the context from which it has been called if the underlying value changes
*/
const() {
return this.effects.getHostInfo({
...this.opts,
callback:
this.effects.constRetry &&
(() => this.effects.constRetry && this.effects.constRetry()),
})
}
/**
* Returns host info. Does nothing if the value changes
*/
once() {
return this.effects.getHostInfo(this.opts)
}
private async *watchGen(abort?: AbortSignal) {
const resolveCell = { resolve: () => {} }
this.effects.onLeaveContext(() => {
resolveCell.resolve()
})
abort?.addEventListener('abort', () => resolveCell.resolve())
while (this.effects.isInContext && !abort?.aborted) {
let callback: () => void = () => {}
const waitForNext = new Promise<void>((resolve) => {
callback = resolve
resolveCell.resolve = resolve
})
yield await this.effects.getHostInfo({
...this.opts,
callback: () => callback(),
})
await waitForNext
}
return new Promise<never>((_, rej) => rej(new AbortedError()))
}
/**
* Watches host info. Returns an async iterator that yields whenever the value changes
*/
watch(abort?: AbortSignal): AsyncGenerator<Host | null, never, unknown> {
const ctrl = new AbortController()
abort?.addEventListener('abort', () => ctrl.abort())
return DropGenerator.of(this.watchGen(ctrl.signal), () => ctrl.abort())
}
/**
* Watches host info. Takes a custom callback function to run whenever the value changes
*/
onChange(
callback: (
value: Host | null,
error?: Error,
) => { cancel: boolean } | Promise<{ cancel: boolean }>,
) {
;(async () => {
const ctrl = new AbortController()
for await (const value of this.watch(ctrl.signal)) {
try {
const res = await callback(value)
if (res.cancel) {
ctrl.abort()
break
}
} catch (e) {
console.error(
'callback function threw an error @ GetHostInfo.onChange',
e,
)
}
}
})()
.catch((e) => callback(null, e))
.catch((e) =>
console.error(
'callback function threw an error @ GetHostInfo.onChange',
e,
),
)
}
/**
* Watches host info. Returns when the predicate is true
*/
waitFor(pred: (value: Host | null) => boolean): Promise<Host | null> {
const ctrl = new AbortController()
return DropPromise.of(
Promise.resolve().then(async () => {
for await (const next of this.watchGen(ctrl.signal)) {
if (pred(next)) {
return next
}
}
return null
}),
() => ctrl.abort(),
)
}
}

View File

@@ -0,0 +1,112 @@
import { Effects } from '../Effects'
import { Manifest, PackageId } from '../osBindings'
import { AbortedError } from './AbortedError'
import { DropGenerator, DropPromise } from './Drop'
export class GetServiceManifest {
constructor(
readonly effects: Effects,
readonly opts: { packageId: PackageId },
) {}
/**
* Returns the service manifest. Reruns the context from which it has been called if the underlying value changes
*/
const() {
return this.effects.getServiceManifest({
...this.opts,
callback:
this.effects.constRetry &&
(() => this.effects.constRetry && this.effects.constRetry()),
})
}
/**
* Returns the service manifest. Does nothing if the value changes
*/
once() {
return this.effects.getServiceManifest(this.opts)
}
private async *watchGen(abort?: AbortSignal) {
const resolveCell = { resolve: () => {} }
this.effects.onLeaveContext(() => {
resolveCell.resolve()
})
abort?.addEventListener('abort', () => resolveCell.resolve())
while (this.effects.isInContext && !abort?.aborted) {
let callback: () => void = () => {}
const waitForNext = new Promise<void>((resolve) => {
callback = resolve
resolveCell.resolve = resolve
})
yield await this.effects.getServiceManifest({
...this.opts,
callback: () => callback(),
})
await waitForNext
}
return new Promise<never>((_, rej) => rej(new AbortedError()))
}
/**
* Watches the service manifest. Returns an async iterator that yields whenever the value changes
*/
watch(abort?: AbortSignal): AsyncGenerator<Manifest, never, unknown> {
const ctrl = new AbortController()
abort?.addEventListener('abort', () => ctrl.abort())
return DropGenerator.of(this.watchGen(ctrl.signal), () => ctrl.abort())
}
/**
* Watches the service manifest. Takes a custom callback function to run whenever the value changes
*/
onChange(
callback: (
value: Manifest | null,
error?: Error,
) => { cancel: boolean } | Promise<{ cancel: boolean }>,
) {
;(async () => {
const ctrl = new AbortController()
for await (const value of this.watch(ctrl.signal)) {
try {
const res = await callback(value)
if (res.cancel) {
ctrl.abort()
break
}
} catch (e) {
console.error(
'callback function threw an error @ GetServiceManifest.onChange',
e,
)
}
}
})()
.catch((e) => callback(null, e))
.catch((e) =>
console.error(
'callback function threw an error @ GetServiceManifest.onChange',
e,
),
)
}
/**
* Watches the service manifest. Returns when the predicate is true
*/
waitFor(pred: (value: Manifest) => boolean): Promise<Manifest> {
const ctrl = new AbortController()
return DropPromise.of(
Promise.resolve().then(async () => {
for await (const next of this.watchGen(ctrl.signal)) {
if (pred(next)) {
return next
}
}
throw new Error('context left before predicate passed')
}),
() => ctrl.abort(),
)
}
}

View File

@@ -0,0 +1,118 @@
import { Effects } from '../Effects'
import { AbortedError } from './AbortedError'
import { DropGenerator, DropPromise } from './Drop'
export class GetSslCertificate {
constructor(
readonly effects: Effects,
readonly opts: {
hostnames: string[]
algorithm?: 'ecdsa' | 'ed25519'
},
) {}
/**
* Returns the SSL certificate. Reruns the context from which it has been called if the underlying value changes
*/
const() {
return this.effects.getSslCertificate({
...this.opts,
callback:
this.effects.constRetry &&
(() => this.effects.constRetry && this.effects.constRetry()),
})
}
/**
* Returns the SSL certificate. Does nothing if the value changes
*/
once() {
return this.effects.getSslCertificate(this.opts)
}
private async *watchGen(abort?: AbortSignal) {
const resolveCell = { resolve: () => {} }
this.effects.onLeaveContext(() => {
resolveCell.resolve()
})
abort?.addEventListener('abort', () => resolveCell.resolve())
while (this.effects.isInContext && !abort?.aborted) {
let callback: () => void = () => {}
const waitForNext = new Promise<void>((resolve) => {
callback = resolve
resolveCell.resolve = resolve
})
yield await this.effects.getSslCertificate({
...this.opts,
callback: () => callback(),
})
await waitForNext
}
return new Promise<never>((_, rej) => rej(new AbortedError()))
}
/**
* Watches the SSL certificate. Returns an async iterator that yields whenever the value changes
*/
watch(
abort?: AbortSignal,
): AsyncGenerator<[string, string, string], never, unknown> {
const ctrl = new AbortController()
abort?.addEventListener('abort', () => ctrl.abort())
return DropGenerator.of(this.watchGen(ctrl.signal), () => ctrl.abort())
}
/**
* Watches the SSL certificate. Takes a custom callback function to run whenever the value changes
*/
onChange(
callback: (
value: [string, string, string] | null,
error?: Error,
) => { cancel: boolean } | Promise<{ cancel: boolean }>,
) {
;(async () => {
const ctrl = new AbortController()
for await (const value of this.watch(ctrl.signal)) {
try {
const res = await callback(value)
if (res.cancel) {
ctrl.abort()
break
}
} catch (e) {
console.error(
'callback function threw an error @ GetSslCertificate.onChange',
e,
)
}
}
})()
.catch((e) => callback(null, e))
.catch((e) =>
console.error(
'callback function threw an error @ GetSslCertificate.onChange',
e,
),
)
}
/**
* Watches the SSL certificate. Returns when the predicate is true
*/
waitFor(
pred: (value: [string, string, string]) => boolean,
): Promise<[string, string, string]> {
const ctrl = new AbortController()
return DropPromise.of(
Promise.resolve().then(async () => {
for await (const next of this.watchGen(ctrl.signal)) {
if (pred(next)) {
return next
}
}
throw new Error('context left before predicate passed')
}),
() => ctrl.abort(),
)
}
}

View File

@@ -0,0 +1,116 @@
import { Effects } from '../Effects'
import { PackageId, StatusInfo } from '../osBindings'
import { AbortedError } from './AbortedError'
import { DropGenerator, DropPromise } from './Drop'
export class GetStatus {
constructor(
readonly effects: Effects,
readonly opts: { packageId?: PackageId } = {},
) {}
/**
* Returns the service status. Reruns the context from which it has been called if the underlying value changes
*/
const() {
return this.effects.getStatus({
...this.opts,
callback:
this.effects.constRetry &&
(() => this.effects.constRetry && this.effects.constRetry()),
})
}
/**
* Returns the service status. Does nothing if the value changes
*/
once() {
return this.effects.getStatus(this.opts)
}
private async *watchGen(abort?: AbortSignal) {
const resolveCell = { resolve: () => {} }
this.effects.onLeaveContext(() => {
resolveCell.resolve()
})
abort?.addEventListener('abort', () => resolveCell.resolve())
while (this.effects.isInContext && !abort?.aborted) {
let callback: () => void = () => {}
const waitForNext = new Promise<void>((resolve) => {
callback = resolve
resolveCell.resolve = resolve
})
yield await this.effects.getStatus({
...this.opts,
callback: () => callback(),
})
await waitForNext
}
return new Promise<never>((_, rej) => rej(new AbortedError()))
}
/**
* Watches the service status. Returns an async iterator that yields whenever the value changes
*/
watch(
abort?: AbortSignal,
): AsyncGenerator<StatusInfo | null, never, unknown> {
const ctrl = new AbortController()
abort?.addEventListener('abort', () => ctrl.abort())
return DropGenerator.of(this.watchGen(ctrl.signal), () => ctrl.abort())
}
/**
* Watches the service status. Takes a custom callback function to run whenever the value changes
*/
onChange(
callback: (
value: StatusInfo | null,
error?: Error,
) => { cancel: boolean } | Promise<{ cancel: boolean }>,
) {
;(async () => {
const ctrl = new AbortController()
for await (const value of this.watch(ctrl.signal)) {
try {
const res = await callback(value)
if (res.cancel) {
ctrl.abort()
break
}
} catch (e) {
console.error(
'callback function threw an error @ GetStatus.onChange',
e,
)
}
}
})()
.catch((e) => callback(null, e))
.catch((e) =>
console.error(
'callback function threw an error @ GetStatus.onChange',
e,
),
)
}
/**
* Watches the service status. Returns when the predicate is true
*/
waitFor(
pred: (value: StatusInfo | null) => boolean,
): Promise<StatusInfo | null> {
const ctrl = new AbortController()
return DropPromise.of(
Promise.resolve().then(async () => {
for await (const next of this.watchGen(ctrl.signal)) {
if (pred(next)) {
return next
}
}
return null
}),
() => ctrl.abort(),
)
}
}

View File

@@ -15,7 +15,12 @@ export { once } from './once'
export { asError } from './asError'
export * as Patterns from './patterns'
export * from './typeHelpers'
export { GetContainerIp } from './GetContainerIp'
export { GetHostInfo } from './GetHostInfo'
export { GetOutboundGateway } from './GetOutboundGateway'
export { GetServiceManifest } from './GetServiceManifest'
export { GetSslCertificate } from './GetSslCertificate'
export { GetStatus } from './GetStatus'
export { GetSystemSmtp } from './GetSystemSmtp'
export { Graph, Vertex } from './graph'
export { inMs } from './inMs'