mirror of
https://github.com/Start9Labs/start-os.git
synced 2026-03-30 12:11:56 +00:00
feat: add getOutboundGateway effect and simplify VersionGraph init/uninit
Add getOutboundGateway effect across core, container-runtime, and SDK to let services query their effective outbound gateway with callback support. Remove preInstall/uninstall hooks from VersionGraph as they are no longer needed.
This commit is contained in:
@@ -253,6 +253,14 @@ export function makeEffects(context: EffectContext): Effects {
|
|||||||
callback: context.callbacks?.addCallback(options.callback) || null,
|
callback: context.callbacks?.addCallback(options.callback) || null,
|
||||||
}) as ReturnType<T.Effects["getSystemSmtp"]>
|
}) as ReturnType<T.Effects["getSystemSmtp"]>
|
||||||
},
|
},
|
||||||
|
getOutboundGateway(
|
||||||
|
...[options]: Parameters<T.Effects["getOutboundGateway"]>
|
||||||
|
) {
|
||||||
|
return rpcRound("get-outbound-gateway", {
|
||||||
|
...options,
|
||||||
|
callback: context.callbacks?.addCallback(options.callback) || null,
|
||||||
|
}) as ReturnType<T.Effects["getOutboundGateway"]>
|
||||||
|
},
|
||||||
listServiceInterfaces(
|
listServiceInterfaces(
|
||||||
...[options]: Parameters<T.Effects["listServiceInterfaces"]>
|
...[options]: Parameters<T.Effects["listServiceInterfaces"]>
|
||||||
) {
|
) {
|
||||||
|
|||||||
@@ -5,15 +5,16 @@ use std::time::{Duration, SystemTime};
|
|||||||
|
|
||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
use futures::future::join_all;
|
use futures::future::join_all;
|
||||||
use imbl::{Vector, vector};
|
use imbl::{OrdMap, Vector, vector};
|
||||||
use imbl_value::InternedString;
|
use imbl_value::InternedString;
|
||||||
|
use patch_db::TypedDbWatch;
|
||||||
|
use patch_db::json_ptr::JsonPointer;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use tracing::warn;
|
use tracing::warn;
|
||||||
use ts_rs::TS;
|
use ts_rs::TS;
|
||||||
|
|
||||||
use patch_db::json_ptr::JsonPointer;
|
|
||||||
|
|
||||||
use crate::db::model::Database;
|
use crate::db::model::Database;
|
||||||
|
use crate::db::model::public::NetworkInterfaceInfo;
|
||||||
use crate::net::ssl::FullchainCertData;
|
use crate::net::ssl::FullchainCertData;
|
||||||
use crate::prelude::*;
|
use crate::prelude::*;
|
||||||
use crate::service::effects::context::EffectContext;
|
use crate::service::effects::context::EffectContext;
|
||||||
@@ -22,7 +23,7 @@ use crate::service::rpc::{CallbackHandle, CallbackId};
|
|||||||
use crate::service::{Service, ServiceActorSeed};
|
use crate::service::{Service, ServiceActorSeed};
|
||||||
use crate::util::collections::EqMap;
|
use crate::util::collections::EqMap;
|
||||||
use crate::util::future::NonDetachingJoinHandle;
|
use crate::util::future::NonDetachingJoinHandle;
|
||||||
use crate::{HostId, PackageId, ServiceInterfaceId};
|
use crate::{GatewayId, HostId, PackageId, ServiceInterfaceId};
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
pub struct ServiceCallbacks(Mutex<ServiceCallbackMap>);
|
pub struct ServiceCallbacks(Mutex<ServiceCallbackMap>);
|
||||||
@@ -32,7 +33,8 @@ struct ServiceCallbackMap {
|
|||||||
get_service_interface: BTreeMap<(PackageId, ServiceInterfaceId), Vec<CallbackHandler>>,
|
get_service_interface: BTreeMap<(PackageId, ServiceInterfaceId), Vec<CallbackHandler>>,
|
||||||
list_service_interfaces: BTreeMap<PackageId, Vec<CallbackHandler>>,
|
list_service_interfaces: BTreeMap<PackageId, Vec<CallbackHandler>>,
|
||||||
get_system_smtp: Vec<CallbackHandler>,
|
get_system_smtp: Vec<CallbackHandler>,
|
||||||
get_host_info: BTreeMap<(PackageId, HostId), (NonDetachingJoinHandle<()>, Vec<CallbackHandler>)>,
|
get_host_info:
|
||||||
|
BTreeMap<(PackageId, HostId), (NonDetachingJoinHandle<()>, Vec<CallbackHandler>)>,
|
||||||
get_ssl_certificate: EqMap<
|
get_ssl_certificate: EqMap<
|
||||||
(BTreeSet<InternedString>, FullchainCertData, Algorithm),
|
(BTreeSet<InternedString>, FullchainCertData, Algorithm),
|
||||||
(NonDetachingJoinHandle<()>, Vec<CallbackHandler>),
|
(NonDetachingJoinHandle<()>, Vec<CallbackHandler>),
|
||||||
@@ -40,6 +42,7 @@ struct ServiceCallbackMap {
|
|||||||
get_status: BTreeMap<PackageId, Vec<CallbackHandler>>,
|
get_status: BTreeMap<PackageId, Vec<CallbackHandler>>,
|
||||||
get_container_ip: BTreeMap<PackageId, Vec<CallbackHandler>>,
|
get_container_ip: BTreeMap<PackageId, Vec<CallbackHandler>>,
|
||||||
get_service_manifest: BTreeMap<PackageId, Vec<CallbackHandler>>,
|
get_service_manifest: BTreeMap<PackageId, Vec<CallbackHandler>>,
|
||||||
|
get_outbound_gateway: BTreeMap<PackageId, (NonDetachingJoinHandle<()>, Vec<CallbackHandler>)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ServiceCallbacks {
|
impl ServiceCallbacks {
|
||||||
@@ -76,6 +79,10 @@ impl ServiceCallbacks {
|
|||||||
v.retain(|h| h.handle.is_active() && h.seed.strong_count() > 0);
|
v.retain(|h| h.handle.is_active() && h.seed.strong_count() > 0);
|
||||||
!v.is_empty()
|
!v.is_empty()
|
||||||
});
|
});
|
||||||
|
this.get_outbound_gateway.retain(|_, (_, v)| {
|
||||||
|
v.retain(|h| h.handle.is_active() && h.seed.strong_count() > 0);
|
||||||
|
!v.is_empty()
|
||||||
|
});
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -154,12 +161,10 @@ impl ServiceCallbacks {
|
|||||||
this.get_host_info
|
this.get_host_info
|
||||||
.entry((package_id.clone(), host_id.clone()))
|
.entry((package_id.clone(), host_id.clone()))
|
||||||
.or_insert_with(|| {
|
.or_insert_with(|| {
|
||||||
let ptr: JsonPointer = format!(
|
let ptr: JsonPointer =
|
||||||
"/public/packageData/{}/hosts/{}",
|
format!("/public/packageData/{}/hosts/{}", package_id, host_id)
|
||||||
package_id, host_id
|
.parse()
|
||||||
)
|
.expect("valid json pointer");
|
||||||
.parse()
|
|
||||||
.expect("valid json pointer");
|
|
||||||
let db = db.clone();
|
let db = db.clone();
|
||||||
let callbacks = Arc::clone(self);
|
let callbacks = Arc::clone(self);
|
||||||
let key = (package_id, host_id);
|
let key = (package_id, host_id);
|
||||||
@@ -174,9 +179,7 @@ impl ServiceCallbacks {
|
|||||||
.filter(|cb| !cb.0.is_empty())
|
.filter(|cb| !cb.0.is_empty())
|
||||||
}) {
|
}) {
|
||||||
if let Err(e) = cbs.call(vector![]).await {
|
if let Err(e) = cbs.call(vector![]).await {
|
||||||
tracing::error!(
|
tracing::error!("Error in host info callback: {e}");
|
||||||
"Error in host info callback: {e}"
|
|
||||||
);
|
|
||||||
tracing::debug!("{e:?}");
|
tracing::debug!("{e:?}");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -287,6 +290,61 @@ impl ServiceCallbacks {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Register a callback for outbound gateway changes.
|
||||||
|
pub(super) fn add_get_outbound_gateway(
|
||||||
|
self: &Arc<Self>,
|
||||||
|
package_id: PackageId,
|
||||||
|
mut outbound_gateway: TypedDbWatch<Option<GatewayId>>,
|
||||||
|
mut default_outbound: Option<TypedDbWatch<Option<GatewayId>>>,
|
||||||
|
mut fallback: Option<TypedDbWatch<OrdMap<GatewayId, NetworkInterfaceInfo>>>,
|
||||||
|
handler: CallbackHandler,
|
||||||
|
) {
|
||||||
|
self.mutate(|this| {
|
||||||
|
this.get_outbound_gateway
|
||||||
|
.entry(package_id.clone())
|
||||||
|
.or_insert_with(|| {
|
||||||
|
let callbacks = Arc::clone(self);
|
||||||
|
let key = package_id;
|
||||||
|
(
|
||||||
|
tokio::spawn(async move {
|
||||||
|
tokio::select! {
|
||||||
|
_ = outbound_gateway.changed() => {}
|
||||||
|
_ = async {
|
||||||
|
if let Some(ref mut w) = default_outbound {
|
||||||
|
let _ = w.changed().await;
|
||||||
|
} else {
|
||||||
|
std::future::pending::<()>().await;
|
||||||
|
}
|
||||||
|
} => {}
|
||||||
|
_ = async {
|
||||||
|
if let Some(ref mut w) = fallback {
|
||||||
|
let _ = w.changed().await;
|
||||||
|
} else {
|
||||||
|
std::future::pending::<()>().await;
|
||||||
|
}
|
||||||
|
} => {}
|
||||||
|
}
|
||||||
|
if let Some(cbs) = callbacks.mutate(|this| {
|
||||||
|
this.get_outbound_gateway
|
||||||
|
.remove(&key)
|
||||||
|
.map(|(_, handlers)| CallbackHandlers(handlers))
|
||||||
|
.filter(|cb| !cb.0.is_empty())
|
||||||
|
}) {
|
||||||
|
if let Err(e) = cbs.call(vector![]).await {
|
||||||
|
tracing::error!("Error in outbound gateway callback: {e}");
|
||||||
|
tracing::debug!("{e:?}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.into(),
|
||||||
|
Vec::new(),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.1
|
||||||
|
.push(handler);
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
pub(super) fn add_get_service_manifest(&self, package_id: PackageId, handler: CallbackHandler) {
|
pub(super) fn add_get_service_manifest(&self, package_id: PackageId, handler: CallbackHandler) {
|
||||||
self.mutate(|this| {
|
self.mutate(|this| {
|
||||||
this.get_service_manifest
|
this.get_service_manifest
|
||||||
|
|||||||
@@ -143,6 +143,10 @@ pub fn handler<C: Context>() -> ParentHandler<C> {
|
|||||||
"get-container-ip",
|
"get-container-ip",
|
||||||
from_fn_async(net::info::get_container_ip).no_cli(),
|
from_fn_async(net::info::get_container_ip).no_cli(),
|
||||||
)
|
)
|
||||||
|
.subcommand(
|
||||||
|
"get-outbound-gateway",
|
||||||
|
from_fn_async(net::info::get_outbound_gateway).no_cli(),
|
||||||
|
)
|
||||||
.subcommand(
|
.subcommand(
|
||||||
"get-os-ip",
|
"get-os-ip",
|
||||||
from_fn(|_: C| Ok::<_, Error>(Ipv4Addr::from(HOST_IP))),
|
from_fn(|_: C| Ok::<_, Error>(Ipv4Addr::from(HOST_IP))),
|
||||||
|
|||||||
@@ -1,9 +1,16 @@
|
|||||||
use std::net::Ipv4Addr;
|
use std::net::Ipv4Addr;
|
||||||
|
|
||||||
use crate::PackageId;
|
use imbl::OrdMap;
|
||||||
|
use patch_db::TypedDbWatch;
|
||||||
|
use patch_db::json_ptr::JsonPointer;
|
||||||
|
use tokio::process::Command;
|
||||||
|
|
||||||
|
use crate::db::model::public::NetworkInterfaceInfo;
|
||||||
use crate::service::effects::callbacks::CallbackHandler;
|
use crate::service::effects::callbacks::CallbackHandler;
|
||||||
use crate::service::effects::prelude::*;
|
use crate::service::effects::prelude::*;
|
||||||
use crate::service::rpc::CallbackId;
|
use crate::service::rpc::CallbackId;
|
||||||
|
use crate::util::Invoke;
|
||||||
|
use crate::{GatewayId, PackageId};
|
||||||
|
|
||||||
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, TS)]
|
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, TS)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
@@ -51,3 +58,116 @@ pub async fn get_container_ip(
|
|||||||
lxc.ip().await.map(Some)
|
lxc.ip().await.map(Some)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, TS)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
#[ts(export)]
|
||||||
|
pub struct GetOutboundGatewayParams {
|
||||||
|
#[ts(optional)]
|
||||||
|
callback: Option<CallbackId>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_outbound_gateway(
|
||||||
|
context: EffectContext,
|
||||||
|
GetOutboundGatewayParams { callback }: GetOutboundGatewayParams,
|
||||||
|
) -> Result<GatewayId, Error> {
|
||||||
|
let context = context.deref()?;
|
||||||
|
let ctx = &context.seed.ctx;
|
||||||
|
|
||||||
|
// Resolve the effective gateway; DB watches are created atomically
|
||||||
|
// with each read to avoid race conditions.
|
||||||
|
let (gw, pkg_watch, os_watch, gateways_watch) =
|
||||||
|
resolve_outbound_gateway(ctx, &context.seed.id).await?;
|
||||||
|
|
||||||
|
if let Some(callback) = callback {
|
||||||
|
let callback = callback.register(&context.seed.persistent_container);
|
||||||
|
context.seed.ctx.callbacks.add_get_outbound_gateway(
|
||||||
|
context.seed.id.clone(),
|
||||||
|
pkg_watch,
|
||||||
|
os_watch,
|
||||||
|
gateways_watch,
|
||||||
|
CallbackHandler::new(&context, callback),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(gw)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn resolve_outbound_gateway(
|
||||||
|
ctx: &crate::context::RpcContext,
|
||||||
|
package_id: &PackageId,
|
||||||
|
) -> Result<
|
||||||
|
(
|
||||||
|
GatewayId,
|
||||||
|
TypedDbWatch<Option<GatewayId>>,
|
||||||
|
Option<TypedDbWatch<Option<GatewayId>>>,
|
||||||
|
Option<TypedDbWatch<OrdMap<GatewayId, NetworkInterfaceInfo>>>,
|
||||||
|
),
|
||||||
|
Error,
|
||||||
|
> {
|
||||||
|
// 1. Package-specific outbound gateway — subscribe before reading
|
||||||
|
let pkg_ptr: JsonPointer = format!("/public/packageData/{}/outboundGateway", package_id)
|
||||||
|
.parse()
|
||||||
|
.expect("valid json pointer");
|
||||||
|
let mut pkg_watch = ctx.db.watch(pkg_ptr).await;
|
||||||
|
let pkg_gw: Option<GatewayId> = imbl_value::from_value(pkg_watch.peek_and_mark_seen()?)?;
|
||||||
|
|
||||||
|
if let Some(gw) = pkg_gw {
|
||||||
|
return Ok((gw, pkg_watch.typed(), None, None));
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. OS-level default outbound — subscribe before reading
|
||||||
|
let os_ptr: JsonPointer = "/public/serverInfo/network/defaultOutbound"
|
||||||
|
.parse()
|
||||||
|
.expect("valid json pointer");
|
||||||
|
let mut os_watch = ctx.db.watch(os_ptr).await;
|
||||||
|
let default_outbound: Option<GatewayId> =
|
||||||
|
imbl_value::from_value(os_watch.peek_and_mark_seen()?)?;
|
||||||
|
|
||||||
|
if let Some(gw) = default_outbound {
|
||||||
|
return Ok((gw, pkg_watch.typed(), Some(os_watch.typed()), None));
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. Fall through to main routing table — watch gateways for changes
|
||||||
|
let gw_ptr: JsonPointer = "/public/serverInfo/network/gateways"
|
||||||
|
.parse()
|
||||||
|
.expect("valid json pointer");
|
||||||
|
let mut gateways_watch = ctx.db.watch(gw_ptr).await;
|
||||||
|
gateways_watch.peek_and_mark_seen()?;
|
||||||
|
|
||||||
|
let gw = default_route_interface().await?;
|
||||||
|
Ok((
|
||||||
|
gw,
|
||||||
|
pkg_watch.typed(),
|
||||||
|
Some(os_watch.typed()),
|
||||||
|
Some(gateways_watch.typed()),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Parses `ip route show table main` for the default route's `dev` field.
|
||||||
|
async fn default_route_interface() -> Result<GatewayId, Error> {
|
||||||
|
let output = Command::new("ip")
|
||||||
|
.arg("route")
|
||||||
|
.arg("show")
|
||||||
|
.arg("table")
|
||||||
|
.arg("main")
|
||||||
|
.invoke(ErrorKind::Network)
|
||||||
|
.await?;
|
||||||
|
let text = String::from_utf8_lossy(&output);
|
||||||
|
for line in text.lines() {
|
||||||
|
if line.starts_with("default ") {
|
||||||
|
let mut parts = line.split_whitespace();
|
||||||
|
while let Some(tok) = parts.next() {
|
||||||
|
if tok == "dev" {
|
||||||
|
if let Some(dev) = parts.next() {
|
||||||
|
return Ok(dev.parse().unwrap());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(Error::new(
|
||||||
|
eyre!("no default route found in main routing table"),
|
||||||
|
ErrorKind::Network,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|||||||
@@ -135,6 +135,8 @@ export type Effects = {
|
|||||||
}): Promise<string>
|
}): Promise<string>
|
||||||
/** Returns the IP address of StartOS */
|
/** Returns the IP address of StartOS */
|
||||||
getOsIp(): Promise<string>
|
getOsIp(): Promise<string>
|
||||||
|
/** Returns the effective outbound gateway for this service */
|
||||||
|
getOutboundGateway(options: { callback?: () => void }): Promise<string>
|
||||||
// interface
|
// interface
|
||||||
/** Creates an interface bound to a specific host and port to show to the user */
|
/** Creates an interface bound to a specific host and port to show to the user */
|
||||||
exportServiceInterface(options: ExportServiceInterfaceParams): Promise<null>
|
exportServiceInterface(options: ExportServiceInterfaceParams): Promise<null>
|
||||||
|
|||||||
4
sdk/base/lib/osBindings/GetOutboundGatewayParams.ts
Normal file
4
sdk/base/lib/osBindings/GetOutboundGatewayParams.ts
Normal file
@@ -0,0 +1,4 @@
|
|||||||
|
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
|
||||||
|
import type { CallbackId } from './CallbackId'
|
||||||
|
|
||||||
|
export type GetOutboundGatewayParams = { callback?: CallbackId }
|
||||||
@@ -110,6 +110,7 @@ export { GetContainerIpParams } from './GetContainerIpParams'
|
|||||||
export { GetHostInfoParams } from './GetHostInfoParams'
|
export { GetHostInfoParams } from './GetHostInfoParams'
|
||||||
export { GetOsAssetParams } from './GetOsAssetParams'
|
export { GetOsAssetParams } from './GetOsAssetParams'
|
||||||
export { GetOsVersionParams } from './GetOsVersionParams'
|
export { GetOsVersionParams } from './GetOsVersionParams'
|
||||||
|
export { GetOutboundGatewayParams } from './GetOutboundGatewayParams'
|
||||||
export { GetPackageParams } from './GetPackageParams'
|
export { GetPackageParams } from './GetPackageParams'
|
||||||
export { GetPackageResponseFull } from './GetPackageResponseFull'
|
export { GetPackageResponseFull } from './GetPackageResponseFull'
|
||||||
export { GetPackageResponse } from './GetPackageResponse'
|
export { GetPackageResponse } from './GetPackageResponse'
|
||||||
|
|||||||
@@ -25,6 +25,7 @@ import { GetSslKeyParams } from '.././osBindings'
|
|||||||
import { GetServiceInterfaceParams } from '.././osBindings'
|
import { GetServiceInterfaceParams } from '.././osBindings'
|
||||||
import { SetDependenciesParams } from '.././osBindings'
|
import { SetDependenciesParams } from '.././osBindings'
|
||||||
import { GetSystemSmtpParams } from '.././osBindings'
|
import { GetSystemSmtpParams } from '.././osBindings'
|
||||||
|
import { GetOutboundGatewayParams } from '.././osBindings'
|
||||||
import { GetServicePortForwardParams } from '.././osBindings'
|
import { GetServicePortForwardParams } from '.././osBindings'
|
||||||
import { ExportServiceInterfaceParams } from '.././osBindings'
|
import { ExportServiceInterfaceParams } from '.././osBindings'
|
||||||
import { ListServiceInterfacesParams } from '.././osBindings'
|
import { ListServiceInterfacesParams } from '.././osBindings'
|
||||||
@@ -83,6 +84,7 @@ describe('startosTypeValidation ', () => {
|
|||||||
getServiceManifest: {} as WithCallback<GetServiceManifestParams>,
|
getServiceManifest: {} as WithCallback<GetServiceManifestParams>,
|
||||||
getSystemSmtp: {} as WithCallback<GetSystemSmtpParams>,
|
getSystemSmtp: {} as WithCallback<GetSystemSmtpParams>,
|
||||||
getContainerIp: {} as WithCallback<GetContainerIpParams>,
|
getContainerIp: {} as WithCallback<GetContainerIpParams>,
|
||||||
|
getOutboundGateway: {} as WithCallback<GetOutboundGatewayParams>,
|
||||||
getOsIp: undefined,
|
getOsIp: undefined,
|
||||||
getServicePortForward: {} as GetServicePortForwardParams,
|
getServicePortForward: {} as GetServicePortForwardParams,
|
||||||
clearServiceInterfaces: {} as ClearServiceInterfacesParams,
|
clearServiceInterfaces: {} as ClearServiceInterfacesParams,
|
||||||
|
|||||||
105
sdk/base/lib/util/GetOutboundGateway.ts
Normal file
105
sdk/base/lib/util/GetOutboundGateway.ts
Normal file
@@ -0,0 +1,105 @@
|
|||||||
|
import { Effects } from '../Effects'
|
||||||
|
import { DropGenerator, DropPromise } from './Drop'
|
||||||
|
|
||||||
|
export class GetOutboundGateway {
|
||||||
|
constructor(readonly effects: Effects) {}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the effective outbound gateway. Reruns the context from which it has been called if the underlying value changes
|
||||||
|
*/
|
||||||
|
const() {
|
||||||
|
return this.effects.getOutboundGateway({
|
||||||
|
callback:
|
||||||
|
this.effects.constRetry &&
|
||||||
|
(() => this.effects.constRetry && this.effects.constRetry()),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Returns the effective outbound gateway. Does nothing if the value changes
|
||||||
|
*/
|
||||||
|
once() {
|
||||||
|
return this.effects.getOutboundGateway({})
|
||||||
|
}
|
||||||
|
|
||||||
|
private async *watchGen(abort?: AbortSignal) {
|
||||||
|
const resolveCell = { resolve: () => {} }
|
||||||
|
this.effects.onLeaveContext(() => {
|
||||||
|
resolveCell.resolve()
|
||||||
|
})
|
||||||
|
abort?.addEventListener('abort', () => resolveCell.resolve())
|
||||||
|
while (this.effects.isInContext && !abort?.aborted) {
|
||||||
|
let callback: () => void = () => {}
|
||||||
|
const waitForNext = new Promise<void>((resolve) => {
|
||||||
|
callback = resolve
|
||||||
|
resolveCell.resolve = resolve
|
||||||
|
})
|
||||||
|
yield await this.effects.getOutboundGateway({
|
||||||
|
callback: () => callback(),
|
||||||
|
})
|
||||||
|
await waitForNext
|
||||||
|
}
|
||||||
|
return new Promise<never>((_, rej) => rej(new Error('aborted')))
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Watches the effective outbound gateway. Returns an async iterator that yields whenever the value changes
|
||||||
|
*/
|
||||||
|
watch(abort?: AbortSignal): AsyncGenerator<string, never, unknown> {
|
||||||
|
const ctrl = new AbortController()
|
||||||
|
abort?.addEventListener('abort', () => ctrl.abort())
|
||||||
|
return DropGenerator.of(this.watchGen(ctrl.signal), () => ctrl.abort())
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Watches the effective outbound gateway. Takes a custom callback function to run whenever the value changes
|
||||||
|
*/
|
||||||
|
onChange(
|
||||||
|
callback: (
|
||||||
|
value: string,
|
||||||
|
error?: Error,
|
||||||
|
) => { cancel: boolean } | Promise<{ cancel: boolean }>,
|
||||||
|
) {
|
||||||
|
;(async () => {
|
||||||
|
const ctrl = new AbortController()
|
||||||
|
for await (const value of this.watch(ctrl.signal)) {
|
||||||
|
try {
|
||||||
|
const res = await callback(value)
|
||||||
|
if (res.cancel) {
|
||||||
|
ctrl.abort()
|
||||||
|
break
|
||||||
|
}
|
||||||
|
} catch (e) {
|
||||||
|
console.error(
|
||||||
|
'callback function threw an error @ GetOutboundGateway.onChange',
|
||||||
|
e,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})()
|
||||||
|
.catch((e) => callback('', e))
|
||||||
|
.catch((e) =>
|
||||||
|
console.error(
|
||||||
|
'callback function threw an error @ GetOutboundGateway.onChange',
|
||||||
|
e,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Watches the effective outbound gateway. Returns when the predicate is true
|
||||||
|
*/
|
||||||
|
waitFor(pred: (value: string) => boolean): Promise<string> {
|
||||||
|
const ctrl = new AbortController()
|
||||||
|
return DropPromise.of(
|
||||||
|
Promise.resolve().then(async () => {
|
||||||
|
for await (const next of this.watchGen(ctrl.signal)) {
|
||||||
|
if (pred(next)) {
|
||||||
|
return next
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ''
|
||||||
|
}),
|
||||||
|
() => ctrl.abort(),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -14,6 +14,7 @@ export { once } from './once'
|
|||||||
export { asError } from './asError'
|
export { asError } from './asError'
|
||||||
export * as Patterns from './patterns'
|
export * as Patterns from './patterns'
|
||||||
export * from './typeHelpers'
|
export * from './typeHelpers'
|
||||||
|
export { GetOutboundGateway } from './GetOutboundGateway'
|
||||||
export { GetSystemSmtp } from './GetSystemSmtp'
|
export { GetSystemSmtp } from './GetSystemSmtp'
|
||||||
export { Graph, Vertex } from './graph'
|
export { Graph, Vertex } from './graph'
|
||||||
export { inMs } from './inMs'
|
export { inMs } from './inMs'
|
||||||
|
|||||||
@@ -23,7 +23,7 @@ import { setupExportedUrls } from '../../base/lib/interfaces/setupExportedUrls'
|
|||||||
import { successFailure } from './trigger/successFailure'
|
import { successFailure } from './trigger/successFailure'
|
||||||
import { MultiHost, Scheme } from '../../base/lib/interfaces/Host'
|
import { MultiHost, Scheme } from '../../base/lib/interfaces/Host'
|
||||||
import { ServiceInterfaceBuilder } from '../../base/lib/interfaces/ServiceInterfaceBuilder'
|
import { ServiceInterfaceBuilder } from '../../base/lib/interfaces/ServiceInterfaceBuilder'
|
||||||
import { GetSystemSmtp } from './util'
|
import { GetOutboundGateway, GetSystemSmtp } from './util'
|
||||||
import { nullIfEmpty } from './util'
|
import { nullIfEmpty } from './util'
|
||||||
import { getServiceInterface, getServiceInterfaces } from './util'
|
import { getServiceInterface, getServiceInterfaces } from './util'
|
||||||
import {
|
import {
|
||||||
@@ -107,6 +107,7 @@ export class StartSdk<Manifest extends T.SDKManifest> {
|
|||||||
type AlreadyExposed =
|
type AlreadyExposed =
|
||||||
| 'getSslCertificate'
|
| 'getSslCertificate'
|
||||||
| 'getSystemSmtp'
|
| 'getSystemSmtp'
|
||||||
|
| 'getOutboundGateway'
|
||||||
| 'getContainerIp'
|
| 'getContainerIp'
|
||||||
| 'getDataVersion'
|
| 'getDataVersion'
|
||||||
| 'setDataVersion'
|
| 'setDataVersion'
|
||||||
@@ -445,6 +446,8 @@ export class StartSdk<Manifest extends T.SDKManifest> {
|
|||||||
) => new ServiceInterfaceBuilder({ ...options, effects }),
|
) => new ServiceInterfaceBuilder({ ...options, effects }),
|
||||||
getSystemSmtp: <E extends Effects>(effects: E) =>
|
getSystemSmtp: <E extends Effects>(effects: E) =>
|
||||||
new GetSystemSmtp(effects),
|
new GetSystemSmtp(effects),
|
||||||
|
getOutboundGateway: <E extends Effects>(effects: E) =>
|
||||||
|
new GetOutboundGateway(effects),
|
||||||
getSslCertificate: <E extends Effects>(
|
getSslCertificate: <E extends Effects>(
|
||||||
effects: E,
|
effects: E,
|
||||||
hostnames: string[],
|
hostnames: string[],
|
||||||
|
|||||||
@@ -64,8 +64,6 @@ export class VersionGraph<CurrentVersion extends string>
|
|||||||
private constructor(
|
private constructor(
|
||||||
readonly current: VersionInfo<CurrentVersion>,
|
readonly current: VersionInfo<CurrentVersion>,
|
||||||
versions: Array<VersionInfo<any>>,
|
versions: Array<VersionInfo<any>>,
|
||||||
private readonly preInstall?: InitScriptOrFn<'install'>,
|
|
||||||
private readonly uninstall?: UninitScript | UninitFn,
|
|
||||||
) {
|
) {
|
||||||
this.graph = once(() => {
|
this.graph = once(() => {
|
||||||
const graph = new Graph<
|
const graph = new Graph<
|
||||||
@@ -167,24 +165,8 @@ export class VersionGraph<CurrentVersion extends string>
|
|||||||
static of<
|
static of<
|
||||||
CurrentVersion extends string,
|
CurrentVersion extends string,
|
||||||
OtherVersions extends Array<VersionInfo<any>>,
|
OtherVersions extends Array<VersionInfo<any>>,
|
||||||
>(options: {
|
>(options: { current: VersionInfo<CurrentVersion>; other: OtherVersions }) {
|
||||||
current: VersionInfo<CurrentVersion>
|
return new VersionGraph(options.current, options.other)
|
||||||
other: OtherVersions
|
|
||||||
/**
|
|
||||||
* A script to run only on fresh install
|
|
||||||
*/
|
|
||||||
preInstall?: InitScriptOrFn<'install'>
|
|
||||||
/**
|
|
||||||
* A script to run only on uninstall
|
|
||||||
*/
|
|
||||||
uninstall?: UninitScriptOrFn
|
|
||||||
}) {
|
|
||||||
return new VersionGraph(
|
|
||||||
options.current,
|
|
||||||
options.other,
|
|
||||||
options.preInstall,
|
|
||||||
options.uninstall,
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
async migrate({
|
async migrate({
|
||||||
effects,
|
effects,
|
||||||
@@ -270,7 +252,7 @@ export class VersionGraph<CurrentVersion extends string>
|
|||||||
.normalize(),
|
.normalize(),
|
||||||
)
|
)
|
||||||
|
|
||||||
async init(effects: T.Effects, kind: InitKind): Promise<void> {
|
async init(effects: T.Effects): Promise<void> {
|
||||||
const from = await getDataVersion(effects)
|
const from = await getDataVersion(effects)
|
||||||
if (from) {
|
if (from) {
|
||||||
await this.migrate({
|
await this.migrate({
|
||||||
@@ -279,10 +261,6 @@ export class VersionGraph<CurrentVersion extends string>
|
|||||||
to: this.currentVersion(),
|
to: this.currentVersion(),
|
||||||
})
|
})
|
||||||
} else {
|
} else {
|
||||||
kind = 'install' // implied by !dataVersion
|
|
||||||
if (this.preInstall)
|
|
||||||
if ('init' in this.preInstall) await this.preInstall.init(effects, kind)
|
|
||||||
else await this.preInstall(effects, kind)
|
|
||||||
await effects.setDataVersion({ version: this.current.options.version })
|
await effects.setDataVersion({ version: this.current.options.version })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -300,11 +278,6 @@ export class VersionGraph<CurrentVersion extends string>
|
|||||||
to: target,
|
to: target,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
if (this.uninstall)
|
|
||||||
if ('uninit' in this.uninstall)
|
|
||||||
await this.uninstall.uninit(effects, target)
|
|
||||||
else await this.uninstall(effects, target)
|
|
||||||
}
|
}
|
||||||
await setDataVersion(effects, target)
|
await setDataVersion(effects, target)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user