diff --git a/core/src/service/action.rs b/core/src/service/action.rs index e3367918f..7694728a0 100644 --- a/core/src/service/action.rs +++ b/core/src/service/action.rs @@ -9,7 +9,7 @@ use crate::db::model::package::{ }; use crate::prelude::*; use crate::rpc_continuations::Guid; -use crate::service::{ProcedureName, Service, ServiceActor}; +use crate::service::{ProcedureName, Service, ServiceActor, ServiceActorSeed}; use crate::util::actor::background::BackgroundJobQueue; use crate::util::actor::{ConflictBuilder, Handler}; use crate::util::serde::is_partial_of; @@ -131,6 +131,51 @@ pub fn update_tasks( critical_activated } +impl ServiceActorSeed { + /// Fetch the current action input and re-evaluate all tasks targeting this action. + /// + /// This is the single source of truth for task re-evaluation. Used both after + /// running an action and during service init (recheck_tasks). + pub(super) async fn eval_action_tasks( + &self, + action_id: &ActionId, + was_run: bool, + ) -> Result<(), Error> { + let package_id = &self.id; + let current_input = self + .persistent_container + .execute::>( + Guid::new(), + ProcedureName::GetActionInput(action_id.clone()), + json!({ "prefill": Value::Null }), + Some(Duration::from_secs(30)), + ) + .await + .log_err() + .ok() + .flatten() + .and_then(|ai| ai.value); + let Some(input) = current_input else { + return Ok(()); + }; + self.ctx + .db + .mutate(|db| { + for (_, pde) in db.as_public_mut().as_package_data_mut().as_entries_mut()? { + if pde.as_tasks_mut().mutate(|tasks| { + Ok(update_tasks(tasks, package_id, action_id, &input, was_run)) + })? { + pde.as_status_info_mut().stop()?; + } + } + Ok(()) + }) + .await + .result?; + Ok(()) + } +} + pub(super) struct RunAction { action_id: ActionId, input: Value, @@ -203,22 +248,7 @@ impl Handler for ServiceActor { ) .await .with_kind(ErrorKind::Action)?; - let package_id = package_id.clone(); - self.0 - .ctx - .db - .mutate(|db| { - for (_, pde) in db.as_public_mut().as_package_data_mut().as_entries_mut()? { - if pde.as_tasks_mut().mutate(|tasks| { - Ok(update_tasks(tasks, &package_id, action_id, &input, true)) - })? { - pde.as_status_info_mut().stop()?; - } - } - Ok(()) - }) - .await - .result?; + self.0.eval_action_tasks(action_id, true).await?; Ok(result) } } diff --git a/core/src/service/mod.rs b/core/src/service/mod.rs index 9fbcfa300..05c6e56ea 100644 --- a/core/src/service/mod.rs +++ b/core/src/service/mod.rs @@ -39,7 +39,6 @@ use crate::lxc::ContainerId; use crate::prelude::*; use crate::rpc_continuations::{Guid, RpcContinuation}; use crate::s9pk::S9pk; -use crate::service::action::update_tasks; use crate::service::rpc::{ExitParams, InitKind}; use crate::service::service_map::InstallProgressHandles; use crate::service::uninstall::cleanup; @@ -238,8 +237,7 @@ impl Service { async fn recheck_tasks(&self) -> Result<(), Error> { let service_id = &self.seed.id; let peek = self.seed.ctx.db.peek().await; - let mut action_input: BTreeMap = BTreeMap::new(); - let tasks: BTreeSet<_> = peek + let action_ids: BTreeSet<_> = peek .as_public() .as_package_data() .as_entries()? @@ -266,29 +264,16 @@ impl Service { .flatten_ok() .map(|a| a.and_then(|a| a)) .try_collect()?; - let procedure_id = Guid::new(); - for action_id in tasks { - if let Some(input) = self - .get_action_input(procedure_id.clone(), action_id.clone(), Value::Null) - .await - .log_err() - .flatten() - .and_then(|i| i.value) - { - action_input.insert(action_id, input); - } + drop(peek); + for action_id in action_ids { + self.seed.eval_action_tasks(&action_id, false).await?; } + // Defensive sweep: stop any package that still has an active critical task. + // This catches tasks that were already active before this init cycle. self.seed .ctx .db .mutate(|db| { - for (action_id, input) in &action_input { - for (_, pde) in db.as_public_mut().as_package_data_mut().as_entries_mut()? { - pde.as_tasks_mut().mutate(|tasks| { - Ok(update_tasks(tasks, service_id, action_id, input, false)) - })?; - } - } for (_, pde) in db.as_public_mut().as_package_data_mut().as_entries_mut()? { if pde .as_tasks()