misc sdk changes (#2934)

* misc sdk changes

* delete the store ☠️

* port comments

* fix build

* fix removing

* fix tests

* beta.20

---------

Co-authored-by: Matt Hill <mattnine@protonmail.com>
This commit is contained in:
Aiden McClelland
2025-05-09 15:10:51 -06:00
committed by GitHub
parent d2c4741f0b
commit 7750e33f82
62 changed files with 1255 additions and 2130 deletions

View File

@@ -8,10 +8,7 @@ use futures::future::join_all;
use helpers::NonDetachingJoinHandle;
use imbl::{vector, Vector};
use imbl_value::InternedString;
use lazy_static::lazy_static;
use models::{HostId, PackageId, ServiceInterfaceId};
use patch_db::json_ptr::JsonPointer;
use patch_db::Revision;
use serde::{Deserialize, Serialize};
use tracing::warn;
use ts_rs::TS;
@@ -37,7 +34,6 @@ struct ServiceCallbackMap {
(BTreeSet<InternedString>, FullchainCertData, Algorithm),
(NonDetachingJoinHandle<()>, Vec<CallbackHandler>),
>,
get_store: BTreeMap<PackageId, BTreeMap<JsonPointer, Vec<CallbackHandler>>>,
get_status: BTreeMap<PackageId, Vec<CallbackHandler>>,
get_container_ip: BTreeMap<PackageId, Vec<CallbackHandler>>,
}
@@ -68,13 +64,6 @@ impl ServiceCallbacks {
v.retain(|h| h.handle.is_active() && h.seed.strong_count() > 0);
!v.is_empty()
});
this.get_store.retain(|_, v| {
v.retain(|_, v| {
v.retain(|h| h.handle.is_active() && h.seed.strong_count() > 0);
!v.is_empty()
});
!v.is_empty()
});
this.get_status.retain(|_, v| {
v.retain(|h| h.handle.is_active() && h.seed.strong_count() > 0);
!v.is_empty()
@@ -243,53 +232,6 @@ impl ServiceCallbacks {
})
}
pub(super) fn add_get_store(
&self,
package_id: PackageId,
path: JsonPointer,
handler: CallbackHandler,
) {
self.mutate(|this| {
this.get_store
.entry(package_id)
.or_default()
.entry(path)
.or_default()
.push(handler)
})
}
#[must_use]
pub fn get_store(
&self,
package_id: &PackageId,
revision: &Revision,
) -> Option<CallbackHandlers> {
lazy_static! {
static ref BASE: JsonPointer = "/private/packageStores".parse().unwrap();
}
let for_pkg = BASE.clone().join_end(&**package_id);
self.mutate(|this| {
if let Some(watched) = this.get_store.get_mut(package_id) {
let mut res = Vec::new();
watched.retain(|ptr, cbs| {
let mut full_ptr = for_pkg.clone();
full_ptr.append(ptr);
if revision.patch.affects_path(&full_ptr) {
res.append(cbs);
false
} else {
true
}
});
Some(CallbackHandlers(res))
} else {
None
}
.filter(|cb| !cb.0.is_empty())
})
}
pub(super) fn add_get_container_ip(&self, package_id: PackageId, handler: CallbackHandler) {
self.mutate(|this| {
this.get_container_ip

View File

@@ -7,7 +7,6 @@ use exver::VersionRange;
use imbl::OrdMap;
use imbl_value::InternedString;
use models::{FromStrParser, HealthCheckId, PackageId, ReplayId, VersionString, VolumeId};
use patch_db::json_ptr::JsonPointer;
use tokio::process::Command;
use crate::db::model::package::{
@@ -17,6 +16,7 @@ use crate::db::model::package::{
use crate::disk::mount::filesystem::bind::Bind;
use crate::disk::mount::filesystem::idmapped::IdMapped;
use crate::disk::mount::filesystem::{FileSystem, MountType};
use crate::disk::mount::util::{is_mountpoint, unmount};
use crate::service::effects::prelude::*;
use crate::status::health_check::NamedHealthCheckResult;
use crate::util::Invoke;
@@ -110,6 +110,9 @@ pub async fn mount(
}
tokio::fs::create_dir_all(&mountpoint).await?;
if is_mountpoint(&mountpoint).await? {
unmount(&mountpoint, true).await?;
}
Command::new("chown")
.arg("100000:100000")
.arg(&mountpoint)
@@ -142,21 +145,6 @@ pub async fn get_installed_packages(context: EffectContext) -> Result<BTreeSet<P
.keys()
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export)]
pub struct ExposeForDependentsParams {
#[ts(type = "string[]")]
paths: Vec<JsonPointer>,
}
pub async fn expose_for_dependents(
context: EffectContext,
ExposeForDependentsParams { paths }: ExposeForDependentsParams,
) -> Result<(), Error> {
// TODO
Ok(())
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export)]

View File

@@ -15,9 +15,9 @@ mod dependency;
mod health;
mod net;
mod prelude;
mod store;
mod subcontainer;
mod system;
mod version;
pub fn handler<C: Context>() -> ParentHandler<C> {
ParentHandler::new()
@@ -88,10 +88,6 @@ pub fn handler<C: Context>() -> ParentHandler<C> {
"get-installed-packages",
from_fn_async(dependency::get_installed_packages).no_cli(),
)
.subcommand(
"expose-for-dependents",
from_fn_async(dependency::expose_for_dependents).no_cli(),
)
// health
.subcommand("set-health", from_fn_async(health::set_health).no_cli())
// subcontainer
@@ -167,22 +163,15 @@ pub fn handler<C: Context>() -> ParentHandler<C> {
from_fn_async(net::ssl::get_ssl_certificate).no_cli(),
)
.subcommand("get-ssl-key", from_fn_async(net::ssl::get_ssl_key).no_cli())
// store
.subcommand(
"store",
ParentHandler::<C>::new()
.subcommand("get", from_fn_async(store::get_store).no_cli())
.subcommand("set", from_fn_async(store::set_store).no_cli()),
)
.subcommand(
"set-data-version",
from_fn_async(store::set_data_version)
from_fn_async(version::set_data_version)
.no_display()
.with_call_remote::<ContainerCliContext>(),
)
.subcommand(
"get-data-version",
from_fn_async(store::get_data_version)
from_fn_async(version::get_data_version)
.with_custom_display_fn(|_, v| {
if let Some(v) = v {
println!("{v}")

View File

@@ -1,143 +0,0 @@
use imbl::vector;
use imbl_value::json;
use models::{PackageId, VersionString};
use patch_db::json_ptr::JsonPointer;
use crate::service::effects::callbacks::CallbackHandler;
use crate::service::effects::prelude::*;
use crate::service::rpc::CallbackId;
#[derive(Debug, Clone, Serialize, Deserialize, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export)]
pub struct GetStoreParams {
#[ts(optional)]
package_id: Option<PackageId>,
#[ts(type = "string")]
path: JsonPointer,
#[ts(optional)]
callback: Option<CallbackId>,
}
pub async fn get_store(
context: EffectContext,
GetStoreParams {
package_id,
path,
callback,
}: GetStoreParams,
) -> Result<Value, Error> {
crate::dbg!(&callback);
let context = context.deref()?;
let peeked = context.seed.ctx.db.peek().await;
let package_id = package_id.unwrap_or(context.seed.id.clone());
let value = peeked
.as_private()
.as_package_stores()
.as_idx(&package_id)
.map(|s| s.de())
.transpose()?
.unwrap_or_default();
if let Some(callback) = callback {
let callback = callback.register(&context.seed.persistent_container);
context.seed.ctx.callbacks.add_get_store(
package_id,
path.clone(),
CallbackHandler::new(&context, callback),
);
}
Ok(path.get(&value).cloned().unwrap_or_default())
}
#[derive(Debug, Clone, Serialize, Deserialize, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export)]
pub struct SetStoreParams {
#[ts(type = "any")]
value: Value,
#[ts(type = "string")]
path: JsonPointer,
}
pub async fn set_store(
context: EffectContext,
SetStoreParams { value, path }: SetStoreParams,
) -> Result<(), Error> {
let context = context.deref()?;
let package_id = &context.seed.id;
let res = context
.seed
.ctx
.db
.mutate(|db| {
let model = db
.as_private_mut()
.as_package_stores_mut()
.upsert(package_id, || Ok(json!({})))?;
let mut model_value = model.de()?;
if model_value.is_null() {
model_value = json!({});
}
path.set(&mut model_value, value, true)
.with_kind(ErrorKind::ParseDbField)?;
model.ser(&model_value)
})
.await;
res.result?;
if let Some(revision) = res.revision {
if let Some(callbacks) = context.seed.ctx.callbacks.get_store(package_id, &revision) {
callbacks.call(vector![]).await?;
}
}
Ok(())
}
#[derive(Debug, Clone, Serialize, Deserialize, TS, Parser)]
#[serde(rename_all = "camelCase")]
#[ts(export)]
pub struct SetDataVersionParams {
#[ts(type = "string")]
version: VersionString,
}
pub async fn set_data_version(
context: EffectContext,
SetDataVersionParams { version }: SetDataVersionParams,
) -> Result<(), Error> {
let context = context.deref()?;
let package_id = &context.seed.id;
context
.seed
.ctx
.db
.mutate(|db| {
db.as_public_mut()
.as_package_data_mut()
.as_idx_mut(package_id)
.or_not_found(package_id)?
.as_data_version_mut()
.ser(&Some(version))
})
.await
.result?;
Ok(())
}
pub async fn get_data_version(context: EffectContext) -> Result<Option<VersionString>, Error> {
let context = context.deref()?;
let package_id = &context.seed.id;
context
.seed
.ctx
.db
.peek()
.await
.as_public()
.as_package_data()
.as_idx(package_id)
.or_not_found(package_id)?
.as_data_version()
.de()
}

View File

@@ -0,0 +1,51 @@
use models::VersionString;
use crate::service::effects::prelude::*;
#[derive(Debug, Clone, Serialize, Deserialize, TS, Parser)]
#[serde(rename_all = "camelCase")]
#[ts(export)]
pub struct SetDataVersionParams {
#[ts(type = "string")]
version: VersionString,
}
pub async fn set_data_version(
context: EffectContext,
SetDataVersionParams { version }: SetDataVersionParams,
) -> Result<(), Error> {
let context = context.deref()?;
let package_id = &context.seed.id;
context
.seed
.ctx
.db
.mutate(|db| {
db.as_public_mut()
.as_package_data_mut()
.as_idx_mut(package_id)
.or_not_found(package_id)?
.as_data_version_mut()
.ser(&Some(version))
})
.await
.result?;
Ok(())
}
pub async fn get_data_version(context: EffectContext) -> Result<Option<VersionString>, Error> {
let context = context.deref()?;
let package_id = &context.seed.id;
context
.seed
.ctx
.db
.peek()
.await
.as_public()
.as_package_data()
.as_idx(package_id)
.or_not_found(package_id)?
.as_data_version()
.de()
}

View File

@@ -5,7 +5,7 @@ use std::time::Duration;
use color_eyre::eyre::eyre;
use futures::future::{BoxFuture, Fuse};
use futures::stream::FuturesUnordered;
use futures::{Future, FutureExt, StreamExt};
use futures::{Future, FutureExt, StreamExt, TryFutureExt};
use helpers::NonDetachingJoinHandle;
use imbl::OrdMap;
use imbl_value::InternedString;
@@ -332,22 +332,48 @@ impl ServiceMap {
#[instrument(skip_all)]
pub async fn uninstall(
&self,
ctx: &RpcContext,
id: &PackageId,
ctx: RpcContext,
id: PackageId,
soft: bool,
force: bool,
) -> Result<(), Error> {
let mut guard = self.get_mut(id).await;
if let Some(service) = guard.take() {
) -> Result<impl Future<Output = Result<(), Error>> + Send, Error> {
let mut guard = self.get_mut(&id).await;
ctx.db
.mutate(|db| {
let entry = db
.as_public_mut()
.as_package_data_mut()
.as_idx_mut(&id)
.or_not_found(&id)?;
entry.as_state_info_mut().map_mutate(|s| match s {
PackageState::Installed(s) => Ok(PackageState::Removing(s)),
_ => Err(Error::new(
eyre!("Package {id} is not installed."),
crate::ErrorKind::NotFound,
)),
})
})
.await
.result?;
Ok(async move {
ServiceRefReloadCancelGuard::new(ctx.clone(), id.clone(), "Uninstall", None)
.handle_last(async move {
let res = service.uninstall(None, soft, force).await;
drop(guard);
res
if let Some(service) = guard.take() {
let res = service.uninstall(None, soft, force).await;
drop(guard);
res
} else {
Err(Error::new(
eyre!("service {id} failed to initialize - cannot remove gracefully"),
ErrorKind::Uninitialized,
))
}
})
.await?;
Ok(())
}
Ok(())
.or_else(|e: Error| e.wait().map(Err)))
}
pub async fn shutdown_all(&self) -> Result<(), Error> {
@@ -412,9 +438,13 @@ impl ServiceRefReloadCancelGuard {
Ok(a) => Ok(a),
Err(e) => {
if let Some(info) = self.0.take() {
tokio::spawn(info.reload(Some(e.clone_output())));
let task_e = e.clone_output();
Err(e.with_task(tokio::spawn(async move {
info.reload(Some(task_e)).await.log_err();
})))
} else {
Err(e)
}
Err(e)
}
}
}