Revert "fix: RunAction task re-evaluation compared against partial input, not full config"

also apply alternative fix: only re-activate a task that explicitly conflicts with a run action's input

This reverts commit 2999d22d2a.
This commit is contained in:
Aiden McClelland
2026-03-20 16:35:09 -06:00
parent 8bccffcb5c
commit f5bfbe0465
2 changed files with 55 additions and 53 deletions

View File

@@ -9,7 +9,7 @@ use crate::db::model::package::{
};
use crate::prelude::*;
use crate::rpc_continuations::Guid;
use crate::service::{ProcedureName, Service, ServiceActor, ServiceActorSeed};
use crate::service::{ProcedureName, Service, ServiceActor};
use crate::util::actor::background::BackgroundJobQueue;
use crate::util::actor::{ConflictBuilder, Handler};
use crate::util::serde::is_partial_of;
@@ -83,6 +83,22 @@ impl Service {
}
}
fn conflicts(left: &Value, right: &Value) -> bool {
match (left, right) {
(Value::Object(left), Value::Object(right)) => left.iter().any(|(k, v)| {
if let Some(v_right) = right.get(k) {
conflicts(v, v_right)
} else {
false
}
}),
(Value::Array(left), Value::Array(right)) => left
.iter()
.any(|v| right.iter().all(|v_right| conflicts(v, v_right))),
(_, _) => left != right,
}
}
pub fn update_tasks(
tasks: &mut BTreeMap<ReplayId, TaskEntry>,
package_id: &PackageId,
@@ -105,7 +121,7 @@ pub fn update_tasks(
} else {
v.active = false;
}
} else {
} else if conflicts(value, input) {
v.active = true;
if v.task.severity == TaskSeverity::Critical {
critical_activated = true;
@@ -131,50 +147,6 @@ pub fn update_tasks(
critical_activated
}
impl ServiceActorSeed {
/// Fetch the current action input and re-evaluate all tasks targeting this action.
///
/// This is the single source of truth for task re-evaluation. Used both after
/// running an action and during service init (recheck_tasks).
pub(super) async fn eval_action_tasks(
&self,
action_id: &ActionId,
was_run: bool,
) -> Result<(), Error> {
let package_id = &self.id;
let current_input = self
.persistent_container
.execute::<Option<ActionInput>>(
Guid::new(),
ProcedureName::GetActionInput(action_id.clone()),
json!({ "prefill": Value::Null }),
Some(Duration::from_secs(30)),
)
.await
.log_err()
.flatten()
.and_then(|ai| ai.value);
let Some(input) = current_input else {
return Ok(());
};
self.ctx
.db
.mutate(|db| {
for (_, pde) in db.as_public_mut().as_package_data_mut().as_entries_mut()? {
if pde.as_tasks_mut().mutate(|tasks| {
Ok(update_tasks(tasks, package_id, action_id, &input, was_run))
})? {
pde.as_status_info_mut().stop()?;
}
}
Ok(())
})
.await
.result?;
Ok(())
}
}
pub(super) struct RunAction {
action_id: ActionId,
input: Value,
@@ -247,7 +219,22 @@ impl Handler<RunAction> for ServiceActor {
)
.await
.with_kind(ErrorKind::Action)?;
self.0.eval_action_tasks(action_id, true).await?;
let package_id = package_id.clone();
self.0
.ctx
.db
.mutate(|db| {
for (_, pde) in db.as_public_mut().as_package_data_mut().as_entries_mut()? {
if pde.as_tasks_mut().mutate(|tasks| {
Ok(update_tasks(tasks, &package_id, action_id, &input, true))
})? {
pde.as_status_info_mut().stop()?;
}
}
Ok(())
})
.await
.result?;
Ok(result)
}
}

View File

@@ -39,6 +39,7 @@ use crate::lxc::ContainerId;
use crate::prelude::*;
use crate::rpc_continuations::{Guid, RpcContinuation};
use crate::s9pk::S9pk;
use crate::service::action::update_tasks;
use crate::service::rpc::{ExitParams, InitKind};
use crate::service::service_map::InstallProgressHandles;
use crate::service::uninstall::cleanup;
@@ -237,7 +238,8 @@ impl Service {
async fn recheck_tasks(&self) -> Result<(), Error> {
let service_id = &self.seed.id;
let peek = self.seed.ctx.db.peek().await;
let action_ids: BTreeSet<_> = peek
let mut action_input: BTreeMap<ActionId, Value> = BTreeMap::new();
let tasks: BTreeSet<_> = peek
.as_public()
.as_package_data()
.as_entries()?
@@ -264,16 +266,29 @@ impl Service {
.flatten_ok()
.map(|a| a.and_then(|a| a))
.try_collect()?;
drop(peek);
for action_id in action_ids {
self.seed.eval_action_tasks(&action_id, false).await?;
let procedure_id = Guid::new();
for action_id in tasks {
if let Some(input) = self
.get_action_input(procedure_id.clone(), action_id.clone(), Value::Null)
.await
.log_err()
.flatten()
.and_then(|i| i.value)
{
action_input.insert(action_id, input);
}
}
// Defensive sweep: stop any package that still has an active critical task.
// This catches tasks that were already active before this init cycle.
self.seed
.ctx
.db
.mutate(|db| {
for (action_id, input) in &action_input {
for (_, pde) in db.as_public_mut().as_package_data_mut().as_entries_mut()? {
pde.as_tasks_mut().mutate(|tasks| {
Ok(update_tasks(tasks, service_id, action_id, input, false))
})?;
}
}
for (_, pde) in db.as_public_mut().as_package_data_mut().as_entries_mut()? {
if pde
.as_tasks()