refactor: convert service callbacks to DbWatch pattern

Convert getServiceInterface, listServiceInterfaces, getSystemSmtp, and
getServiceManifest from manual callback triggers to DbWatchedCallbacks.
getServiceManifest now always returns the installed manifest.
This commit is contained in:
Aiden McClelland
2026-03-16 20:09:10 -06:00
parent 7313693a9e
commit dd9837b9b2
8 changed files with 108 additions and 220 deletions

View File

@@ -145,9 +145,10 @@ pub struct GatewayInfo {
pub public: bool,
}
#[derive(Clone, Debug, Deserialize, Serialize, TS)]
#[ts(export)]
#[derive(Clone, Debug, Deserialize, Serialize, HasModel, TS)]
#[serde(rename_all = "camelCase")]
#[model = "Model<Self>"]
#[ts(export)]
pub struct ServiceInterface {
pub id: ServiceInterfaceId,
pub name: String,

View File

@@ -12,18 +12,20 @@ use serde::{Deserialize, Serialize};
use tracing::warn;
use ts_rs::TS;
use crate::db::model::package::PackageState;
use crate::db::model::public::NetworkInterfaceInfo;
use crate::net::host::Host;
use crate::net::service_interface::ServiceInterface;
use crate::net::ssl::FullchainCertData;
use crate::prelude::*;
use crate::service::effects::context::EffectContext;
use crate::service::effects::net::ssl::Algorithm;
use crate::service::rpc::{CallbackHandle, CallbackId};
use crate::service::{Service, ServiceActorSeed};
use crate::status::StatusInfo;
use crate::util::collections::EqMap;
use crate::util::future::NonDetachingJoinHandle;
use crate::util::sync::SyncMutex;
use crate::status::StatusInfo;
use crate::{GatewayId, HostId, PackageId, ServiceInterfaceId};
/// Abstraction for callbacks that are triggered by patchdb subscriptions.
@@ -66,9 +68,7 @@ impl<K: Ord + Clone + Send + Sync + 'static> DbWatchedCallbacks<K> {
.map(|(_, handlers)| CallbackHandlers(handlers))
.filter(|cb| !cb.0.is_empty())
}) {
let value = watch
.peek_and_mark_seen()
.unwrap_or_default();
let value = watch.peek_and_mark_seen().unwrap_or_default();
if let Err(e) = cbs.call(vector![value]).await {
tracing::error!("Error in {label} callback: {e}");
tracing::debug!("{e:?}");
@@ -99,6 +99,10 @@ pub struct ServiceCallbacks {
inner: SyncMutex<ServiceCallbackMap>,
get_host_info: Arc<DbWatchedCallbacks<(PackageId, HostId)>>,
get_status: Arc<DbWatchedCallbacks<PackageId>>,
get_service_interface: Arc<DbWatchedCallbacks<(PackageId, ServiceInterfaceId)>>,
list_service_interfaces: Arc<DbWatchedCallbacks<PackageId>>,
get_system_smtp: Arc<DbWatchedCallbacks<()>>,
get_service_manifest: Arc<DbWatchedCallbacks<PackageId>>,
}
impl Default for ServiceCallbacks {
@@ -107,21 +111,21 @@ impl Default for ServiceCallbacks {
inner: SyncMutex::new(ServiceCallbackMap::default()),
get_host_info: Arc::new(DbWatchedCallbacks::new("host info")),
get_status: Arc::new(DbWatchedCallbacks::new("get_status")),
get_service_interface: Arc::new(DbWatchedCallbacks::new("get_service_interface")),
list_service_interfaces: Arc::new(DbWatchedCallbacks::new("list_service_interfaces")),
get_system_smtp: Arc::new(DbWatchedCallbacks::new("get_system_smtp")),
get_service_manifest: Arc::new(DbWatchedCallbacks::new("get_service_manifest")),
}
}
}
#[derive(Default)]
struct ServiceCallbackMap {
get_service_interface: BTreeMap<(PackageId, ServiceInterfaceId), Vec<CallbackHandler>>,
list_service_interfaces: BTreeMap<PackageId, Vec<CallbackHandler>>,
get_system_smtp: Vec<CallbackHandler>,
get_ssl_certificate: EqMap<
(BTreeSet<InternedString>, FullchainCertData, Algorithm),
(NonDetachingJoinHandle<()>, Vec<CallbackHandler>),
>,
get_container_ip: BTreeMap<PackageId, Vec<CallbackHandler>>,
get_service_manifest: BTreeMap<PackageId, Vec<CallbackHandler>>,
get_outbound_gateway: BTreeMap<PackageId, (NonDetachingJoinHandle<()>, Vec<CallbackHandler>)>,
}
@@ -132,24 +136,10 @@ impl ServiceCallbacks {
pub fn gc(&self) {
self.mutate(|this| {
this.get_service_interface.retain(|_, v| {
v.retain(|h| h.handle.is_active() && h.seed.strong_count() > 0);
!v.is_empty()
});
this.list_service_interfaces.retain(|_, v| {
v.retain(|h| h.handle.is_active() && h.seed.strong_count() > 0);
!v.is_empty()
});
this.get_system_smtp
.retain(|h| h.handle.is_active() && h.seed.strong_count() > 0);
this.get_ssl_certificate.retain(|_, (_, v)| {
v.retain(|h| h.handle.is_active() && h.seed.strong_count() > 0);
!v.is_empty()
});
this.get_service_manifest.retain(|_, v| {
v.retain(|h| h.handle.is_active() && h.seed.strong_count() > 0);
!v.is_empty()
});
this.get_outbound_gateway.retain(|_, (_, v)| {
v.retain(|h| h.handle.is_active() && h.seed.strong_count() > 0);
!v.is_empty()
@@ -157,70 +147,38 @@ impl ServiceCallbacks {
});
self.get_host_info.gc();
self.get_status.gc();
self.get_service_interface.gc();
self.list_service_interfaces.gc();
self.get_system_smtp.gc();
self.get_service_manifest.gc();
}
pub(super) fn add_get_service_interface(
&self,
package_id: PackageId,
service_interface_id: ServiceInterfaceId,
watch: TypedDbWatch<ServiceInterface>,
handler: CallbackHandler,
) {
self.mutate(|this| {
this.get_service_interface
.entry((package_id, service_interface_id))
.or_default()
.push(handler);
})
self.get_service_interface
.add((package_id, service_interface_id), watch, handler);
}
#[must_use]
pub fn get_service_interface(
&self,
id: &(PackageId, ServiceInterfaceId),
) -> Option<CallbackHandlers> {
self.mutate(|this| {
Some(CallbackHandlers(
this.get_service_interface.remove(id).unwrap_or_default(),
))
.filter(|cb| !cb.0.is_empty())
})
}
pub(super) fn add_list_service_interfaces(
pub(super) fn add_list_service_interfaces<T: Send + 'static>(
&self,
package_id: PackageId,
watch: TypedDbWatch<T>,
handler: CallbackHandler,
) {
self.mutate(|this| {
this.list_service_interfaces
.entry(package_id)
.or_default()
.push(handler);
})
self.list_service_interfaces.add(package_id, watch, handler);
}
#[must_use]
pub fn list_service_interfaces(&self, id: &PackageId) -> Option<CallbackHandlers> {
self.mutate(|this| {
Some(CallbackHandlers(
this.list_service_interfaces.remove(id).unwrap_or_default(),
))
.filter(|cb| !cb.0.is_empty())
})
}
pub(super) fn add_get_system_smtp(&self, handler: CallbackHandler) {
self.mutate(|this| {
this.get_system_smtp.push(handler);
})
}
#[must_use]
pub fn get_system_smtp(&self) -> Option<CallbackHandlers> {
self.mutate(|this| {
Some(CallbackHandlers(std::mem::take(&mut this.get_system_smtp)))
.filter(|cb| !cb.0.is_empty())
})
pub(super) fn add_get_system_smtp<T: Send + 'static>(
&self,
watch: TypedDbWatch<T>,
handler: CallbackHandler,
) {
self.get_system_smtp.add((), watch, handler);
}
pub(super) fn add_get_host_info(
@@ -376,23 +334,13 @@ impl ServiceCallbacks {
})
}
pub(super) fn add_get_service_manifest(&self, package_id: PackageId, handler: CallbackHandler) {
self.mutate(|this| {
this.get_service_manifest
.entry(package_id)
.or_default()
.push(handler)
})
}
#[must_use]
pub fn get_service_manifest(&self, package_id: &PackageId) -> Option<CallbackHandlers> {
self.mutate(|this| {
this.get_service_manifest
.remove(package_id)
.map(CallbackHandlers)
.filter(|cb| !cb.0.is_empty())
})
pub(super) fn add_get_service_manifest(
&self,
package_id: PackageId,
watch: TypedDbWatch<PackageState>,
handler: CallbackHandler,
) {
self.get_service_manifest.add(package_id, watch, handler);
}
}

View File

@@ -399,27 +399,38 @@ pub async fn get_service_manifest(
callback,
}: GetServiceManifestParams,
) -> Result<Manifest, Error> {
use crate::db::model::package::PackageState;
let context = context.deref()?;
let ptr = format!("/public/packageData/{}/stateInfo", package_id)
.parse()
.expect("valid json pointer");
let mut watch = context
.seed
.ctx
.db
.watch(ptr)
.await
.typed::<PackageState>();
let manifest = watch
.peek_and_mark_seen()?
.as_manifest(ManifestPreference::Old)
.de()?;
if let Some(callback) = callback {
let callback = callback.register(&context.seed.persistent_container);
context
.seed
.ctx
.callbacks
.add_get_service_manifest(package_id.clone(), CallbackHandler::new(&context, callback));
.add_get_service_manifest(
package_id.clone(),
watch,
CallbackHandler::new(&context, callback),
);
}
let db = context.seed.ctx.db.peek().await;
let manifest = db
.as_public()
.as_package_data()
.as_idx(&package_id)
.or_not_found(&package_id)?
.as_state_info()
.as_manifest(ManifestPreference::New)
.de()?;
Ok(manifest)
}

View File

@@ -1,7 +1,5 @@
use std::collections::BTreeMap;
use imbl::vector;
use crate::net::service_interface::{AddressInfo, ServiceInterface, ServiceInterfaceType};
use crate::service::effects::callbacks::CallbackHandler;
use crate::service::effects::prelude::*;
@@ -42,7 +40,7 @@ pub async fn export_service_interface(
interface_type: r#type,
};
let res = context
context
.seed
.ctx
.db
@@ -56,27 +54,8 @@ pub async fn export_service_interface(
ifaces.insert(&id, &service_interface)?;
Ok(())
})
.await;
res.result?;
if res.revision.is_some() {
if let Some(callbacks) = context
.seed
.ctx
.callbacks
.get_service_interface(&(package_id.clone(), id))
{
callbacks.call(vector![]).await?;
}
if let Some(callbacks) = context
.seed
.ctx
.callbacks
.list_service_interfaces(&package_id)
{
callbacks.call(vector![]).await?;
}
}
.await
.result?;
Ok(())
}
@@ -101,26 +80,34 @@ pub async fn get_service_interface(
) -> Result<Option<ServiceInterface>, Error> {
let context = context.deref()?;
let package_id = package_id.unwrap_or_else(|| context.seed.id.clone());
let db = context.seed.ctx.db.peek().await;
let ptr = format!(
"/public/packageData/{}/serviceInterfaces/{}",
package_id, service_interface_id
)
.parse()
.expect("valid json pointer");
let mut watch = context
.seed
.ctx
.db
.watch(ptr)
.await
.typed::<ServiceInterface>();
let res = watch.peek_and_mark_seen()?.de().ok();
if let Some(callback) = callback {
let callback = callback.register(&context.seed.persistent_container);
context.seed.ctx.callbacks.add_get_service_interface(
package_id.clone(),
service_interface_id.clone(),
watch,
CallbackHandler::new(&context, callback),
);
}
let interface = db
.as_public()
.as_package_data()
.as_idx(&package_id)
.and_then(|m| m.as_service_interfaces().as_idx(&service_interface_id))
.map(|m| m.de())
.transpose()?;
Ok(interface)
Ok(res)
}
#[derive(Debug, Clone, Serialize, Deserialize, TS)]
@@ -142,27 +129,23 @@ pub async fn list_service_interfaces(
let context = context.deref()?;
let package_id = package_id.unwrap_or_else(|| context.seed.id.clone());
let ptr = format!("/public/packageData/{}/serviceInterfaces", package_id)
.parse()
.expect("valid json pointer");
let mut watch = context.seed.ctx.db.watch(ptr).await;
let res = imbl_value::from_value(watch.peek_and_mark_seen()?)
.unwrap_or_default();
if let Some(callback) = callback {
let callback = callback.register(&context.seed.persistent_container);
context.seed.ctx.callbacks.add_list_service_interfaces(
package_id.clone(),
watch.typed::<BTreeMap<ServiceInterfaceId, ServiceInterface>>(),
CallbackHandler::new(&context, callback),
);
}
let res = context
.seed
.ctx
.db
.peek()
.await
.into_public()
.into_package_data()
.into_idx(&package_id)
.map(|m| m.into_service_interfaces().de())
.transpose()?
.unwrap_or_default();
Ok(res)
}
@@ -180,52 +163,22 @@ pub async fn clear_service_interfaces(
let context = context.deref()?;
let package_id = context.seed.id.clone();
let res = context
context
.seed
.ctx
.db
.mutate(|db| {
let mut removed = Vec::new();
db.as_public_mut()
.as_package_data_mut()
.as_idx_mut(&package_id)
.or_not_found(&package_id)?
.as_service_interfaces_mut()
.mutate(|s| {
Ok(s.retain(|id, _| {
if except.contains(id) {
true
} else {
removed.push(id.clone());
false
}
}))
})?;
Ok(removed)
Ok(s.retain(|id, _| except.contains(id)))
})
})
.await;
let removed = res.result?;
if res.revision.is_some() {
for id in removed {
if let Some(callbacks) = context
.seed
.ctx
.callbacks
.get_service_interface(&(package_id.clone(), id))
{
callbacks.call(vector![]).await?;
}
}
if let Some(callbacks) = context
.seed
.ctx
.callbacks
.list_service_interfaces(&package_id)
{
callbacks.call(vector![]).await?;
}
}
.await
.result?;
Ok(())
}

