Merge branch 'next/minor' of github.com:Start9Labs/start-os into next/major

This commit is contained in:
Aiden McClelland
2025-04-07 14:00:42 -06:00
77 changed files with 1474 additions and 716 deletions

View File

@@ -71,7 +71,8 @@ pub async fn export_action(
value.insert(id, metadata);
model.ser(&value)
})
.await?;
.await
.result?;
Ok(())
}
@@ -102,7 +103,8 @@ async fn clear_actions(
.as_actions_mut()
.mutate(|a| Ok(a.retain(|e, _| except.contains(e))))
})
.await?;
.await
.result?;
Ok(())
}
@@ -271,7 +273,8 @@ async fn request_action(
.as_requested_actions_mut()
.insert(&replay_id, &ActionRequestEntry { active, request })
})
.await?;
.await
.result?;
Ok(())
}
@@ -310,6 +313,7 @@ async fn clear_action_requests(
}))
})
})
.await?;
.await
.result?;
Ok(())
}

View File

@@ -8,8 +8,10 @@ use futures::future::join_all;
use helpers::NonDetachingJoinHandle;
use imbl::{vector, Vector};
use imbl_value::InternedString;
use lazy_static::lazy_static;
use models::{HostId, PackageId, ServiceInterfaceId};
use patch_db::json_ptr::JsonPointer;
use patch_db::Revision;
use serde::{Deserialize, Serialize};
use tracing::warn;
use ts_rs::TS;
@@ -37,6 +39,7 @@ struct ServiceCallbackMap {
>,
get_store: BTreeMap<PackageId, BTreeMap<JsonPointer, Vec<CallbackHandler>>>,
get_status: BTreeMap<PackageId, Vec<CallbackHandler>>,
get_container_ip: BTreeMap<PackageId, Vec<CallbackHandler>>,
}
impl ServiceCallbacks {
@@ -260,13 +263,19 @@ impl ServiceCallbacks {
pub fn get_store(
&self,
package_id: &PackageId,
path: &JsonPointer,
revision: &Revision,
) -> Option<CallbackHandlers> {
lazy_static! {
static ref BASE: JsonPointer = "/private/packageStores".parse().unwrap();
}
let for_pkg = BASE.clone().join_end(&**package_id);
self.mutate(|this| {
if let Some(watched) = this.get_store.get_mut(package_id) {
let mut res = Vec::new();
watched.retain(|ptr, cbs| {
if ptr.starts_with(path) || path.starts_with(ptr) {
let mut full_ptr = for_pkg.clone();
full_ptr.append(ptr);
if revision.patch.affects_path(&full_ptr) {
res.append(cbs);
false
} else {
@@ -280,6 +289,25 @@ impl ServiceCallbacks {
.filter(|cb| !cb.0.is_empty())
})
}
pub(super) fn add_get_container_ip(&self, package_id: PackageId, handler: CallbackHandler) {
self.mutate(|this| {
this.get_container_ip
.entry(package_id)
.or_default()
.push(handler)
})
}
#[must_use]
pub fn get_container_ip(&self, package_id: &PackageId) -> Option<CallbackHandlers> {
self.mutate(|this| {
this.get_container_ip
.remove(package_id)
.map(CallbackHandlers)
.filter(|cb| !cb.0.is_empty())
})
}
}
pub struct CallbackHandler {

View File

@@ -46,6 +46,15 @@ pub async fn get_status(
let context = context.deref()?;
let id = package_id.unwrap_or_else(|| context.seed.id.clone());
let db = context.seed.ctx.db.peek().await;
if let Some(callback) = callback {
let callback = callback.register(&context.seed.persistent_container);
context.seed.ctx.callbacks.add_get_status(
id.clone(),
super::callbacks::CallbackHandler::new(&context, callback),
);
}
let status = db
.as_public()
.as_package_data()
@@ -54,14 +63,6 @@ pub async fn get_status(
.as_status()
.de()?;
if let Some(callback) = callback {
let callback = callback.register(&context.seed.persistent_container);
context.seed.ctx.callbacks.add_get_status(
id,
super::callbacks::CallbackHandler::new(&context, callback),
);
}
Ok(status)
}

View File

@@ -249,6 +249,7 @@ pub async fn set_dependencies(
.ser(&CurrentDependencies(deps))
})
.await
.result
}
pub async fn get_dependencies(context: EffectContext) -> Result<Vec<DependencyRequirement>, Error> {

View File

@@ -40,6 +40,7 @@ pub async fn set_health(
Ok(())
})
})
.await?;
.await
.result?;
Ok(())
}

View File

@@ -192,6 +192,4 @@ pub fn handler<C: Context>() -> ParentHandler<C> {
"get-system-smtp",
from_fn_async(system::get_system_smtp).no_cli(),
)
// TODO Callbacks
}

View File

@@ -27,6 +27,15 @@ pub async fn get_host_info(
let db = context.seed.ctx.db.peek().await;
let package_id = package_id.unwrap_or_else(|| context.seed.id.clone());
if let Some(callback) = callback {
let callback = callback.register(&context.seed.persistent_container);
context.seed.ctx.callbacks.add_get_host_info(
package_id.clone(),
host_id.clone(),
CallbackHandler::new(&context, callback),
);
}
let res = db
.as_public()
.as_package_data()
@@ -35,14 +44,5 @@ pub async fn get_host_info(
.map(|m| m.de())
.transpose()?;
if let Some(callback) = callback {
let callback = callback.register(&context.seed.persistent_container);
context.seed.ctx.callbacks.add_get_host_info(
package_id,
host_id,
CallbackHandler::new(&context, callback),
);
}
Ok(res)
}

View File

@@ -1,9 +1,55 @@
use std::net::Ipv4Addr;
use models::PackageId;
use crate::service::effects::callbacks::CallbackHandler;
use crate::service::effects::prelude::*;
use crate::service::rpc::CallbackId;
use crate::HOST_IP;
pub async fn get_container_ip(context: EffectContext) -> Result<Ipv4Addr, Error> {
let context = context.deref()?;
Ok(context.seed.persistent_container.net_service.get_ip().await)
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export)]
pub struct GetContainerIpParams {
#[ts(optional)]
package_id: Option<PackageId>,
#[ts(optional)]
callback: Option<CallbackId>,
}
pub async fn get_container_ip(
context: EffectContext,
GetContainerIpParams {
package_id,
callback,
}: GetContainerIpParams,
) -> Result<Option<Ipv4Addr>, Error> {
let context = context.deref()?;
if let Some(package_id) = package_id.filter(|id| id != &context.seed.id) {
if let Some(callback) = callback {
// ip is static for the lifetime of the container, so callback unnecessary for self
let callback = callback.register(&context.seed.persistent_container);
context
.seed
.ctx
.callbacks
.add_get_container_ip(package_id.clone(), CallbackHandler::new(&context, callback));
}
let Some(svc) = &*context.seed.ctx.services.get(&package_id).await else {
return Ok(None);
};
let Some(lxc) = svc.seed.persistent_container.lxc_container.get() else {
return Ok(None);
};
let res = lxc.ip().await?;
Ok(Some(res))
} else {
let Some(lxc) = context.seed.persistent_container.lxc_container.get() else {
return Ok(None);
};
lxc.ip().await.map(Some)
}
}

View File

@@ -42,35 +42,40 @@ pub async fn export_service_interface(
interface_type: r#type,
};
context
let res = context
.seed
.ctx
.db
.mutate(|db| {
db.as_public_mut()
let ifaces = db
.as_public_mut()
.as_package_data_mut()
.as_idx_mut(&package_id)
.or_not_found(&package_id)?
.as_service_interfaces_mut()
.insert(&id, &service_interface)?;
.as_service_interfaces_mut();
ifaces.insert(&id, &service_interface)?;
Ok(())
})
.await?;
if let Some(callbacks) = context
.seed
.ctx
.callbacks
.get_service_interface(&(package_id.clone(), id))
{
callbacks.call(vector![]).await?;
}
if let Some(callbacks) = context
.seed
.ctx
.callbacks
.list_service_interfaces(&package_id)
{
callbacks.call(vector![]).await?;
.await;
res.result?;
if res.revision.is_some() {
if let Some(callbacks) = context
.seed
.ctx
.callbacks
.get_service_interface(&(package_id.clone(), id))
{
callbacks.call(vector![]).await?;
}
if let Some(callbacks) = context
.seed
.ctx
.callbacks
.list_service_interfaces(&package_id)
{
callbacks.call(vector![]).await?;
}
}
Ok(())
@@ -98,6 +103,15 @@ pub async fn get_service_interface(
let package_id = package_id.unwrap_or_else(|| context.seed.id.clone());
let db = context.seed.ctx.db.peek().await;
if let Some(callback) = callback {
let callback = callback.register(&context.seed.persistent_container);
context.seed.ctx.callbacks.add_get_service_interface(
package_id.clone(),
service_interface_id.clone(),
CallbackHandler::new(&context, callback),
);
}
let interface = db
.as_public()
.as_package_data()
@@ -106,15 +120,6 @@ pub async fn get_service_interface(
.map(|m| m.de())
.transpose()?;
if let Some(callback) = callback {
let callback = callback.register(&context.seed.persistent_container);
context.seed.ctx.callbacks.add_get_service_interface(
package_id,
service_interface_id,
CallbackHandler::new(&context, callback),
);
}
Ok(interface)
}
@@ -137,6 +142,14 @@ pub async fn list_service_interfaces(
let context = context.deref()?;
let package_id = package_id.unwrap_or_else(|| context.seed.id.clone());
if let Some(callback) = callback {
let callback = callback.register(&context.seed.persistent_container);
context.seed.ctx.callbacks.add_list_service_interfaces(
package_id.clone(),
CallbackHandler::new(&context, callback),
);
}
let res = context
.seed
.ctx
@@ -150,15 +163,6 @@ pub async fn list_service_interfaces(
.transpose()?
.unwrap_or_default();
if let Some(callback) = callback {
let callback = callback.register(&context.seed.persistent_container);
context
.seed
.ctx
.callbacks
.add_list_service_interfaces(package_id, CallbackHandler::new(&context, callback));
}
Ok(res)
}
@@ -176,17 +180,52 @@ pub async fn clear_service_interfaces(
let context = context.deref()?;
let package_id = context.seed.id.clone();
context
let res = context
.seed
.ctx
.db
.mutate(|db| {
let mut removed = Vec::new();
db.as_public_mut()
.as_package_data_mut()
.as_idx_mut(&package_id)
.or_not_found(&package_id)?
.as_service_interfaces_mut()
.mutate(|s| Ok(s.retain(|id, _| except.contains(id))))
.mutate(|s| {
Ok(s.retain(|id, _| {
if except.contains(id) {
true
} else {
removed.push(id.clone());
false
}
}))
})?;
Ok(removed)
})
.await
.await;
let removed = res.result?;
if res.revision.is_some() {
for id in removed {
if let Some(callbacks) = context
.seed
.ctx
.callbacks
.get_service_interface(&(package_id.clone(), id))
{
callbacks.call(vector![]).await?;
}
}
if let Some(callbacks) = context
.seed
.ctx
.callbacks
.list_service_interfaces(&package_id)
{
callbacks.call(vector![]).await?;
}
}
Ok(())
}

View File

@@ -81,7 +81,8 @@ pub async fn get_ssl_certificate(
.as_local_certs_mut()
.cert_for(&hostnames)
})
.await?;
.await
.result?;
let fullchain = match algorithm {
Algorithm::Ecdsa => cert.fullchain_nistp256(),
Algorithm::Ed25519 => cert.fullchain_ed25519(),
@@ -171,7 +172,8 @@ pub async fn get_ssl_key(
.as_local_certs_mut()
.cert_for(&hostnames)
})
.await?;
.await
.result?;
let key = match algorithm {
Algorithm::Ecdsa => cert.leaf.keys.nistp256,
Algorithm::Ed25519 => cert.leaf.keys.ed25519,

View File

@@ -65,7 +65,7 @@ pub async fn set_store(
) -> Result<(), Error> {
let context = context.deref()?;
let package_id = &context.seed.id;
context
let res = context
.seed
.ctx
.db
@@ -82,10 +82,13 @@ pub async fn set_store(
.with_kind(ErrorKind::ParseDbField)?;
model.ser(&model_value)
})
.await?;
.await;
res.result?;
if let Some(callbacks) = context.seed.ctx.callbacks.get_store(package_id, &path) {
callbacks.call(vector![]).await?;
if let Some(revision) = res.revision {
if let Some(callbacks) = context.seed.ctx.callbacks.get_store(package_id, &revision) {
callbacks.call(vector![]).await?;
}
}
Ok(())
@@ -116,7 +119,8 @@ pub async fn set_data_version(
.as_data_version_mut()
.ser(&Some(version))
})
.await?;
.await
.result?;
Ok(())
}

View File

@@ -15,6 +15,16 @@ pub async fn get_system_smtp(
GetSystemSmtpParams { callback }: GetSystemSmtpParams,
) -> Result<Option<SmtpValue>, Error> {
let context = context.deref()?;
if let Some(callback) = callback {
let callback = callback.register(&context.seed.persistent_container);
context
.seed
.ctx
.callbacks
.add_get_system_smtp(CallbackHandler::new(&context, callback));
}
let res = context
.seed
.ctx
@@ -26,14 +36,5 @@ pub async fn get_system_smtp(
.into_smtp()
.de()?;
if let Some(callback) = callback {
let callback = callback.register(&context.seed.persistent_container);
context
.seed
.ctx
.callbacks
.add_get_system_smtp(CallbackHandler::new(&context, callback));
}
Ok(res)
}