diff --git a/core/src/service/action.rs b/core/src/service/action.rs index 772f0dfc6..814a3c94f 100644 --- a/core/src/service/action.rs +++ b/core/src/service/action.rs @@ -9,7 +9,7 @@ use crate::db::model::package::{ }; use crate::prelude::*; use crate::rpc_continuations::Guid; -use crate::service::{ProcedureName, Service, ServiceActor, ServiceActorSeed}; +use crate::service::{ProcedureName, Service, ServiceActor}; use crate::util::actor::background::BackgroundJobQueue; use crate::util::actor::{ConflictBuilder, Handler}; use crate::util::serde::is_partial_of; @@ -83,6 +83,22 @@ impl Service { } } +fn conflicts(left: &Value, right: &Value) -> bool { + match (left, right) { + (Value::Object(left), Value::Object(right)) => left.iter().any(|(k, v)| { + if let Some(v_right) = right.get(k) { + conflicts(v, v_right) + } else { + false + } + }), + (Value::Array(left), Value::Array(right)) => left + .iter() + .any(|v| right.iter().all(|v_right| conflicts(v, v_right))), + (_, _) => left != right, + } +} + pub fn update_tasks( tasks: &mut BTreeMap, package_id: &PackageId, @@ -105,7 +121,7 @@ pub fn update_tasks( } else { v.active = false; } - } else { + } else if conflicts(value, input) { v.active = true; if v.task.severity == TaskSeverity::Critical { critical_activated = true; @@ -131,50 +147,6 @@ pub fn update_tasks( critical_activated } -impl ServiceActorSeed { - /// Fetch the current action input and re-evaluate all tasks targeting this action. - /// - /// This is the single source of truth for task re-evaluation. Used both after - /// running an action and during service init (recheck_tasks). - pub(super) async fn eval_action_tasks( - &self, - action_id: &ActionId, - was_run: bool, - ) -> Result<(), Error> { - let package_id = &self.id; - let current_input = self - .persistent_container - .execute::>( - Guid::new(), - ProcedureName::GetActionInput(action_id.clone()), - json!({ "prefill": Value::Null }), - Some(Duration::from_secs(30)), - ) - .await - .log_err() - .flatten() - .and_then(|ai| ai.value); - let Some(input) = current_input else { - return Ok(()); - }; - self.ctx - .db - .mutate(|db| { - for (_, pde) in db.as_public_mut().as_package_data_mut().as_entries_mut()? { - if pde.as_tasks_mut().mutate(|tasks| { - Ok(update_tasks(tasks, package_id, action_id, &input, was_run)) - })? { - pde.as_status_info_mut().stop()?; - } - } - Ok(()) - }) - .await - .result?; - Ok(()) - } -} - pub(super) struct RunAction { action_id: ActionId, input: Value, @@ -247,7 +219,22 @@ impl Handler for ServiceActor { ) .await .with_kind(ErrorKind::Action)?; - self.0.eval_action_tasks(action_id, true).await?; + let package_id = package_id.clone(); + self.0 + .ctx + .db + .mutate(|db| { + for (_, pde) in db.as_public_mut().as_package_data_mut().as_entries_mut()? { + if pde.as_tasks_mut().mutate(|tasks| { + Ok(update_tasks(tasks, &package_id, action_id, &input, true)) + })? { + pde.as_status_info_mut().stop()?; + } + } + Ok(()) + }) + .await + .result?; Ok(result) } } diff --git a/core/src/service/mod.rs b/core/src/service/mod.rs index 05c6e56ea..9fbcfa300 100644 --- a/core/src/service/mod.rs +++ b/core/src/service/mod.rs @@ -39,6 +39,7 @@ use crate::lxc::ContainerId; use crate::prelude::*; use crate::rpc_continuations::{Guid, RpcContinuation}; use crate::s9pk::S9pk; +use crate::service::action::update_tasks; use crate::service::rpc::{ExitParams, InitKind}; use crate::service::service_map::InstallProgressHandles; use crate::service::uninstall::cleanup; @@ -237,7 +238,8 @@ impl Service { async fn recheck_tasks(&self) -> Result<(), Error> { let service_id = &self.seed.id; let peek = self.seed.ctx.db.peek().await; - let action_ids: BTreeSet<_> = peek + let mut action_input: BTreeMap = BTreeMap::new(); + let tasks: BTreeSet<_> = peek .as_public() .as_package_data() .as_entries()? @@ -264,16 +266,29 @@ impl Service { .flatten_ok() .map(|a| a.and_then(|a| a)) .try_collect()?; - drop(peek); - for action_id in action_ids { - self.seed.eval_action_tasks(&action_id, false).await?; + let procedure_id = Guid::new(); + for action_id in tasks { + if let Some(input) = self + .get_action_input(procedure_id.clone(), action_id.clone(), Value::Null) + .await + .log_err() + .flatten() + .and_then(|i| i.value) + { + action_input.insert(action_id, input); + } } - // Defensive sweep: stop any package that still has an active critical task. - // This catches tasks that were already active before this init cycle. self.seed .ctx .db .mutate(|db| { + for (action_id, input) in &action_input { + for (_, pde) in db.as_public_mut().as_package_data_mut().as_entries_mut()? { + pde.as_tasks_mut().mutate(|tasks| { + Ok(update_tasks(tasks, service_id, action_id, input, false)) + })?; + } + } for (_, pde) in db.as_public_mut().as_package_data_mut().as_entries_mut()? { if pde .as_tasks()