View File

@@ -16,25 +16,25 @@ pub async fn get_system_smtp(
) -> Result<Option<SmtpValue>, Error> {
let context = context.deref()?;
let ptr = "/public/serverInfo/smtp"
.parse()
.expect("valid json pointer");
let mut watch = context.seed.ctx.db.watch(ptr).await;
let res = imbl_value::from_value(watch.peek_and_mark_seen()?)
.with_kind(ErrorKind::Deserialization)?;
if let Some(callback) = callback {
let callback = callback.register(&context.seed.persistent_container);
context
.seed
.ctx
.callbacks
.add_get_system_smtp(CallbackHandler::new(&context, callback));
.add_get_system_smtp(
watch.typed::<Option<SmtpValue>>(),
CallbackHandler::new(&context, callback),
);
}
let res = context
.seed
.ctx
.db
.peek()
.await
.into_public()
.into_server_info()
.into_smtp()
.de()?;
Ok(res)
}

View File

@@ -640,17 +640,6 @@ impl Service {
tokio::task::yield_now().await;
}
// Trigger manifest callbacks after successful installation
let manifest = service.seed.persistent_container.s9pk.as_manifest();
if let Some(callbacks) = ctx.callbacks.get_service_manifest(&manifest.id) {
let manifest_value =
serde_json::to_value(manifest).with_kind(ErrorKind::Serialization)?;
callbacks
.call(imbl::vector![manifest_value.into()])
.await
.log_err();
}
Ok(service)
}

