Refactor/sdk init (#2947)

* fixes for main

* refactor package initialization

* fixes from testing

* more fixes

* beta.21

* do not use instanceof

* closes #2921

* beta22

* allow disabling kiosk

* migration

* fix /etc/shadow

* actionRequest -> task

* beta.23
This commit is contained in:
Aiden McClelland
2025-05-21 10:24:37 -06:00
committed by GitHub
parent 46fd01c264
commit 44560c8da8
237 changed files with 1827 additions and 98800 deletions

View File

@@ -6,8 +6,7 @@ use models::{ActionId, PackageId, ProcedureName, ReplayId};
use crate::action::{ActionInput, ActionResult};
use crate::db::model::package::{
ActionRequestCondition, ActionRequestEntry, ActionRequestInput, ActionVisibility,
AllowedStatuses,
ActionVisibility, AllowedStatuses, TaskCondition, TaskEntry, TaskInput,
};
use crate::prelude::*;
use crate::rpc_continuations::Guid;
@@ -73,21 +72,21 @@ impl Service {
}
}
pub fn update_requested_actions(
requested_actions: &mut BTreeMap<ReplayId, ActionRequestEntry>,
pub fn update_tasks(
tasks: &mut BTreeMap<ReplayId, TaskEntry>,
package_id: &PackageId,
action_id: &ActionId,
input: &Value,
was_run: bool,
) {
requested_actions.retain(|_, v| {
if &v.request.package_id != package_id || &v.request.action_id != action_id {
tasks.retain(|_, v| {
if &v.task.package_id != package_id || &v.task.action_id != action_id {
return true;
}
if let Some(when) = &v.request.when {
if let Some(when) = &v.task.when {
match &when.condition {
ActionRequestCondition::InputNotMatches => match &v.request.input {
Some(ActionRequestInput::Partial { value }) => {
TaskCondition::InputNotMatches => match &v.task.input {
Some(TaskInput::Partial { value }) => {
if is_partial_of(value, input) {
if when.once {
return !was_run;
@@ -99,10 +98,7 @@ pub fn update_requested_actions(
}
}
None => {
tracing::error!(
"action request exists in an invalid state {:?}",
v.request
);
tracing::error!("action request exists in an invalid state {:?}", v.task);
}
},
}
@@ -180,14 +176,8 @@ impl Handler<RunAction> for ServiceActor {
.db
.mutate(|db| {
for (_, pde) in db.as_public_mut().as_package_data_mut().as_entries_mut()? {
pde.as_requested_actions_mut().mutate(|requested_actions| {
Ok(update_requested_actions(
requested_actions,
package_id,
action_id,
&input,
true,
))
pde.as_tasks_mut().mutate(|tasks| {
Ok(update_tasks(tasks, package_id, action_id, &input, true))
})?;
}
Ok(())

View File

@@ -5,7 +5,7 @@ use rpc_toolkit::{from_fn_async, Context, HandlerExt, ParentHandler};
use crate::action::{display_action_result, ActionInput, ActionResult};
use crate::db::model::package::{
ActionMetadata, ActionRequest, ActionRequestCondition, ActionRequestEntry, ActionRequestTrigger,
ActionMetadata, Task, TaskCondition, TaskEntry, TaskSeverity, TaskTrigger,
};
use crate::rpc_continuations::Guid;
use crate::service::cli::ContainerCliContext;
@@ -34,10 +34,10 @@ pub fn action_api<C: Context>() -> ParentHandler<C> {
.with_custom_display_fn(|args, res| Ok(display_action_result(args.params, res)))
.with_call_remote::<ContainerCliContext>(),
)
.subcommand("request", from_fn_async(request_action).no_cli())
.subcommand("create-task", from_fn_async(create_task).no_cli())
.subcommand(
"clear-requests",
from_fn_async(clear_action_requests)
"clear-tasks",
from_fn_async(clear_tasks)
.no_display()
.with_call_remote::<ContainerCliContext>(),
)
@@ -196,29 +196,29 @@ async fn run_action(
#[derive(Clone, Debug, Deserialize, Serialize, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export)]
pub struct RequestActionParams {
pub struct CreateTaskParams {
#[serde(default)]
#[ts(skip)]
procedure_id: Guid,
replay_id: ReplayId,
#[serde(flatten)]
request: ActionRequest,
task: Task,
}
async fn request_action(
async fn create_task(
context: EffectContext,
RequestActionParams {
CreateTaskParams {
procedure_id,
replay_id,
request,
}: RequestActionParams,
task,
}: CreateTaskParams,
) -> Result<(), Error> {
let context = context.deref()?;
let src_id = &context.seed.id;
let active = match &request.when {
Some(ActionRequestTrigger { once, condition }) => match condition {
ActionRequestCondition::InputNotMatches => {
let Some(input) = request.input.as_ref() else {
let active = match &task.when {
Some(TaskTrigger { once, condition }) => match condition {
TaskCondition::InputNotMatches => {
let Some(input) = task.input.as_ref() else {
return Err(Error::new(
eyre!("input-not-matches trigger requires input to be specified"),
ErrorKind::InvalidRequest,
@@ -228,19 +228,19 @@ async fn request_action(
.seed
.ctx
.services
.get(&request.package_id)
.get(&task.package_id)
.await
.as_ref()
{
let Some(prev) = service
.get_action_input(procedure_id, request.action_id.clone())
.get_action_input(procedure_id.clone(), task.action_id.clone())
.await?
else {
return Err(Error::new(
eyre!(
"action {} of {} has no input",
request.action_id,
request.package_id
task.action_id,
task.package_id
),
ErrorKind::InvalidRequest,
));
@@ -261,6 +261,9 @@ async fn request_action(
},
None => true,
};
if active && task.severity == TaskSeverity::Critical {
context.stop(procedure_id).await?;
}
context
.seed
.ctx
@@ -270,8 +273,8 @@ async fn request_action(
.as_package_data_mut()
.as_idx_mut(src_id)
.or_not_found(src_id)?
.as_requested_actions_mut()
.insert(&replay_id, &ActionRequestEntry { active, request })
.as_tasks_mut()
.insert(&replay_id, &TaskEntry { active, task })
})
.await
.result?;
@@ -281,16 +284,16 @@ async fn request_action(
#[derive(Debug, Clone, Serialize, Deserialize, TS, Parser)]
#[ts(type = "{ only: string[] } | { except: string[] }")]
#[ts(export)]
pub struct ClearActionRequestsParams {
pub struct ClearTasksParams {
#[arg(long, conflicts_with = "except")]
pub only: Option<Vec<ReplayId>>,
#[arg(long, conflicts_with = "only")]
pub except: Option<Vec<ReplayId>>,
}
async fn clear_action_requests(
async fn clear_tasks(
context: EffectContext,
ClearActionRequestsParams { only, except }: ClearActionRequestsParams,
ClearTasksParams { only, except }: ClearTasksParams,
) -> Result<(), Error> {
let context = context.deref()?;
let package_id = context.seed.id.clone();
@@ -305,7 +308,7 @@ async fn clear_action_requests(
.as_package_data_mut()
.as_idx_mut(&package_id)
.or_not_found(&package_id)?
.as_requested_actions_mut()
.as_tasks_mut()
.mutate(|a| {
Ok(a.retain(|e, _| {
only.as_ref().map_or(true, |only| !only.contains(e))

View File

@@ -10,7 +10,7 @@ use models::{FromStrParser, HealthCheckId, PackageId, ReplayId, VersionString, V
use tokio::process::Command;
use crate::db::model::package::{
ActionRequestEntry, CurrentDependencies, CurrentDependencyInfo, CurrentDependencyKind,
TaskEntry, CurrentDependencies, CurrentDependencyInfo, CurrentDependencyKind,
ManifestPreference,
};
use crate::disk::mount::filesystem::bind::Bind;
@@ -335,7 +335,7 @@ pub struct CheckDependenciesResult {
installed_version: Option<VersionString>,
satisfies: BTreeSet<VersionString>,
is_running: bool,
requested_actions: BTreeMap<ReplayId, ActionRequestEntry>,
tasks: BTreeMap<ReplayId, TaskEntry>,
#[ts(as = "BTreeMap::<HealthCheckId, NamedHealthCheckResult>")]
health_checks: OrdMap<HealthCheckId, NamedHealthCheckResult>,
}
@@ -351,7 +351,7 @@ pub async fn check_dependencies(
.as_idx(&context.seed.id)
.or_not_found(&context.seed.id)?;
let current_dependencies = pde.as_current_dependencies().de()?;
let requested_actions = pde.as_requested_actions().de()?;
let tasks = pde.as_tasks().de()?;
let package_dependency_info: Vec<_> = package_ids
.unwrap_or_else(|| current_dependencies.0.keys().cloned().collect())
.into_iter()
@@ -365,9 +365,9 @@ pub async fn check_dependencies(
for (package_id, dependency_info) in package_dependency_info {
let title = dependency_info.title.clone();
let Some(package) = db.as_public().as_package_data().as_idx(&package_id) else {
let requested_actions = requested_actions
let tasks = tasks
.iter()
.filter(|(_, v)| v.request.package_id == package_id)
.filter(|(_, v)| v.task.package_id == package_id)
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
results.push(CheckDependenciesResult {
@@ -376,7 +376,7 @@ pub async fn check_dependencies(
installed_version: None,
satisfies: BTreeSet::new(),
is_running: false,
requested_actions,
tasks,
health_checks: Default::default(),
});
continue;
@@ -393,9 +393,9 @@ pub async fn check_dependencies(
false
};
let health_checks = status.health().cloned().unwrap_or_default();
let requested_actions = requested_actions
let tasks = tasks
.iter()
.filter(|(_, v)| v.request.package_id == package_id)
.filter(|(_, v)| v.task.package_id == package_id)
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
results.push(CheckDependenciesResult {
@@ -404,7 +404,7 @@ pub async fn check_dependencies(
installed_version,
satisfies,
is_running,
requested_actions,
tasks,
health_checks,
});
}

View File

@@ -1,5 +1,3 @@
use models::VersionString;
use crate::service::effects::prelude::*;
#[derive(Debug, Clone, Serialize, Deserialize, TS, Parser)]
@@ -7,7 +5,7 @@ use crate::service::effects::prelude::*;
#[ts(export)]
pub struct SetDataVersionParams {
#[ts(type = "string")]
version: VersionString,
version: Option<String>,
}
pub async fn set_data_version(
context: EffectContext,
@@ -25,7 +23,7 @@ pub async fn set_data_version(
.as_idx_mut(package_id)
.or_not_found(package_id)?
.as_data_version_mut()
.ser(&Some(version))
.ser(&version)
})
.await
.result?;
@@ -33,7 +31,7 @@ pub async fn set_data_version(
Ok(())
}
pub async fn get_data_version(context: EffectContext) -> Result<Option<VersionString>, Error> {
pub async fn get_data_version(context: EffectContext) -> Result<Option<String>, Error> {
let context = context.deref()?;
let package_id = &context.seed.id;
context

View File

@@ -17,7 +17,7 @@ use futures::{FutureExt, SinkExt, StreamExt, TryStreamExt};
use helpers::NonDetachingJoinHandle;
use imbl_value::{json, InternedString};
use itertools::Itertools;
use models::{ActionId, HostId, ImageId, PackageId, ProcedureName};
use models::{ActionId, HostId, ImageId, PackageId};
use nix::sys::signal::Signal;
use persistent_container::{PersistentContainer, Subcontainer};
use rpc_toolkit::{from_fn_async, CallRemoteHandler, Empty, HandlerArgs, HandlerFor};
@@ -30,27 +30,31 @@ use tokio::process::Command;
use tokio::sync::Notify;
use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode;
use ts_rs::TS;
use url::Url;
use crate::context::{CliContext, RpcContext};
use crate::db::model::package::{
InstalledState, PackageDataEntry, PackageState, PackageStateMatchModelRef, UpdatingState,
InstalledState, ManifestPreference, PackageDataEntry, PackageState, PackageStateMatchModelRef,
UpdatingState,
};
use crate::disk::mount::guard::GenericMountGuard;
use crate::disk::mount::filesystem::ReadOnly;
use crate::disk::mount::guard::{GenericMountGuard, MountGuard};
use crate::install::PKG_ARCHIVE_DIR;
use crate::lxc::ContainerId;
use crate::prelude::*;
use crate::progress::{NamedProgress, Progress};
use crate::rpc_continuations::{Guid, RpcContinuation};
use crate::s9pk::S9pk;
use crate::service::action::update_requested_actions;
use crate::service::action::update_tasks;
use crate::service::rpc::{ExitParams, InitKind};
use crate::service::service_map::InstallProgressHandles;
use crate::util::actor::concurrent::ConcurrentActor;
use crate::util::io::{create_file, AsyncReadStream, TermSize};
use crate::util::net::WebSocketExt;
use crate::util::serde::{NoOutput, Pem};
use crate::util::serde::Pem;
use crate::util::Never;
use crate::volume::data_dir;
use crate::{CAP_1_KiB, DATA_DIR, PACKAGE_DATA};
use crate::{CAP_1_KiB, DATA_DIR};
pub mod action;
pub mod cli;
@@ -62,6 +66,7 @@ mod service_actor;
pub mod service_map;
pub mod start_stop;
mod transition;
pub mod uninstall;
mod util;
pub use service_map::ServiceMap;
@@ -116,99 +121,35 @@ impl ServiceRef {
pub fn weak(&self) -> Weak<Service> {
Arc::downgrade(&self.0)
}
pub async fn uninstall(
self,
target_version: Option<models::VersionString>,
soft: bool,
force: bool,
) -> Result<(), Error> {
let uninit_res = self
.seed
.persistent_container
.execute::<NoOutput>(
Guid::new(),
ProcedureName::PackageUninit,
to_value(&target_version)?,
None,
) // TODO timeout
.await;
pub async fn uninstall(self, uninit: ExitParams, soft: bool, force: bool) -> Result<(), Error> {
let id = self.seed.persistent_container.s9pk.as_manifest().id.clone();
let ctx = self.seed.ctx.clone();
let uninit_res = self.shutdown(Some(uninit.clone())).await;
if force {
uninit_res.log_err();
} else {
uninit_res?;
}
let id = self.seed.persistent_container.s9pk.as_manifest().id.clone();
let ctx = self.seed.ctx.clone();
self.shutdown().await?;
if target_version.is_none() {
if let Some(pde) = ctx
.db
.mutate(|d| {
if let Some(pde) = d
.as_public_mut()
.as_package_data_mut()
.remove(&id)?
.map(|d| d.de())
.transpose()?
{
d.as_private_mut().as_available_ports_mut().mutate(|p| {
p.free(
pde.hosts
.0
.values()
.flat_map(|h| h.bindings.values())
.flat_map(|b| {
b.net
.assigned_port
.into_iter()
.chain(b.net.assigned_ssl_port)
}),
);
Ok(())
})?;
d.as_private_mut().as_package_stores_mut().remove(&id)?;
Ok(Some(pde))
} else {
Ok(None)
}
})
.await
.result?
{
let state = pde.state_info.expect_removing()?;
if !soft {
for volume_id in &state.manifest.volumes {
let path = data_dir(DATA_DIR, &state.manifest.id, volume_id);
if tokio::fs::metadata(&path).await.is_ok() {
tokio::fs::remove_dir_all(&path).await?;
}
}
let logs_dir = Path::new(PACKAGE_DATA)
.join("logs")
.join(&state.manifest.id);
if tokio::fs::metadata(&logs_dir).await.is_ok() {
tokio::fs::remove_dir_all(&logs_dir).await?;
}
let archive_path = Path::new(PACKAGE_DATA)
.join("archive")
.join("installed")
.join(&state.manifest.id);
if tokio::fs::metadata(&archive_path).await.is_ok() {
tokio::fs::remove_file(&archive_path).await?;
}
}
}
if uninit.is_uninstall() {
uninstall::cleanup(&ctx, &id, soft).await?;
}
Ok(())
}
pub async fn shutdown(self) -> Result<(), Error> {
pub async fn shutdown(self, uninit: Option<ExitParams>) -> Result<(), Error> {
if let Some((hdl, shutdown)) = self.seed.persistent_container.rpc_server.send_replace(None)
{
self.seed
.persistent_container
.rpc_client
.request(rpc::Exit, Empty {})
.request(
rpc::Exit,
uninit.clone().unwrap_or_else(|| {
ExitParams::target_version(
&*self.seed.persistent_container.s9pk.as_manifest().version,
)
}),
)
.await?;
shutdown.shutdown();
tokio::time::timeout(Duration::from_secs(30), hdl)
@@ -234,11 +175,12 @@ impl ServiceRef {
)
})?
.persistent_container
.exit()
.exit(uninit)
.await?;
Ok(())
}
}
impl Deref for ServiceRef {
type Target = Service;
fn deref(&self) -> &Self::Target {
@@ -257,7 +199,14 @@ pub struct Service {
}
impl Service {
#[instrument(skip_all)]
async fn new(ctx: RpcContext, s9pk: S9pk, start: StartStop) -> Result<ServiceRef, Error> {
async fn new(
ctx: RpcContext,
s9pk: S9pk,
start: StartStop,
procedure_id: Guid,
init_kind: Option<InitKind>,
recovery_source: Option<impl GenericMountGuard>,
) -> Result<ServiceRef, Error> {
let id = s9pk.as_manifest().id.clone();
let persistent_container = PersistentContainer::new(
&ctx, s9pk,
@@ -277,11 +226,28 @@ impl Service {
seed,
}
.into();
let recovery_guard = if let Some(recovery_source) = &recovery_source {
Some(
service
.seed
.persistent_container
.mount_backup(recovery_source.path().join("data"), ReadOnly)
.await?,
)
} else {
None
};
service
.seed
.persistent_container
.init(service.weak())
.init(service.weak(), procedure_id, init_kind)
.await?;
if let Some(recovery_guard) = recovery_guard {
recovery_guard.unmount(true).await?;
}
if let Some(recovery_source) = recovery_source {
recovery_source.unmount().await?;
}
Ok(service)
}
@@ -305,7 +271,9 @@ impl Service {
} else {
StartStop::Stop
};
Self::new(ctx, s9pk, start_stop).await.map(Some)
Self::new(ctx, s9pk, start_stop, Guid::new(), None, None::<MountGuard>)
.await
.map(Some)
}
};
let s9pk_dir = Path::new(DATA_DIR).join(PKG_ARCHIVE_DIR).join("installed"); // TODO: make this based on hash
@@ -328,7 +296,7 @@ impl Service {
tracing::debug!("{e:?}")
}) {
if let Ok(service) =
Self::install(ctx.clone(), s9pk, None, None::<Never>, None)
Self::install(ctx.clone(), s9pk, &None, None, None::<Never>, None)
.await
.map_err(|e| {
tracing::error!("Error installing service: {e}");
@@ -365,7 +333,8 @@ impl Service {
if let Ok(service) = Self::install(
ctx.clone(),
s9pk,
Some(s.as_manifest().as_version().de()?),
&None,
Some(entry.as_status().de()?.run_state()),
None::<Never>,
None,
)
@@ -407,27 +376,66 @@ impl Service {
tracing::error!("Error opening s9pk for removal: {e}");
tracing::debug!("{e:?}")
}) {
if let Ok(service) = Self::new(ctx.clone(), s9pk, StartStop::Stop)
.await
.map_err(|e| {
tracing::error!("Error loading service for removal: {e}");
tracing::debug!("{e:?}")
})
let err_state = |e: Error| async move {
let state = crate::status::MainStatus::Error {
on_rebuild: StartStop::Stop,
message: e.to_string(),
debug: Some(format!("{e:?}")),
};
ctx.db
.mutate(move |db| {
if let Some(pde) =
db.as_public_mut().as_package_data_mut().as_idx_mut(&id)
{
pde.as_state_info_mut().map_mutate(|s| {
Ok(PackageState::Installed(InstalledState {
manifest: s
.as_manifest(ManifestPreference::Old)
.clone(),
}))
})?;
pde.as_status_mut().ser(&state)?;
}
Ok(())
})
.await
.result
};
match Self::new(
ctx.clone(),
s9pk,
StartStop::Stop,
Guid::new(),
None,
None::<MountGuard>,
)
.await
{
match service.uninstall(None, false, false).await {
Ok(service) => match service
.uninstall(ExitParams::uninstall(), false, false)
.await
{
Err(e) => {
tracing::error!("Error uninstalling service: {e}");
tracing::debug!("{e:?}")
tracing::debug!("{e:?}");
err_state(e).await?;
}
Ok(()) => return Ok(None),
},
Err(e) => {
tracing::error!("Error loading service for removal: {e}");
tracing::debug!("{e:?}");
err_state(e).await?;
}
}
}
ctx.db
.mutate(|v| v.as_public_mut().as_package_data_mut().remove(id))
.await
.result?;
if disposition == LoadDisposition::Retry {
ctx.db
.mutate(|v| v.as_public_mut().as_package_data_mut().remove(id))
.await
.result?;
}
Ok(None)
}
@@ -445,53 +453,30 @@ impl Service {
pub async fn install(
ctx: RpcContext,
s9pk: S9pk,
mut src_version: Option<models::VersionString>,
registry: &Option<Url>,
prev_state: Option<StartStop>,
recovery_source: Option<impl GenericMountGuard>,
progress: Option<InstallProgressHandles>,
) -> Result<ServiceRef, Error> {
let manifest = s9pk.as_manifest().clone();
let developer_key = s9pk.as_archive().signer();
let icon = s9pk.icon_data_url().await?;
let service = Self::new(ctx.clone(), s9pk, StartStop::Stop).await?;
if let Some(recovery_source) = recovery_source {
service
.actor
.send(
Guid::new(),
transition::restore::Restore {
path: recovery_source.path().to_path_buf(),
},
)
.await??;
recovery_source.unmount().await?;
src_version = Some(
service
.seed
.persistent_container
.s9pk
.as_manifest()
.version
.clone(),
);
}
let procedure_id = Guid::new();
service
.seed
.persistent_container
.execute::<NoOutput>(
procedure_id.clone(),
ProcedureName::PackageInit,
to_value(&src_version)?,
None,
) // TODO timeout
.await
.with_kind(if src_version.is_some() {
ErrorKind::UpdateFailed
let service = Self::new(
ctx.clone(),
s9pk,
StartStop::Stop,
procedure_id.clone(),
Some(if recovery_source.is_some() {
InitKind::Restore
} else if prev_state.is_some() {
InitKind::Update
} else {
ErrorKind::InstallFailed
})?; // TODO: handle cancellation
InitKind::Install
}),
recovery_source,
)
.await?;
if let Some(mut progress) = progress {
progress.finalization_progress.complete();
@@ -501,19 +486,19 @@ impl Service {
let peek = ctx.db.peek().await;
let mut action_input: BTreeMap<ActionId, Value> = BTreeMap::new();
let requested_actions: BTreeSet<_> = peek
let tasks: BTreeSet<_> = peek
.as_public()
.as_package_data()
.as_entries()?
.into_iter()
.map(|(_, pde)| {
Ok(pde
.as_requested_actions()
.as_tasks()
.as_entries()?
.into_iter()
.map(|(_, r)| {
Ok::<_, Error>(if r.as_request().as_package_id().de()? == manifest.id {
Some(r.as_request().as_action_id().de()?)
Ok::<_, Error>(if r.as_task().as_package_id().de()? == manifest.id {
Some(r.as_task().as_action_id().de()?)
} else {
None
})
@@ -523,27 +508,30 @@ impl Service {
.flatten_ok()
.map(|a| a.and_then(|a| a))
.try_collect()?;
for action_id in requested_actions {
if let Some(input) = service
.get_action_input(procedure_id.clone(), action_id.clone())
.await?
.and_then(|i| i.value)
for action_id in tasks {
if peek
.as_public()
.as_package_data()
.as_idx(&manifest.id)
.or_not_found(&manifest.id)?
.as_actions()
.contains_key(&action_id)?
{
action_input.insert(action_id, input);
if let Some(input) = service
.get_action_input(procedure_id.clone(), action_id.clone())
.await?
.and_then(|i| i.value)
{
action_input.insert(action_id, input);
}
}
}
ctx.db
.mutate(|db| {
for (action_id, input) in &action_input {
for (_, pde) in db.as_public_mut().as_package_data_mut().as_entries_mut()? {
pde.as_requested_actions_mut().mutate(|requested_actions| {
Ok(update_requested_actions(
requested_actions,
&manifest.id,
action_id,
input,
false,
))
pde.as_tasks_mut().mutate(|tasks| {
Ok(update_tasks(tasks, &manifest.id, action_id, input, false))
})?;
}
}
@@ -552,13 +540,18 @@ impl Service {
.as_package_data_mut()
.as_idx_mut(&manifest.id)
.or_not_found(&manifest.id)?;
let actions = entry.as_actions().keys()?;
entry.as_tasks_mut().mutate(|t| {
Ok(t.retain(|_, v| {
v.task.package_id != manifest.id || actions.contains(&v.task.action_id)
}))
})?;
entry
.as_state_info_mut()
.ser(&PackageState::Installed(InstalledState { manifest }))?;
entry.as_developer_key_mut().ser(&Pem::new(developer_key))?;
entry.as_icon_mut().ser(&icon)?;
// TODO: marketplace url
// TODO: dependency info
entry.as_registry_mut().ser(registry)?;
Ok(())
})
@@ -583,7 +576,7 @@ impl Service {
.send(
Guid::new(),
transition::backup::Backup {
path: guard.path().to_path_buf(),
path: guard.path().join("data"),
},
)
.await??

View File

@@ -31,7 +31,9 @@ use crate::s9pk::merkle_archive::source::FileSource;
use crate::s9pk::S9pk;
use crate::service::effects::context::EffectContext;
use crate::service::effects::handler;
use crate::service::rpc::{CallbackHandle, CallbackId, CallbackParams};
use crate::service::rpc::{
CallbackHandle, CallbackId, CallbackParams, ExitParams, InitKind, InitParams,
};
use crate::service::start_stop::StartStop;
use crate::service::transition::{TransitionKind, TransitionState};
use crate::service::{rpc, RunningStatus, Service};
@@ -369,7 +371,12 @@ impl PersistentContainer {
}
#[instrument(skip_all)]
pub async fn init(&self, seed: Weak<Service>) -> Result<(), Error> {
pub async fn init(
&self,
seed: Weak<Service>,
procedure_id: Guid,
kind: Option<InitKind>,
) -> Result<(), Error> {
let socket_server_context = EffectContext::new(seed);
let server = Server::new(move || ready(Ok(socket_server_context.clone())), handler());
let path = self
@@ -424,7 +431,15 @@ impl PersistentContainer {
));
}
self.rpc_client.request(rpc::Init, Empty {}).await?;
self.rpc_client
.request(
rpc::Init,
InitParams {
id: procedure_id,
kind,
},
)
.await?;
self.state.send_modify(|s| s.rt_initialized = true);
@@ -435,10 +450,12 @@ impl PersistentContainer {
fn destroy(
&mut self,
error: bool,
uninit: Option<ExitParams>,
) -> Option<impl Future<Output = Result<(), Error>> + 'static> {
if self.destroyed {
return None;
}
let version = self.s9pk.as_manifest().version.clone();
let rpc_client = self.rpc_client.clone();
let rpc_server = self.rpc_server.send_replace(None);
let js_mount = self.js_mount.take();
@@ -469,7 +486,14 @@ impl PersistentContainer {
}
}
if let Some((hdl, shutdown)) = rpc_server {
errs.handle(rpc_client.request(rpc::Exit, Empty {}).await);
errs.handle(
rpc_client
.request(
rpc::Exit,
uninit.unwrap_or_else(|| ExitParams::target_version(&*version)),
)
.await,
);
shutdown.shutdown();
errs.handle(hdl.await.with_kind(ErrorKind::Cancelled));
}
@@ -494,8 +518,8 @@ impl PersistentContainer {
}
#[instrument(skip_all)]
pub async fn exit(mut self) -> Result<(), Error> {
if let Some(destroy) = self.destroy(false) {
pub async fn exit(mut self, uninit: Option<ExitParams>) -> Result<(), Error> {
if let Some(destroy) = self.destroy(false, uninit) {
destroy.await?;
}
tracing::info!("Service for {} exited", self.s9pk.as_manifest().id);
@@ -613,7 +637,7 @@ impl PersistentContainer {
impl Drop for PersistentContainer {
fn drop(&mut self) {
if let Some(destroy) = self.destroy(true) {
if let Some(destroy) = self.destroy(true, None) {
tokio::spawn(async move { destroy.await.log_err() });
}
}

View File

@@ -4,8 +4,9 @@ use std::sync::{Arc, Weak};
use std::time::Duration;
use clap::builder::ValueParserFactory;
use exver::{ExtendedVersion, VersionRange};
use imbl::Vector;
use imbl_value::Value;
use imbl_value::{InternedString, Value};
use models::{FromStrParser, ProcedureName};
use rpc_toolkit::yajrc::RpcMethod;
use rpc_toolkit::Empty;
@@ -16,10 +17,25 @@ use crate::rpc_continuations::Guid;
use crate::service::persistent_container::PersistentContainer;
use crate::util::Never;
#[derive(Clone, serde::Deserialize, serde::Serialize, TS)]
#[serde(rename_all = "kebab-case")]
pub enum InitKind {
Install,
Update,
Restore,
}
#[derive(Clone, serde::Deserialize, serde::Serialize, TS)]
#[serde(rename_all = "camelCase")]
pub struct InitParams {
pub id: Guid,
pub kind: Option<InitKind>,
}
#[derive(Clone)]
pub struct Init;
impl RpcMethod for Init {
type Params = Empty;
type Params = InitParams;
type Response = ();
fn as_str<'a>(&'a self) -> &'a str {
"init"
@@ -70,10 +86,42 @@ impl serde::Serialize for Stop {
}
}
#[derive(Clone, serde::Deserialize, serde::Serialize, TS)]
#[serde(rename_all = "camelCase")]
pub struct ExitParams {
id: Guid,
/// VersionRange or ExtendedVersion
#[ts(type = "string | null")]
target: Option<InternedString>,
}
impl ExitParams {
pub fn target_version(version: &ExtendedVersion) -> Self {
Self {
id: Guid::new(),
target: Some(InternedString::from_display(version)),
}
}
pub fn target_range(range: &VersionRange) -> Self {
Self {
id: Guid::new(),
target: Some(InternedString::from_display(range)),
}
}
pub fn uninstall() -> Self {
Self {
id: Guid::new(),
target: None,
}
}
pub fn is_uninstall(&self) -> bool {
self.target.is_none()
}
}
#[derive(Clone)]
pub struct Exit;
impl RpcMethod for Exit {
type Params = Empty;
type Params = ExitParams;
type Response = ();
fn as_str<'a>(&'a self) -> &'a str {
"exit"

View File

@@ -58,10 +58,6 @@ async fn service_actor_loop(
transition_state: Some(TransitionKind::Restarting),
..
} => MainStatus::Restarting,
ServiceStateKinds {
transition_state: Some(TransitionKind::Restoring),
..
} => MainStatus::Restoring,
ServiceStateKinds {
transition_state: Some(TransitionKind::BackingUp),
..

View File

@@ -3,15 +3,16 @@ use std::sync::Arc;
use std::time::Duration;
use color_eyre::eyre::eyre;
use exver::VersionRange;
use futures::future::{BoxFuture, Fuse};
use futures::stream::FuturesUnordered;
use futures::{Future, FutureExt, StreamExt, TryFutureExt};
use helpers::NonDetachingJoinHandle;
use imbl::OrdMap;
use imbl_value::InternedString;
use models::ErrorData;
use tokio::sync::{oneshot, Mutex, OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
use tracing::instrument;
use url::Url;
use crate::context::RpcContext;
use crate::db::model::package::{
@@ -22,9 +23,11 @@ use crate::install::PKG_ARCHIVE_DIR;
use crate::notifications::{notify, NotificationLevel};
use crate::prelude::*;
use crate::progress::{FullProgressTracker, PhaseProgressTrackerHandle, ProgressTrackerWriter};
use crate::rpc_continuations::Guid;
use crate::s9pk::manifest::PackageId;
use crate::s9pk::merkle_archive::source::FileSource;
use crate::s9pk::S9pk;
use crate::service::rpc::ExitParams;
use crate::service::start_stop::StartStop;
use crate::service::{LoadDisposition, Service, ServiceRef};
use crate::status::MainStatus;
@@ -94,7 +97,7 @@ impl ServiceMap {
let mut shutdown_err = Ok(());
let mut service = self.get_mut(id).await;
if let Some(service) = service.take() {
shutdown_err = service.shutdown().await;
shutdown_err = service.shutdown(None).await;
}
match Service::load(ctx, id, disposition).await {
Ok(s) => *service = s.into(),
@@ -130,6 +133,7 @@ impl ServiceMap {
&self,
ctx: RpcContext,
s9pk: F,
registry: Option<Url>,
recovery_source: Option<impl GenericMountGuard>,
progress: Option<FullProgressTracker>,
) -> Result<DownloadInstallFuture, Error>
@@ -181,6 +185,7 @@ impl ServiceMap {
let manifest = manifest.clone();
let id = id.clone();
let install_progress = progress.snapshot();
let registry = registry.clone();
move |db| {
if let Some(pde) =
db.as_public_mut().as_package_data_mut().as_idx_mut(&id)
@@ -212,13 +217,13 @@ impl ServiceMap {
},
data_version: None,
status: MainStatus::Stopped,
registry: None,
registry,
developer_key: Pem::new(developer_key),
icon,
last_backup: None,
current_dependencies: Default::default(),
actions: Default::default(),
requested_actions: Default::default(),
tasks: Default::default(),
service_interfaces: Default::default(),
hosts: Default::default(),
store_exposed_dependents: Default::default(),
@@ -287,35 +292,59 @@ impl ServiceMap {
ErrorKind::InvalidRequest,
"cannot restore over existing package"
);
let version = service
let prev_version = service
.seed
.persistent_container
.s9pk
.as_manifest()
.version
.clone();
service
.uninstall(Some(s9pk.as_manifest().version.clone()), false, false)
.await?;
let prev_can_migrate_to = &service
.seed
.persistent_container
.s9pk
.as_manifest()
.can_migrate_to;
let next_version = &s9pk.as_manifest().version;
let next_can_migrate_from = &s9pk.as_manifest().can_migrate_from;
let uninit = if prev_version.satisfies(next_can_migrate_from) {
ExitParams::target_version(&*prev_version)
} else if next_version.satisfies(prev_can_migrate_to) {
ExitParams::target_version(&s9pk.as_manifest().version)
} else {
ExitParams::target_range(&VersionRange::and(
prev_can_migrate_to.clone(),
next_can_migrate_from.clone(),
))
};
let run_state = service
.seed
.persistent_container
.state
.borrow()
.desired_state;
service.uninstall(uninit, false, false).await?;
progress.complete();
Some(version)
Some(run_state)
} else {
None
};
*service = Some(
Service::install(
ctx,
s9pk,
prev,
recovery_source,
Some(InstallProgressHandles {
finalization_progress,
progress,
}),
)
.await?
.into(),
);
let new_service = Service::install(
ctx,
s9pk,
&registry,
prev,
recovery_source,
Some(InstallProgressHandles {
finalization_progress,
progress,
}),
)
.await?;
if prev == Some(StartStop::Start) {
new_service.start(Guid::new()).await?;
}
*service = Some(new_service.into());
drop(service);
sync_progress_task.await.map_err(|_| {
@@ -359,14 +388,23 @@ impl ServiceMap {
ServiceRefReloadCancelGuard::new(ctx.clone(), id.clone(), "Uninstall", None)
.handle_last(async move {
if let Some(service) = guard.take() {
let res = service.uninstall(None, soft, force).await;
let res = service
.uninstall(ExitParams::uninstall(), soft, force)
.await;
drop(guard);
res
} else {
Err(Error::new(
eyre!("service {id} failed to initialize - cannot remove gracefully"),
ErrorKind::Uninitialized,
))
if force {
super::uninstall::cleanup(&ctx, &id, soft).await?;
Ok(())
} else {
Err(Error::new(
eyre!(
"service {id} failed to initialize - cannot remove gracefully"
),
ErrorKind::Uninitialized,
))
}
}
})
.await?;
@@ -382,7 +420,7 @@ impl ServiceMap {
for service in lock.values().cloned() {
futs.push(async move {
if let Some(service) = service.write_owned().await.take() {
service.shutdown().await?
service.shutdown(None).await?
}
Ok::<_, Error>(())
});

View File

@@ -10,13 +10,11 @@ use crate::util::future::{CancellationHandle, RemoteCancellable};
pub mod backup;
pub mod restart;
pub mod restore;
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum TransitionKind {
BackingUp,
Restarting,
Restoring,
}
/// Used only in the manager/mod and is used to keep track of the state of the manager during the

View File

@@ -1,76 +0,0 @@
use std::path::PathBuf;
use futures::channel::oneshot;
use futures::FutureExt;
use models::ProcedureName;
use crate::disk::mount::filesystem::ReadOnly;
use crate::prelude::*;
use crate::rpc_continuations::Guid;
use crate::service::transition::{TransitionKind, TransitionState};
use crate::service::ServiceActor;
use crate::util::actor::background::BackgroundJobQueue;
use crate::util::actor::{ConflictBuilder, Handler};
use crate::util::future::RemoteCancellable;
use crate::util::serde::NoOutput;
pub(in crate::service) struct Restore {
pub path: PathBuf,
}
impl Handler<Restore> for ServiceActor {
type Response = Result<(), Error>;
fn conflicts_with(_: &Restore) -> ConflictBuilder<Self> {
ConflictBuilder::everything()
}
async fn handle(
&mut self,
id: Guid,
restore: Restore,
jobs: &BackgroundJobQueue,
) -> Self::Response {
// So Need a handle to just a single field in the state
let path = restore.path;
let seed = self.0.clone();
let state = self.0.persistent_container.state.clone();
let (send_res, recv_res) = oneshot::channel();
let transition = RemoteCancellable::new(
async move {
let backup_guard = seed
.persistent_container
.mount_backup(path, ReadOnly)
.await?;
seed.persistent_container
.execute::<NoOutput>(id, ProcedureName::RestoreBackup, Value::Null, None)
.await?;
backup_guard.unmount(true).await?;
state.send_modify(|s| {
s.transition_state.take();
});
Ok::<_, Error>(())
}
.map(|res| send_res.send(res)),
);
let cancel_handle = transition.cancellation_handle();
jobs.add_job(transition.map(|_| ()));
let mut old = None;
self.0.persistent_container.state.send_modify(|s| {
old = std::mem::replace(
&mut s.transition_state,
Some(TransitionState {
kind: TransitionKind::Restoring,
cancel_handle,
}),
)
});
if let Some(t) = old {
t.abort().await;
}
match recv_res.await {
Err(_) => Err(Error::new(eyre!("Restoring canceled"), ErrorKind::Unknown)),
Ok(res) => res,
}
}
}

View File

@@ -0,0 +1,70 @@
use std::path::Path;
use models::PackageId;
use crate::context::RpcContext;
use crate::prelude::*;
use crate::volume::data_dir;
use crate::{DATA_DIR, PACKAGE_DATA};
pub async fn cleanup(ctx: &RpcContext, id: &PackageId, soft: bool) -> Result<(), Error> {
Ok(
if let Some(pde) = ctx
.db
.mutate(|d| {
if let Some(pde) = d
.as_public_mut()
.as_package_data_mut()
.remove(&id)?
.map(|d| d.de())
.transpose()?
{
d.as_private_mut().as_available_ports_mut().mutate(|p| {
p.free(
pde.hosts
.0
.values()
.flat_map(|h| h.bindings.values())
.flat_map(|b| {
b.net
.assigned_port
.into_iter()
.chain(b.net.assigned_ssl_port)
}),
);
Ok(())
})?;
d.as_private_mut().as_package_stores_mut().remove(&id)?;
Ok(Some(pde))
} else {
Ok(None)
}
})
.await
.result?
{
let state = pde.state_info.expect_removing()?;
if !soft {
for volume_id in &state.manifest.volumes {
let path = data_dir(DATA_DIR, &state.manifest.id, volume_id);
if tokio::fs::metadata(&path).await.is_ok() {
tokio::fs::remove_dir_all(&path).await?;
}
}
let logs_dir = Path::new(PACKAGE_DATA)
.join("logs")
.join(&state.manifest.id);
if tokio::fs::metadata(&logs_dir).await.is_ok() {
tokio::fs::remove_dir_all(&logs_dir).await?;
}
let archive_path = Path::new(PACKAGE_DATA)
.join("archive")
.join("installed")
.join(&state.manifest.id);
if tokio::fs::metadata(&archive_path).await.is_ok() {
tokio::fs::remove_file(&archive_path).await?;
}
}
},
)
}