From dd9837b9b279dc881de29357b60a51138985ee20 Mon Sep 17 00:00:00 2001 From: Aiden McClelland Date: Mon, 16 Mar 2026 20:09:10 -0600 Subject: [PATCH] refactor: convert service callbacks to DbWatch pattern Convert getServiceInterface, listServiceInterfaces, getSystemSmtp, and getServiceManifest from manual callback triggers to DbWatchedCallbacks. getServiceManifest now always returns the installed manifest. --- core/src/net/service_interface.rs | 5 +- core/src/service/effects/callbacks.rs | 122 +++++++--------------- core/src/service/effects/dependency.rs | 35 ++++--- core/src/service/effects/net/interface.rs | 117 +++++++-------------- core/src/service/effects/system.rs | 24 ++--- core/src/service/mod.rs | 11 -- core/src/service/uninstall.rs | 7 -- core/src/system/mod.rs | 7 -- 8 files changed, 108 insertions(+), 220 deletions(-) diff --git a/core/src/net/service_interface.rs b/core/src/net/service_interface.rs index 7c4b294aa..6fc2aa52e 100644 --- a/core/src/net/service_interface.rs +++ b/core/src/net/service_interface.rs @@ -145,9 +145,10 @@ pub struct GatewayInfo { pub public: bool, } -#[derive(Clone, Debug, Deserialize, Serialize, TS)] -#[ts(export)] +#[derive(Clone, Debug, Deserialize, Serialize, HasModel, TS)] #[serde(rename_all = "camelCase")] +#[model = "Model"] +#[ts(export)] pub struct ServiceInterface { pub id: ServiceInterfaceId, pub name: String, diff --git a/core/src/service/effects/callbacks.rs b/core/src/service/effects/callbacks.rs index b50c526f8..a3acc5b18 100644 --- a/core/src/service/effects/callbacks.rs +++ b/core/src/service/effects/callbacks.rs @@ -12,18 +12,20 @@ use serde::{Deserialize, Serialize}; use tracing::warn; use ts_rs::TS; +use crate::db::model::package::PackageState; use crate::db::model::public::NetworkInterfaceInfo; use crate::net::host::Host; +use crate::net::service_interface::ServiceInterface; use crate::net::ssl::FullchainCertData; use crate::prelude::*; use crate::service::effects::context::EffectContext; use crate::service::effects::net::ssl::Algorithm; use crate::service::rpc::{CallbackHandle, CallbackId}; use crate::service::{Service, ServiceActorSeed}; +use crate::status::StatusInfo; use crate::util::collections::EqMap; use crate::util::future::NonDetachingJoinHandle; use crate::util::sync::SyncMutex; -use crate::status::StatusInfo; use crate::{GatewayId, HostId, PackageId, ServiceInterfaceId}; /// Abstraction for callbacks that are triggered by patchdb subscriptions. @@ -66,9 +68,7 @@ impl DbWatchedCallbacks { .map(|(_, handlers)| CallbackHandlers(handlers)) .filter(|cb| !cb.0.is_empty()) }) { - let value = watch - .peek_and_mark_seen() - .unwrap_or_default(); + let value = watch.peek_and_mark_seen().unwrap_or_default(); if let Err(e) = cbs.call(vector![value]).await { tracing::error!("Error in {label} callback: {e}"); tracing::debug!("{e:?}"); @@ -99,6 +99,10 @@ pub struct ServiceCallbacks { inner: SyncMutex, get_host_info: Arc>, get_status: Arc>, + get_service_interface: Arc>, + list_service_interfaces: Arc>, + get_system_smtp: Arc>, + get_service_manifest: Arc>, } impl Default for ServiceCallbacks { @@ -107,21 +111,21 @@ impl Default for ServiceCallbacks { inner: SyncMutex::new(ServiceCallbackMap::default()), get_host_info: Arc::new(DbWatchedCallbacks::new("host info")), get_status: Arc::new(DbWatchedCallbacks::new("get_status")), + get_service_interface: Arc::new(DbWatchedCallbacks::new("get_service_interface")), + list_service_interfaces: Arc::new(DbWatchedCallbacks::new("list_service_interfaces")), + get_system_smtp: Arc::new(DbWatchedCallbacks::new("get_system_smtp")), + get_service_manifest: Arc::new(DbWatchedCallbacks::new("get_service_manifest")), } } } #[derive(Default)] struct ServiceCallbackMap { - get_service_interface: BTreeMap<(PackageId, ServiceInterfaceId), Vec>, - list_service_interfaces: BTreeMap>, - get_system_smtp: Vec, get_ssl_certificate: EqMap< (BTreeSet, FullchainCertData, Algorithm), (NonDetachingJoinHandle<()>, Vec), >, get_container_ip: BTreeMap>, - get_service_manifest: BTreeMap>, get_outbound_gateway: BTreeMap, Vec)>, } @@ -132,24 +136,10 @@ impl ServiceCallbacks { pub fn gc(&self) { self.mutate(|this| { - this.get_service_interface.retain(|_, v| { - v.retain(|h| h.handle.is_active() && h.seed.strong_count() > 0); - !v.is_empty() - }); - this.list_service_interfaces.retain(|_, v| { - v.retain(|h| h.handle.is_active() && h.seed.strong_count() > 0); - !v.is_empty() - }); - this.get_system_smtp - .retain(|h| h.handle.is_active() && h.seed.strong_count() > 0); this.get_ssl_certificate.retain(|_, (_, v)| { v.retain(|h| h.handle.is_active() && h.seed.strong_count() > 0); !v.is_empty() }); - this.get_service_manifest.retain(|_, v| { - v.retain(|h| h.handle.is_active() && h.seed.strong_count() > 0); - !v.is_empty() - }); this.get_outbound_gateway.retain(|_, (_, v)| { v.retain(|h| h.handle.is_active() && h.seed.strong_count() > 0); !v.is_empty() @@ -157,70 +147,38 @@ impl ServiceCallbacks { }); self.get_host_info.gc(); self.get_status.gc(); + self.get_service_interface.gc(); + self.list_service_interfaces.gc(); + self.get_system_smtp.gc(); + self.get_service_manifest.gc(); } pub(super) fn add_get_service_interface( &self, package_id: PackageId, service_interface_id: ServiceInterfaceId, + watch: TypedDbWatch, handler: CallbackHandler, ) { - self.mutate(|this| { - this.get_service_interface - .entry((package_id, service_interface_id)) - .or_default() - .push(handler); - }) + self.get_service_interface + .add((package_id, service_interface_id), watch, handler); } - #[must_use] - pub fn get_service_interface( - &self, - id: &(PackageId, ServiceInterfaceId), - ) -> Option { - self.mutate(|this| { - Some(CallbackHandlers( - this.get_service_interface.remove(id).unwrap_or_default(), - )) - .filter(|cb| !cb.0.is_empty()) - }) - } - - pub(super) fn add_list_service_interfaces( + pub(super) fn add_list_service_interfaces( &self, package_id: PackageId, + watch: TypedDbWatch, handler: CallbackHandler, ) { - self.mutate(|this| { - this.list_service_interfaces - .entry(package_id) - .or_default() - .push(handler); - }) + self.list_service_interfaces.add(package_id, watch, handler); } - #[must_use] - pub fn list_service_interfaces(&self, id: &PackageId) -> Option { - self.mutate(|this| { - Some(CallbackHandlers( - this.list_service_interfaces.remove(id).unwrap_or_default(), - )) - .filter(|cb| !cb.0.is_empty()) - }) - } - - pub(super) fn add_get_system_smtp(&self, handler: CallbackHandler) { - self.mutate(|this| { - this.get_system_smtp.push(handler); - }) - } - - #[must_use] - pub fn get_system_smtp(&self) -> Option { - self.mutate(|this| { - Some(CallbackHandlers(std::mem::take(&mut this.get_system_smtp))) - .filter(|cb| !cb.0.is_empty()) - }) + pub(super) fn add_get_system_smtp( + &self, + watch: TypedDbWatch, + handler: CallbackHandler, + ) { + self.get_system_smtp.add((), watch, handler); } pub(super) fn add_get_host_info( @@ -376,23 +334,13 @@ impl ServiceCallbacks { }) } - pub(super) fn add_get_service_manifest(&self, package_id: PackageId, handler: CallbackHandler) { - self.mutate(|this| { - this.get_service_manifest - .entry(package_id) - .or_default() - .push(handler) - }) - } - - #[must_use] - pub fn get_service_manifest(&self, package_id: &PackageId) -> Option { - self.mutate(|this| { - this.get_service_manifest - .remove(package_id) - .map(CallbackHandlers) - .filter(|cb| !cb.0.is_empty()) - }) + pub(super) fn add_get_service_manifest( + &self, + package_id: PackageId, + watch: TypedDbWatch, + handler: CallbackHandler, + ) { + self.get_service_manifest.add(package_id, watch, handler); } } diff --git a/core/src/service/effects/dependency.rs b/core/src/service/effects/dependency.rs index 7cf233452..cf50c8674 100644 --- a/core/src/service/effects/dependency.rs +++ b/core/src/service/effects/dependency.rs @@ -399,27 +399,38 @@ pub async fn get_service_manifest( callback, }: GetServiceManifestParams, ) -> Result { + use crate::db::model::package::PackageState; + let context = context.deref()?; + let ptr = format!("/public/packageData/{}/stateInfo", package_id) + .parse() + .expect("valid json pointer"); + let mut watch = context + .seed + .ctx + .db + .watch(ptr) + .await + .typed::(); + + let manifest = watch + .peek_and_mark_seen()? + .as_manifest(ManifestPreference::Old) + .de()?; + if let Some(callback) = callback { let callback = callback.register(&context.seed.persistent_container); context .seed .ctx .callbacks - .add_get_service_manifest(package_id.clone(), CallbackHandler::new(&context, callback)); + .add_get_service_manifest( + package_id.clone(), + watch, + CallbackHandler::new(&context, callback), + ); } - let db = context.seed.ctx.db.peek().await; - - let manifest = db - .as_public() - .as_package_data() - .as_idx(&package_id) - .or_not_found(&package_id)? - .as_state_info() - .as_manifest(ManifestPreference::New) - .de()?; - Ok(manifest) } diff --git a/core/src/service/effects/net/interface.rs b/core/src/service/effects/net/interface.rs index ff0452976..0716efffc 100644 --- a/core/src/service/effects/net/interface.rs +++ b/core/src/service/effects/net/interface.rs @@ -1,7 +1,5 @@ use std::collections::BTreeMap; -use imbl::vector; - use crate::net::service_interface::{AddressInfo, ServiceInterface, ServiceInterfaceType}; use crate::service::effects::callbacks::CallbackHandler; use crate::service::effects::prelude::*; @@ -42,7 +40,7 @@ pub async fn export_service_interface( interface_type: r#type, }; - let res = context + context .seed .ctx .db @@ -56,27 +54,8 @@ pub async fn export_service_interface( ifaces.insert(&id, &service_interface)?; Ok(()) }) - .await; - res.result?; - - if res.revision.is_some() { - if let Some(callbacks) = context - .seed - .ctx - .callbacks - .get_service_interface(&(package_id.clone(), id)) - { - callbacks.call(vector![]).await?; - } - if let Some(callbacks) = context - .seed - .ctx - .callbacks - .list_service_interfaces(&package_id) - { - callbacks.call(vector![]).await?; - } - } + .await + .result?; Ok(()) } @@ -101,26 +80,34 @@ pub async fn get_service_interface( ) -> Result, Error> { let context = context.deref()?; let package_id = package_id.unwrap_or_else(|| context.seed.id.clone()); - let db = context.seed.ctx.db.peek().await; + + let ptr = format!( + "/public/packageData/{}/serviceInterfaces/{}", + package_id, service_interface_id + ) + .parse() + .expect("valid json pointer"); + let mut watch = context + .seed + .ctx + .db + .watch(ptr) + .await + .typed::(); + + let res = watch.peek_and_mark_seen()?.de().ok(); if let Some(callback) = callback { let callback = callback.register(&context.seed.persistent_container); context.seed.ctx.callbacks.add_get_service_interface( package_id.clone(), service_interface_id.clone(), + watch, CallbackHandler::new(&context, callback), ); } - let interface = db - .as_public() - .as_package_data() - .as_idx(&package_id) - .and_then(|m| m.as_service_interfaces().as_idx(&service_interface_id)) - .map(|m| m.de()) - .transpose()?; - - Ok(interface) + Ok(res) } #[derive(Debug, Clone, Serialize, Deserialize, TS)] @@ -142,27 +129,23 @@ pub async fn list_service_interfaces( let context = context.deref()?; let package_id = package_id.unwrap_or_else(|| context.seed.id.clone()); + let ptr = format!("/public/packageData/{}/serviceInterfaces", package_id) + .parse() + .expect("valid json pointer"); + let mut watch = context.seed.ctx.db.watch(ptr).await; + + let res = imbl_value::from_value(watch.peek_and_mark_seen()?) + .unwrap_or_default(); + if let Some(callback) = callback { let callback = callback.register(&context.seed.persistent_container); context.seed.ctx.callbacks.add_list_service_interfaces( package_id.clone(), + watch.typed::>(), CallbackHandler::new(&context, callback), ); } - let res = context - .seed - .ctx - .db - .peek() - .await - .into_public() - .into_package_data() - .into_idx(&package_id) - .map(|m| m.into_service_interfaces().de()) - .transpose()? - .unwrap_or_default(); - Ok(res) } @@ -180,52 +163,22 @@ pub async fn clear_service_interfaces( let context = context.deref()?; let package_id = context.seed.id.clone(); - let res = context + context .seed .ctx .db .mutate(|db| { - let mut removed = Vec::new(); db.as_public_mut() .as_package_data_mut() .as_idx_mut(&package_id) .or_not_found(&package_id)? .as_service_interfaces_mut() .mutate(|s| { - Ok(s.retain(|id, _| { - if except.contains(id) { - true - } else { - removed.push(id.clone()); - false - } - })) - })?; - Ok(removed) + Ok(s.retain(|id, _| except.contains(id))) + }) }) - .await; - let removed = res.result?; - - if res.revision.is_some() { - for id in removed { - if let Some(callbacks) = context - .seed - .ctx - .callbacks - .get_service_interface(&(package_id.clone(), id)) - { - callbacks.call(vector![]).await?; - } - } - if let Some(callbacks) = context - .seed - .ctx - .callbacks - .list_service_interfaces(&package_id) - { - callbacks.call(vector![]).await?; - } - } + .await + .result?; Ok(()) } diff --git a/core/src/service/effects/system.rs b/core/src/service/effects/system.rs index abf6f36ad..e6a9afcba 100644 --- a/core/src/service/effects/system.rs +++ b/core/src/service/effects/system.rs @@ -16,25 +16,25 @@ pub async fn get_system_smtp( ) -> Result, Error> { let context = context.deref()?; + let ptr = "/public/serverInfo/smtp" + .parse() + .expect("valid json pointer"); + let mut watch = context.seed.ctx.db.watch(ptr).await; + + let res = imbl_value::from_value(watch.peek_and_mark_seen()?) + .with_kind(ErrorKind::Deserialization)?; + if let Some(callback) = callback { let callback = callback.register(&context.seed.persistent_container); context .seed .ctx .callbacks - .add_get_system_smtp(CallbackHandler::new(&context, callback)); + .add_get_system_smtp( + watch.typed::>(), + CallbackHandler::new(&context, callback), + ); } - let res = context - .seed - .ctx - .db - .peek() - .await - .into_public() - .into_server_info() - .into_smtp() - .de()?; - Ok(res) } diff --git a/core/src/service/mod.rs b/core/src/service/mod.rs index d904e77c9..243c73b00 100644 --- a/core/src/service/mod.rs +++ b/core/src/service/mod.rs @@ -640,17 +640,6 @@ impl Service { tokio::task::yield_now().await; } - // Trigger manifest callbacks after successful installation - let manifest = service.seed.persistent_container.s9pk.as_manifest(); - if let Some(callbacks) = ctx.callbacks.get_service_manifest(&manifest.id) { - let manifest_value = - serde_json::to_value(manifest).with_kind(ErrorKind::Serialization)?; - callbacks - .call(imbl::vector![manifest_value.into()]) - .await - .log_err(); - } - Ok(service) } diff --git a/core/src/service/uninstall.rs b/core/src/service/uninstall.rs index ec8b92468..8f4bd8ad1 100644 --- a/core/src/service/uninstall.rs +++ b/core/src/service/uninstall.rs @@ -1,8 +1,6 @@ use std::collections::BTreeSet; use std::path::Path; -use imbl::vector; - use crate::context::RpcContext; use crate::db::model::package::{InstalledState, InstallingInfo, InstallingState, PackageState}; use crate::net::host::all_hosts; @@ -94,11 +92,6 @@ pub async fn cleanup(ctx: &RpcContext, id: &PackageId, soft: bool) -> Result<(), )); } }; - // Trigger manifest callbacks with null to indicate uninstall - if let Some(callbacks) = ctx.callbacks.get_service_manifest(&manifest.id) { - callbacks.call(vector![Value::Null]).await.log_err(); - } - if !soft { let path = Path::new(DATA_DIR).join(PKG_VOLUME_DIR).join(&manifest.id); crate::util::io::delete_dir(&path).await?; diff --git a/core/src/system/mod.rs b/core/src/system/mod.rs index 893a45df9..4bb404e8b 100644 --- a/core/src/system/mod.rs +++ b/core/src/system/mod.rs @@ -6,7 +6,6 @@ use chrono::Utc; use clap::Parser; use color_eyre::eyre::eyre; use futures::FutureExt; -use imbl::vector; use imbl_value::InternedString; use rpc_toolkit::{Context, Empty, HandlerExt, ParentHandler, from_fn_async}; use serde::{Deserialize, Deserializer, Serialize, Serializer}; @@ -1148,9 +1147,6 @@ pub async fn set_system_smtp(ctx: RpcContext, smtp: SmtpValue) -> Result<(), Err }) .await .result?; - if let Some(callbacks) = ctx.callbacks.get_system_smtp() { - callbacks.call(vector![to_value(&smtp)?]).await?; - } Ok(()) } pub async fn clear_system_smtp(ctx: RpcContext) -> Result<(), Error> { @@ -1163,9 +1159,6 @@ pub async fn clear_system_smtp(ctx: RpcContext) -> Result<(), Error> { }) .await .result?; - if let Some(callbacks) = ctx.callbacks.get_system_smtp() { - callbacks.call(vector![Value::Null]).await?; - } Ok(()) }