View File

@@ -1,8 +1,6 @@
use std::collections::BTreeSet;
use std::path::Path;
use imbl::vector;
use crate::context::RpcContext;
use crate::db::model::package::{InstalledState, InstallingInfo, InstallingState, PackageState};
use crate::net::host::all_hosts;
@@ -94,11 +92,6 @@ pub async fn cleanup(ctx: &RpcContext, id: &PackageId, soft: bool) -> Result<(),
));
}
};
// Trigger manifest callbacks with null to indicate uninstall
if let Some(callbacks) = ctx.callbacks.get_service_manifest(&manifest.id) {
callbacks.call(vector![Value::Null]).await.log_err();
}
if !soft {
let path = Path::new(DATA_DIR).join(PKG_VOLUME_DIR).join(&manifest.id);
crate::util::io::delete_dir(&path).await?;

View File

@@ -6,7 +6,6 @@ use chrono::Utc;
use clap::Parser;
use color_eyre::eyre::eyre;
use futures::FutureExt;
use imbl::vector;
use imbl_value::InternedString;
use rpc_toolkit::{Context, Empty, HandlerExt, ParentHandler, from_fn_async};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
@@ -1148,9 +1147,6 @@ pub async fn set_system_smtp(ctx: RpcContext, smtp: SmtpValue) -> Result<(), Err
})
.await
.result?;
if let Some(callbacks) = ctx.callbacks.get_system_smtp() {
callbacks.call(vector![to_value(&smtp)?]).await?;
}
Ok(())
}
pub async fn clear_system_smtp(ctx: RpcContext) -> Result<(), Error> {
@@ -1163,9 +1159,6 @@ pub async fn clear_system_smtp(ctx: RpcContext) -> Result<(), Error> {
})
.await
.result?;
if let Some(callbacks) = ctx.callbacks.get_system_smtp() {
callbacks.call(vector![Value::Null]).await?;
}
Ok(())
}