Refactor/actions (#2733)

* store, properties, manifest

* interfaces

* init and backups

* fix init and backups

* file models

* more versions

* dependencies

* config except dynamic types

* clean up config

* remove disabled from non-dynamic vaues

* actions

* standardize example code block formats

* wip: actions refactor

Co-authored-by: Jade <Blu-J@users.noreply.github.com>

* commit types

* fix types

* update types

* update action request type

* update apis

* add description to actionrequest

* clean up imports

* revert package json

* chore: Remove the recursive to the index

* chore: Remove the other thing I was testing

* flatten action requests

* update container runtime with new config paradigm

* new actions strategy

* seems to be working

* misc backend fixes

* fix fe bugs

* only show breakages if breakages

* only show success modal if result

* don't panic on failed removal

* hide config from actions page

* polyfill autoconfig

* use metadata strategy for actions instead of prev

* misc fixes

* chore: split the sdk into 2 libs (#2736)

* follow sideload progress (#2718)

* follow sideload progress

* small bugfix

* shareReplay with no refcount false

* don't wrap sideload progress in RPCResult

* dont present toast

---------

Co-authored-by: Aiden McClelland <me@drbonez.dev>

* chore: Add the initial of the creation of the two sdk

* chore: Add in the baseDist

* chore: Add in the baseDist

* chore: Get the web and the runtime-container running

* chore: Remove the empty file

* chore: Fix it so the container-runtime works

---------

Co-authored-by: Matt Hill <MattDHill@users.noreply.github.com>
Co-authored-by: Aiden McClelland <me@drbonez.dev>

* misc fixes

* update todos

* minor clean up

* fix link script

* update node version in CI test

* fix node version syntax in ci build

* wip: fixing callbacks

* fix sdk makefile dependencies

* add support for const outside of main

* update apis

* don't panic!

* Chore: Capture weird case on rpc, and log that

* fix procedure id issue

* pass input value for dep auto config

* handle disabled and warning for actions

* chore: Fix for link not having node_modules

* sdk fixes

* fix build

* fix build

* fix build

---------

Co-authored-by: Matt Hill <mattnine@protonmail.com>
Co-authored-by: Jade <Blu-J@users.noreply.github.com>
Co-authored-by: J H <dragondef@gmail.com>
Co-authored-by: Jade <2364004+Blu-J@users.noreply.github.com>
Co-authored-by: Matt Hill <MattDHill@users.noreply.github.com>
This commit is contained in:
Aiden McClelland
2024-09-25 16:12:52 -06:00
committed by GitHub
parent eec5cf6b65
commit db0695126f
469 changed files with 16218 additions and 10485 deletions

View File

@@ -1,15 +1,76 @@
use clap::Parser;
use std::fmt;
use clap::{CommandFactory, FromArgMatches, Parser};
pub use models::ActionId;
use models::PackageId;
use qrcode::QrCode;
use rpc_toolkit::{from_fn_async, Context, HandlerExt, ParentHandler};
use serde::{Deserialize, Serialize};
use tracing::instrument;
use ts_rs::TS;
use crate::config::Config;
use crate::context::RpcContext;
use crate::context::{CliContext, RpcContext};
use crate::prelude::*;
use crate::rpc_continuations::Guid;
use crate::util::serde::{display_serializable, StdinDeserializable, WithIoFormat};
use crate::util::serde::{
display_serializable, HandlerExtSerde, StdinDeserializable, WithIoFormat,
};
pub fn action_api<C: Context>() -> ParentHandler<C> {
ParentHandler::new()
.subcommand(
"get-input",
from_fn_async(get_action_input)
.with_display_serializable()
.with_call_remote::<CliContext>(),
)
.subcommand(
"run",
from_fn_async(run_action)
.with_display_serializable()
.with_custom_display_fn(|_, res| {
if let Some(res) = res {
println!("{res}")
}
Ok(())
})
.with_call_remote::<CliContext>(),
)
}
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
#[ts(export)]
#[serde(rename_all = "camelCase")]
pub struct ActionInput {
#[ts(type = "Record<string, unknown>")]
pub spec: Value,
#[ts(type = "Record<string, unknown> | null")]
pub value: Option<Value>,
}
#[derive(Deserialize, Serialize, TS, Parser)]
#[serde(rename_all = "camelCase")]
pub struct GetActionInputParams {
pub package_id: PackageId,
pub action_id: ActionId,
}
#[instrument(skip_all)]
pub async fn get_action_input(
ctx: RpcContext,
GetActionInputParams {
package_id,
action_id,
}: GetActionInputParams,
) -> Result<Option<ActionInput>, Error> {
ctx.services
.get(&package_id)
.await
.as_ref()
.or_not_found(lazy_format!("Manager for {}", package_id))?
.get_action_input(Guid::new(), action_id)
.await
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "version")]
@@ -17,6 +78,13 @@ pub enum ActionResult {
#[serde(rename = "0")]
V0(ActionResultV0),
}
impl fmt::Display for ActionResult {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::V0(res) => res.fmt(f),
}
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ActionResultV0 {
@@ -25,63 +93,111 @@ pub struct ActionResultV0 {
pub copyable: bool,
pub qr: bool,
}
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub enum DockerStatus {
Running,
Stopped,
impl fmt::Display for ActionResultV0 {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.message)?;
if let Some(value) = &self.value {
write!(f, ":\n{value}")?;
if self.qr {
use qrcode::render::unicode;
write!(
f,
"\n{}",
QrCode::new(value.as_bytes())
.unwrap()
.render::<unicode::Dense1x2>()
.build()
)?;
}
}
Ok(())
}
}
pub fn display_action_result(params: WithIoFormat<ActionParams>, result: ActionResult) {
pub fn display_action_result<T: Serialize>(params: WithIoFormat<T>, result: Option<ActionResult>) {
let Some(result) = result else {
return;
};
if let Some(format) = params.format {
return display_serializable(format, result);
}
match result {
ActionResult::V0(ar) => {
println!(
"{}: {}",
ar.message,
serde_json::to_string(&ar.value).unwrap()
);
}
}
println!("{result}")
}
#[derive(Deserialize, Serialize, Parser, TS)]
#[derive(Deserialize, Serialize, TS)]
#[serde(rename_all = "camelCase")]
#[command(rename_all = "kebab-case")]
pub struct ActionParams {
#[arg(id = "id")]
#[serde(rename = "id")]
pub struct RunActionParams {
pub package_id: PackageId,
pub action_id: ActionId,
#[ts(optional, type = "any")]
pub input: Option<Value>,
}
#[derive(Parser)]
struct CliRunActionParams {
pub package_id: PackageId,
pub action_id: ActionId,
#[command(flatten)]
#[ts(type = "{ [key: string]: any } | null")]
#[serde(default)]
pub input: StdinDeserializable<Option<Config>>,
pub input: StdinDeserializable<Option<Value>>,
}
impl From<CliRunActionParams> for RunActionParams {
fn from(
CliRunActionParams {
package_id,
action_id,
input,
}: CliRunActionParams,
) -> Self {
Self {
package_id,
action_id,
input: input.0,
}
}
}
impl CommandFactory for RunActionParams {
fn command() -> clap::Command {
CliRunActionParams::command()
}
fn command_for_update() -> clap::Command {
CliRunActionParams::command_for_update()
}
}
impl FromArgMatches for RunActionParams {
fn from_arg_matches(matches: &clap::ArgMatches) -> Result<Self, clap::Error> {
CliRunActionParams::from_arg_matches(matches).map(Self::from)
}
fn from_arg_matches_mut(matches: &mut clap::ArgMatches) -> Result<Self, clap::Error> {
CliRunActionParams::from_arg_matches_mut(matches).map(Self::from)
}
fn update_from_arg_matches(&mut self, matches: &clap::ArgMatches) -> Result<(), clap::Error> {
*self = CliRunActionParams::from_arg_matches(matches).map(Self::from)?;
Ok(())
}
fn update_from_arg_matches_mut(
&mut self,
matches: &mut clap::ArgMatches,
) -> Result<(), clap::Error> {
*self = CliRunActionParams::from_arg_matches_mut(matches).map(Self::from)?;
Ok(())
}
}
// impl C
// #[command(about = "Executes an action", display(display_action_result))]
#[instrument(skip_all)]
pub async fn action(
pub async fn run_action(
ctx: RpcContext,
ActionParams {
RunActionParams {
package_id,
action_id,
input: StdinDeserializable(input),
}: ActionParams,
) -> Result<ActionResult, Error> {
input,
}: RunActionParams,
) -> Result<Option<ActionResult>, Error> {
ctx.services
.get(&package_id)
.await
.as_ref()
.or_not_found(lazy_format!("Manager for {}", package_id))?
.action(
Guid::new(),
action_id,
input.map(|c| to_value(&c)).transpose()?.unwrap_or_default(),
)
.run_action(Guid::new(), action_id, input.unwrap_or_default())
.await
}

View File

@@ -141,7 +141,7 @@ impl Drop for BackupStatusGuard {
.ser(&None)
})
.await
.unwrap()
.log_err()
});
}
}

View File

@@ -9,7 +9,7 @@ use digest::generic_array::GenericArray;
use digest::OutputSizeUser;
use exver::Version;
use imbl_value::InternedString;
use models::PackageId;
use models::{FromStrParser, PackageId};
use rpc_toolkit::{from_fn_async, Context, HandlerExt, ParentHandler};
use serde::{Deserialize, Serialize};
use sha2::Sha256;
@@ -27,7 +27,6 @@ use crate::disk::mount::filesystem::{FileSystem, MountType, ReadWrite};
use crate::disk::mount::guard::{GenericMountGuard, TmpMountGuard};
use crate::disk::util::PartitionInfo;
use crate::prelude::*;
use crate::util::clap::FromStrParser;
use crate::util::serde::{
deserialize_from_str, display_serializable, serialize_display, HandlerExtSerde, WithIoFormat,
};

View File

@@ -1,22 +0,0 @@
use std::collections::{BTreeMap, BTreeSet};
use models::PackageId;
use serde::{Deserialize, Serialize};
use super::{Config, ConfigSpec};
#[allow(unused_imports)]
use crate::prelude::*;
use crate::status::health_check::HealthCheckId;
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ConfigRes {
pub config: Option<Config>,
pub spec: ConfigSpec,
}
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct SetResult {
pub depends_on: BTreeMap<PackageId, BTreeSet<HealthCheckId>>,
}

View File

@@ -1,281 +0,0 @@
use std::collections::BTreeSet;
use std::sync::Arc;
use std::time::Duration;
use clap::Parser;
use color_eyre::eyre::eyre;
use indexmap::{IndexMap, IndexSet};
use itertools::Itertools;
use models::{ErrorKind, OptionExt, PackageId};
use patch_db::value::InternedString;
use patch_db::Value;
use regex::Regex;
use rpc_toolkit::{from_fn_async, Context, Empty, HandlerExt, ParentHandler};
use serde::{Deserialize, Serialize};
use tracing::instrument;
use ts_rs::TS;
use crate::context::{CliContext, RpcContext};
use crate::prelude::*;
use crate::rpc_continuations::Guid;
use crate::util::serde::{HandlerExtSerde, StdinDeserializable};
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct ConfigSpec(pub IndexMap<InternedString, Value>);
pub mod action;
pub mod util;
use util::NumRange;
use self::action::ConfigRes;
pub type Config = patch_db::value::InOMap<InternedString, Value>;
pub trait TypeOf {
fn type_of(&self) -> &'static str;
}
impl TypeOf for Value {
fn type_of(&self) -> &'static str {
match self {
Value::Array(_) => "list",
Value::Bool(_) => "boolean",
Value::Null => "null",
Value::Number(_) => "number",
Value::Object(_) => "object",
Value::String(_) => "string",
}
}
}
#[derive(Debug, thiserror::Error)]
pub enum ConfigurationError {
#[error("Timeout Error")]
TimeoutError(#[from] TimeoutError),
#[error("No Match: {0}")]
NoMatch(#[from] NoMatchWithPath),
#[error("System Error: {0}")]
SystemError(Error),
}
impl From<ConfigurationError> for Error {
fn from(err: ConfigurationError) -> Self {
let kind = match &err {
ConfigurationError::SystemError(e) => e.kind,
_ => crate::ErrorKind::ConfigGen,
};
crate::Error::new(err, kind)
}
}
#[derive(Clone, Copy, Debug, thiserror::Error)]
#[error("Timeout Error")]
pub struct TimeoutError;
#[derive(Clone, Debug, thiserror::Error)]
pub struct NoMatchWithPath {
pub path: Vec<InternedString>,
pub error: MatchError,
}
impl NoMatchWithPath {
pub fn new(error: MatchError) -> Self {
NoMatchWithPath {
path: Vec::new(),
error,
}
}
pub fn prepend(mut self, seg: InternedString) -> Self {
self.path.push(seg);
self
}
}
impl std::fmt::Display for NoMatchWithPath {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}: {}", self.path.iter().rev().join("."), self.error)
}
}
impl From<NoMatchWithPath> for Error {
fn from(e: NoMatchWithPath) -> Self {
ConfigurationError::from(e).into()
}
}
#[derive(Clone, Debug, thiserror::Error)]
pub enum MatchError {
#[error("String {0:?} Does Not Match Pattern {1}")]
Pattern(Arc<String>, Regex),
#[error("String {0:?} Is Not In Enum {1:?}")]
Enum(Arc<String>, IndexSet<String>),
#[error("Field Is Not Nullable")]
NotNullable,
#[error("Length Mismatch: expected {0}, actual: {1}")]
LengthMismatch(NumRange<usize>, usize),
#[error("Invalid Type: expected {0}, actual: {1}")]
InvalidType(&'static str, &'static str),
#[error("Number Out Of Range: expected {0}, actual: {1}")]
OutOfRange(NumRange<f64>, f64),
#[error("Number Is Not Integral: {0}")]
NonIntegral(f64),
#[error("Variant {0:?} Is Not In Union {1:?}")]
Union(Arc<String>, IndexSet<String>),
#[error("Variant Is Missing Tag {0:?}")]
MissingTag(InternedString),
#[error("Property {0:?} Of Variant {1:?} Conflicts With Union Tag")]
PropertyMatchesUnionTag(InternedString, String),
#[error("Name of Property {0:?} Conflicts With Map Tag Name")]
PropertyNameMatchesMapTag(String),
#[error("Object Key Is Invalid: {0}")]
InvalidKey(String),
#[error("Value In List Is Not Unique")]
ListUniquenessViolation,
}
#[derive(Deserialize, Serialize, Parser, TS)]
#[serde(rename_all = "camelCase")]
#[command(rename_all = "kebab-case")]
pub struct ConfigParams {
pub id: PackageId,
}
// #[command(subcommands(get, set))]
pub fn config<C: Context>() -> ParentHandler<C, ConfigParams> {
ParentHandler::new()
.subcommand(
"get",
from_fn_async(get)
.with_inherited(|ConfigParams { id }, _| id)
.with_display_serializable()
.with_call_remote::<CliContext>(),
)
.subcommand(
"set",
set::<C>().with_inherited(|ConfigParams { id }, _| id),
)
}
#[instrument(skip_all)]
pub async fn get(ctx: RpcContext, _: Empty, id: PackageId) -> Result<ConfigRes, Error> {
ctx.services
.get(&id)
.await
.as_ref()
.or_not_found(lazy_format!("Manager for {id}"))?
.get_config(Guid::new())
.await
}
#[derive(Deserialize, Serialize, Parser, TS)]
#[serde(rename_all = "camelCase")]
pub struct SetParams {
#[arg(long = "timeout")]
pub timeout: Option<crate::util::serde::Duration>,
#[command(flatten)]
#[ts(type = "{ [key: string]: any } | null")]
pub config: StdinDeserializable<Option<Config>>,
}
// #[command(
// subcommands(self(set_impl(async, context(RpcContext))), set_dry),
// display(display_none),
// metadata(sync_db = true)
// )]
#[instrument(skip_all)]
pub fn set<C: Context>() -> ParentHandler<C, SetParams, PackageId> {
ParentHandler::new()
.root_handler(
from_fn_async(set_impl)
.with_metadata("sync_db", Value::Bool(true))
.with_inherited(|set_params, id| (id, set_params))
.no_display()
.with_call_remote::<CliContext>(),
)
.subcommand(
"dry",
from_fn_async(set_dry)
.with_inherited(|set_params, id| (id, set_params))
.no_display()
.with_call_remote::<CliContext>(),
)
}
pub async fn set_dry(
ctx: RpcContext,
_: Empty,
(
id,
SetParams {
timeout,
config: StdinDeserializable(config),
},
): (PackageId, SetParams),
) -> Result<BTreeSet<PackageId>, Error> {
let mut breakages = BTreeSet::new();
let procedure_id = Guid::new();
let db = ctx.db.peek().await;
for dep in db
.as_public()
.as_package_data()
.as_entries()?
.into_iter()
.filter_map(
|(k, v)| match v.as_current_dependencies().contains_key(&id) {
Ok(true) => Some(Ok(k)),
Ok(false) => None,
Err(e) => Some(Err(e)),
},
)
{
let dep_id = dep?;
let Some(dependent) = &*ctx.services.get(&dep_id).await else {
continue;
};
if dependent
.dependency_config(procedure_id.clone(), id.clone(), config.clone())
.await?
.is_some()
{
breakages.insert(dep_id);
}
}
Ok(breakages)
}
#[derive(Default)]
pub struct ConfigureContext {
pub timeout: Option<Duration>,
pub config: Option<Config>,
}
#[instrument(skip_all)]
pub async fn set_impl(
ctx: RpcContext,
_: Empty,
(
id,
SetParams {
timeout,
config: StdinDeserializable(config),
},
): (PackageId, SetParams),
) -> Result<(), Error> {
let configure_context = ConfigureContext {
timeout: timeout.map(|t| *t),
config,
};
ctx.services
.get(&id)
.await
.as_ref()
.ok_or_else(|| {
Error::new(
eyre!("There is no manager running for {id}"),
ErrorKind::Unknown,
)
})?
.configure(Guid::new(), configure_context)
.await?;
Ok(())
}

View File

@@ -1,406 +0,0 @@
use std::borrow::Cow;
use std::ops::{Bound, RangeBounds, RangeInclusive};
use patch_db::Value;
use rand::distributions::Distribution;
use rand::Rng;
use super::Config;
pub const STATIC_NULL: Value = Value::Null;
#[derive(Clone, Debug)]
pub struct CharSet(pub Vec<(RangeInclusive<char>, usize)>, usize);
impl CharSet {
pub fn contains(&self, c: &char) -> bool {
self.0.iter().any(|r| r.0.contains(c))
}
pub fn gen<R: Rng>(&self, rng: &mut R) -> char {
let mut idx = rng.gen_range(0..self.1);
for r in &self.0 {
if idx < r.1 {
return std::convert::TryFrom::try_from(
rand::distributions::Uniform::new_inclusive(
u32::from(*r.0.start()),
u32::from(*r.0.end()),
)
.sample(rng),
)
.unwrap();
} else {
idx -= r.1;
}
}
unreachable!()
}
}
impl Default for CharSet {
fn default() -> Self {
CharSet(vec![('!'..='~', 94)], 94)
}
}
impl<'de> serde::de::Deserialize<'de> for CharSet {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::de::Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
let mut res = Vec::new();
let mut len = 0;
let mut a: Option<char> = None;
let mut b: Option<char> = None;
let mut in_range = false;
for c in s.chars() {
match c {
',' => match (a, b, in_range) {
(Some(start), Some(end), _) => {
if !end.is_ascii() {
return Err(serde::de::Error::custom("Invalid Character"));
}
if start >= end {
return Err(serde::de::Error::custom("Invalid Bounds"));
}
let l = u32::from(end) - u32::from(start) + 1;
res.push((start..=end, l as usize));
len += l as usize;
a = None;
b = None;
in_range = false;
}
(Some(start), None, false) => {
len += 1;
res.push((start..=start, 1));
a = None;
}
(Some(_), None, true) => {
b = Some(',');
}
(None, None, false) => {
a = Some(',');
}
_ => {
return Err(serde::de::Error::custom("Syntax Error"));
}
},
'-' => {
if a.is_none() {
a = Some('-');
} else if !in_range {
in_range = true;
} else if b.is_none() {
b = Some('-')
} else {
return Err(serde::de::Error::custom("Syntax Error"));
}
}
_ => {
if a.is_none() {
a = Some(c);
} else if in_range && b.is_none() {
b = Some(c);
} else {
return Err(serde::de::Error::custom("Syntax Error"));
}
}
}
}
match (a, b) {
(Some(start), Some(end)) => {
if !end.is_ascii() {
return Err(serde::de::Error::custom("Invalid Character"));
}
if start >= end {
return Err(serde::de::Error::custom("Invalid Bounds"));
}
let l = u32::from(end) - u32::from(start) + 1;
res.push((start..=end, l as usize));
len += l as usize;
}
(Some(c), None) => {
len += 1;
res.push((c..=c, 1));
}
_ => (),
}
Ok(CharSet(res, len))
}
}
impl serde::ser::Serialize for CharSet {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
{
<&str>::serialize(
&self
.0
.iter()
.map(|r| match r.1 {
1 => format!("{}", r.0.start()),
_ => format!("{}-{}", r.0.start(), r.0.end()),
})
.collect::<Vec<_>>()
.join(",")
.as_str(),
serializer,
)
}
}
pub trait MergeWith {
fn merge_with(&mut self, other: &serde_json::Value);
}
impl MergeWith for serde_json::Value {
fn merge_with(&mut self, other: &serde_json::Value) {
use serde_json::Value::Object;
if let (Object(orig), Object(ref other)) = (self, other) {
for (key, val) in other.into_iter() {
match (orig.get_mut(key), val) {
(Some(new_orig @ Object(_)), other @ Object(_)) => {
new_orig.merge_with(other);
}
(None, _) => {
orig.insert(key.clone(), val.clone());
}
_ => (),
}
}
}
}
}
#[test]
fn merge_with_tests() {
use serde_json::json;
let mut a = json!(
{"a": 1, "c": {"d": "123"}, "i": [1,2,3], "j": {}, "k":[1,2,3], "l": "test"}
);
a.merge_with(
&json!({"a":"a", "b": "b", "c":{"d":"d", "e":"e"}, "f":{"g":"g"}, "h": [1,2,3], "i":"i", "j":[1,2,3], "k":{}}),
);
assert_eq!(
a,
json!({"a": 1, "c": {"d": "123", "e":"e"}, "b":"b", "f": {"g":"g"}, "h":[1,2,3], "i":[1,2,3], "j": {}, "k":[1,2,3], "l": "test"})
)
}
pub mod serde_regex {
use regex::Regex;
use serde::*;
pub fn serialize<S>(regex: &Regex, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
<&str>::serialize(&regex.as_str(), serializer)
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Regex, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
Regex::new(&s).map_err(|e| de::Error::custom(e))
}
}
#[derive(Clone, Debug)]
pub struct NumRange<T: std::str::FromStr + std::fmt::Display + std::cmp::PartialOrd>(
pub (Bound<T>, Bound<T>),
);
impl<T> std::ops::Deref for NumRange<T>
where
T: std::str::FromStr + std::fmt::Display + std::cmp::PartialOrd,
{
type Target = (Bound<T>, Bound<T>);
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<'de, T> serde::de::Deserialize<'de> for NumRange<T>
where
T: std::str::FromStr + std::fmt::Display + std::cmp::PartialOrd,
<T as std::str::FromStr>::Err: std::fmt::Display,
{
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::de::Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
let mut split = s.split(",");
let start = split
.next()
.map(|s| match s.get(..1) {
Some("(") => match s.get(1..2) {
Some("*") => Ok(Bound::Unbounded),
_ => s[1..]
.trim()
.parse()
.map(Bound::Excluded)
.map_err(|e| serde::de::Error::custom(e)),
},
Some("[") => s[1..]
.trim()
.parse()
.map(Bound::Included)
.map_err(|e| serde::de::Error::custom(e)),
_ => Err(serde::de::Error::custom(format!(
"Could not parse left bound: {}",
s
))),
})
.transpose()?
.unwrap();
let end = split
.next()
.map(|s| match s.get(s.len() - 1..) {
Some(")") => match s.get(s.len() - 2..s.len() - 1) {
Some("*") => Ok(Bound::Unbounded),
_ => s[..s.len() - 1]
.trim()
.parse()
.map(Bound::Excluded)
.map_err(|e| serde::de::Error::custom(e)),
},
Some("]") => s[..s.len() - 1]
.trim()
.parse()
.map(Bound::Included)
.map_err(|e| serde::de::Error::custom(e)),
_ => Err(serde::de::Error::custom(format!(
"Could not parse right bound: {}",
s
))),
})
.transpose()?
.unwrap_or(Bound::Unbounded);
Ok(NumRange((start, end)))
}
}
impl<T> std::fmt::Display for NumRange<T>
where
T: std::str::FromStr + std::fmt::Display + std::cmp::PartialOrd,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self.start_bound() {
Bound::Excluded(n) => write!(f, "({},", n)?,
Bound::Included(n) => write!(f, "[{},", n)?,
Bound::Unbounded => write!(f, "(*,")?,
};
match self.end_bound() {
Bound::Excluded(n) => write!(f, "{})", n),
Bound::Included(n) => write!(f, "{}]", n),
Bound::Unbounded => write!(f, "*)"),
}
}
}
impl<T> serde::ser::Serialize for NumRange<T>
where
T: std::str::FromStr + std::fmt::Display + std::cmp::PartialOrd,
{
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
{
<&str>::serialize(&format!("{}", self).as_str(), serializer)
}
}
#[derive(Clone, Debug)]
pub enum UniqueBy {
Any(Vec<UniqueBy>),
All(Vec<UniqueBy>),
Exactly(String),
NotUnique,
}
impl UniqueBy {
pub fn eq(&self, lhs: &Config, rhs: &Config) -> bool {
match self {
UniqueBy::Any(any) => any.iter().any(|u| u.eq(lhs, rhs)),
UniqueBy::All(all) => all.iter().all(|u| u.eq(lhs, rhs)),
UniqueBy::Exactly(key) => lhs.get(&**key) == rhs.get(&**key),
UniqueBy::NotUnique => false,
}
}
}
impl Default for UniqueBy {
fn default() -> Self {
UniqueBy::NotUnique
}
}
impl<'de> serde::de::Deserialize<'de> for UniqueBy {
fn deserialize<D: serde::de::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
struct Visitor;
impl<'de> serde::de::Visitor<'de> for Visitor {
type Value = UniqueBy;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(formatter, "a key, an \"any\" object, or an \"all\" object")
}
fn visit_str<E: serde::de::Error>(self, v: &str) -> Result<Self::Value, E> {
Ok(UniqueBy::Exactly(v.to_owned()))
}
fn visit_string<E: serde::de::Error>(self, v: String) -> Result<Self::Value, E> {
Ok(UniqueBy::Exactly(v))
}
fn visit_map<A: serde::de::MapAccess<'de>>(
self,
mut map: A,
) -> Result<Self::Value, A::Error> {
let mut variant = None;
while let Some(key) = map.next_key::<Cow<str>>()? {
match key.as_ref() {
"any" => {
return Ok(UniqueBy::Any(map.next_value()?));
}
"all" => {
return Ok(UniqueBy::All(map.next_value()?));
}
_ => {
variant = Some(key);
}
}
}
Err(serde::de::Error::unknown_variant(
variant.unwrap_or_default().as_ref(),
&["any", "all"],
))
}
fn visit_unit<E: serde::de::Error>(self) -> Result<Self::Value, E> {
Ok(UniqueBy::NotUnique)
}
fn visit_none<E: serde::de::Error>(self) -> Result<Self::Value, E> {
Ok(UniqueBy::NotUnique)
}
}
deserializer.deserialize_any(Visitor)
}
}
impl serde::ser::Serialize for UniqueBy {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
{
use serde::ser::SerializeMap;
match self {
UniqueBy::Any(any) => {
let mut map = serializer.serialize_map(Some(1))?;
map.serialize_key("any")?;
map.serialize_value(any)?;
map.end()
}
UniqueBy::All(all) => {
let mut map = serializer.serialize_map(Some(1))?;
map.serialize_key("all")?;
map.serialize_value(all)?;
map.end()
}
UniqueBy::Exactly(key) => serializer.serialize_str(key),
UniqueBy::NotUnique => serializer.serialize_unit(),
}
}
}

View File

@@ -1,4 +1,4 @@
use std::collections::BTreeMap;
use std::collections::{BTreeMap, BTreeSet};
use std::future::Future;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::ops::Deref;
@@ -9,8 +9,11 @@ use std::time::Duration;
use chrono::{TimeDelta, Utc};
use helpers::NonDetachingJoinHandle;
use imbl::OrdMap;
use imbl_value::InternedString;
use itertools::Itertools;
use josekit::jwk::Jwk;
use models::{ActionId, PackageId};
use reqwest::{Client, Proxy};
use rpc_toolkit::yajrc::RpcError;
use rpc_toolkit::{CallRemote, Context, Empty};
@@ -23,7 +26,6 @@ use crate::account::AccountInfo;
use crate::auth::Sessions;
use crate::context::config::ServerConfig;
use crate::db::model::Database;
use crate::dependencies::compute_dependency_config_errs;
use crate::disk::OsPartitionInfo;
use crate::init::check_time_is_synchronized;
use crate::lxc::{ContainerId, LxcContainer, LxcManager};
@@ -33,6 +35,7 @@ use crate::net::wifi::WpaCli;
use crate::prelude::*;
use crate::progress::{FullProgressTracker, PhaseProgressTrackerHandle};
use crate::rpc_continuations::{Guid, OpenAuthedContinuations, RpcContinuations};
use crate::service::action::update_requested_actions;
use crate::service::effects::callbacks::ServiceCallbacks;
use crate::service::ServiceMap;
use crate::shutdown::Shutdown;
@@ -100,14 +103,14 @@ impl InitRpcContextPhases {
pub struct CleanupInitPhases {
cleanup_sessions: PhaseProgressTrackerHandle,
init_services: PhaseProgressTrackerHandle,
check_dependencies: PhaseProgressTrackerHandle,
check_requested_actions: PhaseProgressTrackerHandle,
}
impl CleanupInitPhases {
pub fn new(handle: &FullProgressTracker) -> Self {
Self {
cleanup_sessions: handle.add_phase("Cleaning up sessions".into(), Some(1)),
init_services: handle.add_phase("Initializing services".into(), Some(10)),
check_dependencies: handle.add_phase("Checking dependencies".into(), Some(1)),
check_requested_actions: handle.add_phase("Checking action requests".into(), Some(1)),
}
}
}
@@ -309,7 +312,7 @@ impl RpcContext {
CleanupInitPhases {
mut cleanup_sessions,
init_services,
mut check_dependencies,
mut check_requested_actions,
}: CleanupInitPhases,
) -> Result<(), Error> {
cleanup_sessions.start();
@@ -366,35 +369,68 @@ impl RpcContext {
cleanup_sessions.complete();
self.services.init(&self, init_services).await?;
tracing::info!("Initialized Package Managers");
tracing::info!("Initialized Services");
check_dependencies.start();
let mut updated_current_dependents = BTreeMap::new();
// TODO
check_requested_actions.start();
let peek = self.db.peek().await;
for (package_id, package) in peek.as_public().as_package_data().as_entries()?.into_iter() {
let package = package.clone();
let mut current_dependencies = package.as_current_dependencies().de()?;
compute_dependency_config_errs(self, &package_id, &mut current_dependencies)
.await
.log_err();
updated_current_dependents.insert(package_id.clone(), current_dependencies);
let mut action_input: OrdMap<PackageId, BTreeMap<ActionId, Value>> = OrdMap::new();
let requested_actions: BTreeSet<_> = peek
.as_public()
.as_package_data()
.as_entries()?
.into_iter()
.map(|(_, pde)| {
Ok(pde
.as_requested_actions()
.as_entries()?
.into_iter()
.map(|(_, r)| {
Ok::<_, Error>((
r.as_request().as_package_id().de()?,
r.as_request().as_action_id().de()?,
))
}))
})
.flatten_ok()
.map(|a| a.and_then(|a| a))
.try_collect()?;
let procedure_id = Guid::new();
for (package_id, action_id) in requested_actions {
if let Some(service) = self.services.get(&package_id).await.as_ref() {
if let Some(input) = service
.get_action_input(procedure_id.clone(), action_id.clone())
.await?
.and_then(|i| i.value)
{
action_input
.entry(package_id)
.or_default()
.insert(action_id, input);
}
}
}
self.db
.mutate(|v| {
for (package_id, deps) in updated_current_dependents {
if let Some(model) = v
.as_public_mut()
.as_package_data_mut()
.as_idx_mut(&package_id)
.map(|i| i.as_current_dependencies_mut())
{
model.ser(&deps)?;
.mutate(|db| {
for (package_id, action_input) in &action_input {
for (action_id, input) in action_input {
for (_, pde) in db.as_public_mut().as_package_data_mut().as_entries_mut()? {
pde.as_requested_actions_mut().mutate(|requested_actions| {
Ok(update_requested_actions(
requested_actions,
package_id,
action_id,
input,
false,
))
})?;
}
}
}
Ok(())
})
.await?;
check_dependencies.complete();
check_requested_actions.complete();
Ok(())
}

View File

@@ -4,7 +4,8 @@ use chrono::{DateTime, Utc};
use exver::VersionRange;
use imbl_value::InternedString;
use models::{
ActionId, DataUrl, HealthCheckId, HostId, PackageId, ServiceInterfaceId, VersionString,
ActionId, DataUrl, HealthCheckId, HostId, PackageId, ReplayId, ServiceInterfaceId,
VersionString,
};
use patch_db::json_ptr::JsonPointer;
use patch_db::HasModel;
@@ -17,8 +18,8 @@ use crate::net::service_interface::ServiceInterface;
use crate::prelude::*;
use crate::progress::FullProgress;
use crate::s9pk::manifest::Manifest;
use crate::status::Status;
use crate::util::serde::Pem;
use crate::status::MainStatus;
use crate::util::serde::{is_partial_of, Pem};
#[derive(Debug, Default, Deserialize, Serialize, TS)]
#[ts(export)]
@@ -310,9 +311,9 @@ pub struct InstallingInfo {
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, TS)]
#[ts(export)]
#[serde(rename_all = "camelCase")]
#[serde(rename_all = "kebab-case")]
pub enum AllowedStatuses {
OnlyRunning, // onlyRunning
OnlyRunning,
OnlyStopped,
Any,
}
@@ -324,13 +325,28 @@ pub struct ActionMetadata {
pub name: String,
pub description: String,
pub warning: Option<String>,
#[ts(type = "any")]
pub input: Value,
pub disabled: bool,
#[serde(default)]
pub visibility: ActionVisibility,
pub allowed_statuses: AllowedStatuses,
pub has_input: bool,
pub group: Option<String>,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, TS)]
#[ts(export)]
#[serde(rename_all = "kebab-case")]
#[serde(rename_all_fields = "camelCase")]
pub enum ActionVisibility {
Hidden,
Disabled { reason: String },
Enabled,
}
impl Default for ActionVisibility {
fn default() -> Self {
Self::Enabled
}
}
#[derive(Debug, Deserialize, Serialize, HasModel, TS)]
#[serde(rename_all = "camelCase")]
#[model = "Model<Self>"]
@@ -338,7 +354,7 @@ pub struct ActionMetadata {
pub struct PackageDataEntry {
pub state_info: PackageState,
pub data_version: Option<VersionString>,
pub status: Status,
pub status: MainStatus,
#[ts(type = "string | null")]
pub registry: Option<Url>,
#[ts(type = "string")]
@@ -348,6 +364,8 @@ pub struct PackageDataEntry {
pub last_backup: Option<DateTime<Utc>>,
pub current_dependencies: CurrentDependencies,
pub actions: BTreeMap<ActionId, ActionMetadata>,
#[ts(as = "BTreeMap::<String, ActionRequestEntry>")]
pub requested_actions: BTreeMap<ReplayId, ActionRequestEntry>,
pub service_interfaces: BTreeMap<ServiceInterfaceId, ServiceInterface>,
pub hosts: Hosts,
#[ts(type = "string[]")]
@@ -384,8 +402,9 @@ impl Map for CurrentDependencies {
}
}
#[derive(Clone, Debug, Deserialize, Serialize, TS)]
#[derive(Clone, Debug, Deserialize, Serialize, TS, HasModel)]
#[serde(rename_all = "camelCase")]
#[model = "Model<Self>"]
pub struct CurrentDependencyInfo {
#[ts(type = "string | null")]
pub title: Option<InternedString>,
@@ -394,11 +413,10 @@ pub struct CurrentDependencyInfo {
pub kind: CurrentDependencyKind,
#[ts(type = "string")]
pub version_range: VersionRange,
pub config_satisfied: bool,
}
#[derive(Clone, Debug, Deserialize, Serialize, TS)]
#[serde(rename_all = "camelCase")]
#[serde(rename_all = "kebab-case")]
#[serde(tag = "kind")]
pub enum CurrentDependencyKind {
Exists,
@@ -410,6 +428,66 @@ pub enum CurrentDependencyKind {
},
}
#[derive(Clone, Debug, Deserialize, Serialize, TS, HasModel)]
#[serde(rename_all = "camelCase")]
#[ts(export)]
#[model = "Model<Self>"]
pub struct ActionRequestEntry {
pub request: ActionRequest,
pub active: bool,
}
#[derive(Clone, Debug, Deserialize, Serialize, TS, HasModel)]
#[serde(rename_all = "camelCase")]
#[ts(export)]
#[model = "Model<Self>"]
pub struct ActionRequest {
pub package_id: PackageId,
pub action_id: ActionId,
#[ts(optional)]
pub description: Option<String>,
#[ts(optional)]
pub when: Option<ActionRequestTrigger>,
#[ts(optional)]
pub input: Option<ActionRequestInput>,
}
#[derive(Clone, Debug, Deserialize, Serialize, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export)]
pub struct ActionRequestTrigger {
#[serde(default)]
pub once: bool,
pub condition: ActionRequestCondition,
}
#[derive(Clone, Debug, Deserialize, Serialize, TS)]
#[serde(rename_all = "kebab-case")]
#[ts(export)]
pub enum ActionRequestCondition {
InputNotMatches,
}
#[derive(Clone, Debug, Deserialize, Serialize, TS)]
#[serde(rename_all = "kebab-case")]
#[serde(tag = "kind")]
pub enum ActionRequestInput {
Partial {
#[ts(type = "Record<string, unknown>")]
value: Value,
},
}
impl ActionRequestInput {
pub fn matches(&self, input: Option<&Value>) -> bool {
match self {
Self::Partial { value } => match input {
None => false,
Some(full) => is_partial_of(value, full),
},
}
}
}
#[derive(Debug, Default, Deserialize, Serialize)]
pub struct InterfaceAddressMap(pub BTreeMap<HostId, InterfaceAddresses>);
impl Map for InterfaceAddressMap {

View File

@@ -1,28 +1,14 @@
use std::collections::BTreeMap;
use std::time::Duration;
use clap::Parser;
use imbl_value::InternedString;
use models::PackageId;
use patch_db::json_patch::merge;
use rpc_toolkit::{from_fn_async, Context, Empty, HandlerExt, ParentHandler};
use serde::{Deserialize, Serialize};
use tracing::instrument;
use ts_rs::TS;
use crate::config::{Config, ConfigSpec, ConfigureContext};
use crate::context::{CliContext, RpcContext};
use crate::db::model::package::CurrentDependencies;
use crate::prelude::*;
use crate::rpc_continuations::Guid;
use crate::util::serde::HandlerExtSerde;
use crate::util::PathOrUrl;
use crate::Error;
pub fn dependency<C: Context>() -> ParentHandler<C> {
ParentHandler::new().subcommand("configure", configure::<C>())
}
#[derive(Clone, Debug, Default, Deserialize, Serialize, HasModel, TS)]
#[model = "Model<Self>"]
#[ts(export)]
@@ -56,129 +42,3 @@ pub struct DependencyMetadata {
#[ts(type = "string")]
pub title: InternedString,
}
#[derive(Deserialize, Serialize, Parser, TS)]
#[serde(rename_all = "camelCase")]
#[command(rename_all = "kebab-case")]
pub struct ConfigureParams {
dependent_id: PackageId,
dependency_id: PackageId,
}
pub fn configure<C: Context>() -> ParentHandler<C, ConfigureParams> {
ParentHandler::new()
.root_handler(
from_fn_async(configure_impl)
.with_inherited(|params, _| params)
.no_display()
.with_call_remote::<CliContext>(),
)
.subcommand(
"dry",
from_fn_async(configure_dry)
.with_inherited(|params, _| params)
.with_display_serializable()
.with_call_remote::<CliContext>(),
)
}
pub async fn configure_impl(
ctx: RpcContext,
_: Empty,
ConfigureParams {
dependent_id,
dependency_id,
}: ConfigureParams,
) -> Result<(), Error> {
let ConfigDryRes {
old_config: _,
new_config,
spec: _,
} = configure_logic(ctx.clone(), (dependent_id, dependency_id.clone())).await?;
let configure_context = ConfigureContext {
timeout: Some(Duration::from_secs(3).into()),
config: Some(new_config),
};
ctx.services
.get(&dependency_id)
.await
.as_ref()
.ok_or_else(|| {
Error::new(
eyre!("There is no manager running for {dependency_id}"),
ErrorKind::Unknown,
)
})?
.configure(Guid::new(), configure_context)
.await?;
Ok(())
}
pub async fn configure_dry(
ctx: RpcContext,
_: Empty,
ConfigureParams {
dependent_id,
dependency_id,
}: ConfigureParams,
) -> Result<ConfigDryRes, Error> {
configure_logic(ctx.clone(), (dependent_id, dependency_id.clone())).await
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ConfigDryRes {
pub old_config: Config,
pub new_config: Config,
pub spec: ConfigSpec,
}
pub async fn configure_logic(
ctx: RpcContext,
(dependent_id, dependency_id): (PackageId, PackageId),
) -> Result<ConfigDryRes, Error> {
let procedure_id = Guid::new();
let dependency_guard = ctx.services.get(&dependency_id).await;
let dependency = dependency_guard.as_ref().or_not_found(&dependency_id)?;
let dependent_guard = ctx.services.get(&dependent_id).await;
let dependent = dependent_guard.as_ref().or_not_found(&dependent_id)?;
let config_res = dependency.get_config(procedure_id.clone()).await?;
let diff = Value::Object(
dependent
.dependency_config(procedure_id, dependency_id, config_res.config.clone())
.await?
.unwrap_or_default(),
);
let mut new_config = Value::Object(config_res.config.clone().unwrap_or_default());
merge(&mut new_config, &diff);
Ok(ConfigDryRes {
old_config: config_res.config.unwrap_or_default(),
new_config: new_config.as_object().cloned().unwrap_or_default(),
spec: config_res.spec,
})
}
#[instrument(skip_all)]
pub async fn compute_dependency_config_errs(
ctx: &RpcContext,
id: &PackageId,
current_dependencies: &mut CurrentDependencies,
) -> Result<(), Error> {
let procedure_id = Guid::new();
let service_guard = ctx.services.get(id).await;
let service = service_guard.as_ref().or_not_found(id)?;
for (dep_id, dep_info) in current_dependencies.0.iter_mut() {
// check if config passes dependency check
let Some(dependency) = &*ctx.services.get(dep_id).await else {
continue;
};
let dep_config = dependency.get_config(procedure_id.clone()).await?.config;
dep_info.config_satisfied = service
.dependency_config(procedure_id.clone(), dep_id.clone(), dep_config)
.await?
.is_none();
}
Ok(())
}

View File

@@ -219,10 +219,10 @@ impl<G: GenericMountGuard> Drop for BackupMountGuard<G> {
let second = self.backup_disk_mount_guard.take();
tokio::spawn(async move {
if let Some(guard) = first {
guard.unmount().await.unwrap();
guard.unmount().await.log_err();
}
if let Some(guard) = second {
guard.unmount().await.unwrap();
guard.unmount().await.log_err();
}
});
}

View File

@@ -151,12 +151,12 @@ impl<G: GenericMountGuard> Drop for OverlayGuard<G> {
let guard = self.inner_guard.take();
if lower.is_some() || upper.is_some() || guard.mounted {
tokio::spawn(async move {
guard.unmount(false).await.unwrap();
guard.unmount(false).await.log_err();
if let Some(lower) = lower {
lower.unmount().await.unwrap();
lower.unmount().await.log_err();
}
if let Some(upper) = upper {
upper.delete().await.unwrap();
upper.delete().await.log_err();
}
});
}

View File

@@ -96,7 +96,7 @@ impl Drop for MountGuard {
fn drop(&mut self) {
if self.mounted {
let mountpoint = std::mem::take(&mut self.mountpoint);
tokio::spawn(async move { unmount(mountpoint, true).await.unwrap() });
tokio::spawn(async move { unmount(mountpoint, true).await.log_err() });
}
}
}

View File

@@ -9,7 +9,7 @@ use exver::VersionRange;
use futures::{AsyncWriteExt, StreamExt};
use imbl_value::{json, InternedString};
use itertools::Itertools;
use models::VersionString;
use models::{FromStrParser, VersionString};
use reqwest::header::{HeaderMap, CONTENT_LENGTH};
use reqwest::Url;
use rpc_toolkit::yajrc::{GenericRpcMethod, RpcError};
@@ -30,7 +30,6 @@ use crate::registry::package::get::GetPackageResponse;
use crate::rpc_continuations::{Guid, RpcContinuation};
use crate::s9pk::manifest::PackageId;
use crate::upload::upload;
use crate::util::clap::FromStrParser;
use crate::util::io::open_file;
use crate::util::net::WebSocketExt;
use crate::util::Never;

View File

@@ -29,7 +29,6 @@ pub mod action;
pub mod auth;
pub mod backup;
pub mod bins;
pub mod config;
pub mod context;
pub mod control;
pub mod db;
@@ -70,7 +69,6 @@ pub mod volume;
use std::time::SystemTime;
use clap::Parser;
pub use config::Config;
pub use error::{Error, ErrorKind, ResultExt};
use imbl_value::Value;
use rpc_toolkit::yajrc::RpcError;
@@ -240,15 +238,7 @@ pub fn server<C: Context>() -> ParentHandler<C> {
pub fn package<C: Context>() -> ParentHandler<C> {
ParentHandler::new()
.subcommand(
"action",
from_fn_async(action::action)
.with_display_serializable()
.with_custom_display_fn(|handle, result| {
Ok(action::display_action_result(handle.params, result))
})
.with_call_remote::<CliContext>(),
)
.subcommand("action", action::action_api::<C>())
.subcommand(
"install",
from_fn_async(install::install)
@@ -281,7 +271,6 @@ pub fn package<C: Context>() -> ParentHandler<C> {
.with_display_serializable()
.with_call_remote::<CliContext>(),
)
.subcommand("config", config::config::<C>())
.subcommand(
"start",
from_fn_async(control::start)
@@ -316,7 +305,6 @@ pub fn package<C: Context>() -> ParentHandler<C> {
})
.with_call_remote::<CliContext>(),
)
.subcommand("dependency", dependencies::dependency::<C>())
.subcommand("backup", backup::package_backup::<C>())
.subcommand("connect", from_fn_async(service::connect_rpc).no_cli())
.subcommand(

View File

@@ -12,7 +12,7 @@ use color_eyre::eyre::eyre;
use futures::stream::BoxStream;
use futures::{Future, FutureExt, Stream, StreamExt, TryStreamExt};
use itertools::Itertools;
use models::PackageId;
use models::{FromStrParser, PackageId};
use rpc_toolkit::yajrc::RpcError;
use rpc_toolkit::{
from_fn_async, CallRemote, Context, Empty, HandlerArgs, HandlerExt, HandlerFor, ParentHandler,
@@ -30,7 +30,6 @@ use crate::error::ResultExt;
use crate::lxc::ContainerId;
use crate::prelude::*;
use crate::rpc_continuations::{Guid, RpcContinuation, RpcContinuations};
use crate::util::clap::FromStrParser;
use crate::util::serde::Reversible;
use crate::util::Invoke;
@@ -487,14 +486,18 @@ fn logs_follow<
context,
inherited_params:
LogsParams {
extra, limit, boot, ..
extra,
cursor,
limit,
boot,
..
},
..
}: HandlerArgs<C, Empty, LogsParams<Extra>>| {
let f = f.clone();
async move {
let src = f.call(&context, extra).await?;
follow_logs(context, src, limit, boot.map(String::from)).await
follow_logs(context, src, cursor, limit, boot.map(String::from)).await
}
},
)
@@ -525,7 +528,7 @@ pub fn package_logs() -> ParentHandler<RpcContext, LogsParams<PackageIdParams>>
pub async fn journalctl(
id: LogSource,
limit: usize,
limit: Option<usize>,
cursor: Option<&str>,
boot: Option<&str>,
before: bool,
@@ -533,11 +536,12 @@ pub async fn journalctl(
) -> Result<LogStream, Error> {
let mut cmd = gen_journalctl_command(&id);
cmd.arg(format!("--lines={}", limit));
if let Some(limit) = limit {
cmd.arg(format!("--lines={}", limit));
}
let cursor_formatted = format!("--after-cursor={}", cursor.unwrap_or(""));
if cursor.is_some() {
cmd.arg(&cursor_formatted);
if let Some(cursor) = cursor {
cmd.arg(&format!("--after-cursor={}", cursor));
if before {
cmd.arg("--reverse");
}
@@ -638,8 +642,15 @@ pub async fn fetch_logs(
before: bool,
) -> Result<LogResponse, Error> {
let limit = limit.unwrap_or(50);
let mut stream =
journalctl(id, limit, cursor.as_deref(), boot.as_deref(), before, false).await?;
let mut stream = journalctl(
id,
Some(limit),
cursor.as_deref(),
boot.as_deref(),
before,
false,
)
.await?;
let mut entries = Vec::with_capacity(limit);
let mut start_cursor = None;
@@ -682,11 +693,16 @@ pub async fn fetch_logs(
pub async fn follow_logs<Context: AsRef<RpcContinuations>>(
ctx: Context,
id: LogSource,
cursor: Option<String>,
limit: Option<usize>,
boot: Option<String>,
) -> Result<LogFollowResponse, Error> {
let limit = limit.unwrap_or(50);
let mut stream = journalctl(id, limit, None, boot.as_deref(), false, true).await?;
let limit = if cursor.is_some() {
None
} else {
Some(limit.unwrap_or(50))
};
let mut stream = journalctl(id, limit, cursor.as_deref(), boot.as_deref(), false, true).await?;
let mut start_cursor = None;
let mut first_entry = None;

View File

@@ -7,7 +7,7 @@ use std::time::Duration;
use clap::builder::ValueParserFactory;
use futures::{AsyncWriteExt, StreamExt};
use imbl_value::{InOMap, InternedString};
use models::InvalidId;
use models::{FromStrParser, InvalidId};
use rpc_toolkit::yajrc::RpcError;
use rpc_toolkit::{GenericRpcMethod, RpcRequest, RpcResponse};
use rustyline_async::{ReadlineEvent, SharedWriter};
@@ -28,7 +28,6 @@ use crate::disk::mount::guard::{GenericMountGuard, MountGuard, TmpMountGuard};
use crate::disk::mount::util::unmount;
use crate::prelude::*;
use crate::rpc_continuations::{Guid, RpcContinuation};
use crate::util::clap::FromStrParser;
use crate::util::io::open_file;
use crate::util::rpc_client::UnixRpcClient;
use crate::util::{new_guid, Invoke};
@@ -365,7 +364,7 @@ impl Drop for LxcContainer {
tracing::error!("Error reading logs from crashed container: {e}");
tracing::debug!("{e:?}")
}
rootfs.unmount(true).await.unwrap();
rootfs.unmount(true).await.log_err();
drop(guid);
if let Err(e) = manager.gc().await {
tracing::error!("Error cleaning up dangling LXC containers: {e}");

View File

@@ -1,3 +1,7 @@
use std::str::FromStr;
use clap::builder::ValueParserFactory;
use models::{FromStrParser, HostId};
use serde::{Deserialize, Serialize};
use ts_rs::TS;
@@ -5,10 +9,37 @@ use crate::net::forward::AvailablePorts;
use crate::net::vhost::AlpnInfo;
use crate::prelude::*;
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, TS)]
#[ts(export)]
#[serde(rename_all = "camelCase")]
pub struct BindId {
pub id: HostId,
pub internal_port: u16,
}
impl ValueParserFactory for BindId {
type Parser = FromStrParser<Self>;
fn value_parser() -> Self::Parser {
FromStrParser::new()
}
}
impl FromStr for BindId {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let (id, port) = s
.split_once(":")
.ok_or_else(|| Error::new(eyre!("expected <id>:<port>"), ErrorKind::ParseUrl))?;
Ok(Self {
id: id.parse()?,
internal_port: port.parse()?,
})
}
}
#[derive(Debug, Deserialize, Serialize, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export)]
pub struct BindInfo {
pub enabled: bool,
pub options: BindOptions,
pub lan: LanInfo,
}
@@ -30,6 +61,7 @@ impl BindInfo {
assigned_ssl_port = Some(available_ports.alloc()?);
}
Ok(Self {
enabled: true,
options,
lan: LanInfo {
assigned_port,
@@ -69,7 +101,14 @@ impl BindInfo {
available_ports.free([port]);
}
}
Ok(Self { options, lan })
Ok(Self {
enabled: true,
options,
lan,
})
}
pub fn disable(&mut self) {
self.enabled = false;
}
}

View File

@@ -15,8 +15,8 @@ use crate::hostname::Hostname;
use crate::net::dns::DnsController;
use crate::net::forward::LanPortForwardController;
use crate::net::host::address::HostAddress;
use crate::net::host::binding::{AddSslOptions, BindOptions, LanInfo};
use crate::net::host::{host_for, Host, HostKind};
use crate::net::host::binding::{AddSslOptions, BindId, BindOptions, LanInfo};
use crate::net::host::{host_for, Host, HostKind, Hosts};
use crate::net::service_interface::{HostnameInfo, IpHostname, OnionHostname};
use crate::net::tor::TorController;
use crate::net::vhost::{AlpnInfo, VHostController};
@@ -154,14 +154,16 @@ impl NetController {
) -> Result<NetService, Error> {
let dns = self.dns.add(Some(package.clone()), ip).await?;
Ok(NetService {
let mut res = NetService {
shutdown: false,
id: package,
ip,
dns,
controller: Arc::downgrade(self),
binds: BTreeMap::new(),
})
};
res.clear_bindings(Default::default()).await?;
Ok(res)
}
}
@@ -221,31 +223,41 @@ impl NetService {
self.update(id, host).await
}
pub async fn clear_bindings(&mut self) -> Result<(), Error> {
let ctrl = self.net_controller()?;
pub async fn clear_bindings(&mut self, except: BTreeSet<BindId>) -> Result<(), Error> {
let pkg_id = &self.id;
let hosts = self
.net_controller()?
.db
.mutate(|db| {
let mut res = Hosts::default();
for (host_id, host) in db
.as_public_mut()
.as_package_data_mut()
.as_idx_mut(pkg_id)
.or_not_found(pkg_id)?
.as_hosts_mut()
.as_entries_mut()?
{
host.as_bindings_mut().mutate(|b| {
for (internal_port, info) in b {
if !except.contains(&BindId {
id: host_id.clone(),
internal_port: *internal_port,
}) {
info.disable();
}
}
Ok(())
})?;
res.0.insert(host_id, host.de()?);
}
Ok(res)
})
.await?;
let mut errors = ErrorCollection::new();
for (_, binds) in std::mem::take(&mut self.binds) {
for (_, (lan, _, hostnames, rc)) in binds.lan {
drop(rc);
if let Some(external) = lan.assigned_ssl_port {
for hostname in ctrl.server_hostnames.iter().cloned() {
ctrl.vhost.gc(hostname, external).await?;
}
for hostname in hostnames {
ctrl.vhost.gc(Some(hostname), external).await?;
}
}
if let Some(external) = lan.assigned_port {
ctrl.forward.gc(external).await?;
}
}
for (addr, (_, rcs)) in binds.tor {
drop(rcs);
errors.handle(ctrl.tor.gc(Some(addr), None).await);
}
for (id, host) in hosts.0 {
errors.handle(self.update(id, host).await);
}
std::mem::take(&mut self.dns);
errors.handle(ctrl.dns.gc(Some(self.id.clone()), self.ip).await);
errors.into_result()
}
@@ -261,6 +273,9 @@ impl NetService {
let ip_info = server_info.as_ip_info().de()?;
let hostname = server_info.as_hostname().de()?;
for (port, bind) in &host.bindings {
if !bind.enabled {
continue;
}
let old_lan_bind = binds.lan.remove(port);
let lan_bind = old_lan_bind
.as_ref()
@@ -395,7 +410,7 @@ impl NetService {
}
let mut removed = BTreeSet::new();
binds.lan.retain(|internal, (external, _, hostnames, _)| {
if host.bindings.contains_key(internal) {
if host.bindings.get(internal).map_or(false, |b| b.enabled) {
true
} else {
removed.insert((*external, std::mem::take(hostnames)));
@@ -424,6 +439,9 @@ impl NetService {
let mut tor_hostname_ports = BTreeMap::<u16, TorHostnamePorts>::new();
let mut tor_binds = OrdMap::<u16, SocketAddr>::new();
for (internal, info) in &host.bindings {
if !info.enabled {
continue;
}
tor_binds.insert(
info.options.preferred_external_port,
SocketAddr::from((self.ip, *internal)),
@@ -511,7 +529,7 @@ impl NetService {
pub async fn remove_all(mut self) -> Result<(), Error> {
self.shutdown = true;
if let Some(ctrl) = Weak::upgrade(&self.controller) {
self.clear_bindings().await?;
self.clear_bindings(Default::default()).await?;
drop(ctrl);
Ok(())
} else {
@@ -566,7 +584,7 @@ impl Drop for NetService {
binds: BTreeMap::new(),
},
);
tokio::spawn(async move { svc.remove_all().await.unwrap() });
tokio::spawn(async move { svc.remove_all().await.log_err() });
}
}
}

View File

@@ -307,7 +307,7 @@ async fn torctl(
let logs = journalctl(
LogSource::Unit(SYSTEMD_UNIT),
0,
Some(0),
None,
Some("0"),
false,

View File

@@ -7,7 +7,7 @@ use clap::builder::ValueParserFactory;
use clap::Parser;
use color_eyre::eyre::eyre;
use imbl_value::InternedString;
use models::PackageId;
use models::{FromStrParser, PackageId};
use rpc_toolkit::{from_fn_async, Context, HandlerExt, ParentHandler};
use serde::{Deserialize, Serialize};
use tracing::instrument;
@@ -17,7 +17,6 @@ use crate::backup::BackupReport;
use crate::context::{CliContext, RpcContext};
use crate::db::model::DatabaseModel;
use crate::prelude::*;
use crate::util::clap::FromStrParser;
use crate::util::serde::HandlerExtSerde;
// #[command(subcommands(list, delete, delete_before, create))]

View File

@@ -3,6 +3,7 @@ use std::str::FromStr;
use clap::builder::ValueParserFactory;
use itertools::Itertools;
use models::FromStrParser;
use serde::{Deserialize, Serialize};
use ts_rs::TS;
use url::Url;
@@ -10,7 +11,6 @@ use url::Url;
use crate::prelude::*;
use crate::registry::signer::commitment::Digestable;
use crate::registry::signer::sign::{AnySignature, AnyVerifyingKey, SignatureScheme};
use crate::util::clap::FromStrParser;
pub mod commitment;
pub mod sign;

View File

@@ -4,6 +4,7 @@ use std::str::FromStr;
use ::ed25519::pkcs8::BitStringRef;
use clap::builder::ValueParserFactory;
use der::referenced::OwnedToRef;
use models::FromStrParser;
use pkcs8::der::AnyRef;
use pkcs8::{PrivateKeyInfo, SubjectPublicKeyInfo};
use serde::{Deserialize, Serialize};
@@ -13,7 +14,6 @@ use ts_rs::TS;
use crate::prelude::*;
use crate::registry::signer::commitment::Digestable;
use crate::registry::signer::sign::ed25519::Ed25519;
use crate::util::clap::FromStrParser;
use crate::util::serde::{deserialize_from_str, serialize_display};
pub mod ed25519;

View File

@@ -13,12 +13,12 @@ use futures::future::BoxFuture;
use futures::{Future, FutureExt};
use helpers::TimedResource;
use imbl_value::InternedString;
use models::FromStrParser;
use tokio::sync::{broadcast, Mutex as AsyncMutex};
use ts_rs::TS;
#[allow(unused_imports)]
use crate::prelude::*;
use crate::util::clap::FromStrParser;
use crate::util::new_guid;
#[derive(

View File

@@ -249,7 +249,6 @@ impl TryFrom<ManifestV1> for Manifest {
hardware_requirements: value.hardware_requirements,
git_hash: value.git_hash,
os_version: value.eos_version,
has_config: value.config.is_some(),
})
}
}

View File

@@ -68,8 +68,6 @@ pub struct Manifest {
#[serde(default = "current_version")]
#[ts(type = "string")]
pub os_version: Version,
#[serde(default = "const_true")]
pub has_config: bool,
}
impl Manifest {
pub fn validate_for<'a, T: Clone>(

View File

@@ -1,42 +1,38 @@
use std::collections::BTreeMap;
use std::time::Duration;
use models::{ActionId, ProcedureName};
use imbl_value::json;
use models::{ActionId, PackageId, ProcedureName, ReplayId};
use crate::action::ActionResult;
use crate::action::{ActionInput, ActionResult};
use crate::db::model::package::{ActionRequestCondition, ActionRequestEntry, ActionRequestInput};
use crate::prelude::*;
use crate::rpc_continuations::Guid;
use crate::service::config::GetConfig;
use crate::service::dependencies::DependencyConfig;
use crate::service::{Service, ServiceActor};
use crate::util::actor::background::BackgroundJobQueue;
use crate::util::actor::{ConflictBuilder, Handler};
use crate::util::serde::is_partial_of;
pub(super) struct Action {
pub(super) struct GetActionInput {
id: ActionId,
input: Value,
}
impl Handler<Action> for ServiceActor {
type Response = Result<ActionResult, Error>;
fn conflicts_with(_: &Action) -> ConflictBuilder<Self> {
ConflictBuilder::everything()
.except::<GetConfig>()
.except::<DependencyConfig>()
impl Handler<GetActionInput> for ServiceActor {
type Response = Result<Option<ActionInput>, Error>;
fn conflicts_with(_: &GetActionInput) -> ConflictBuilder<Self> {
ConflictBuilder::nothing()
}
async fn handle(
&mut self,
id: Guid,
Action {
id: action_id,
input,
}: Action,
GetActionInput { id: action_id }: GetActionInput,
_: &BackgroundJobQueue,
) -> Self::Response {
let container = &self.0.persistent_container;
container
.execute::<ActionResult>(
.execute::<Option<ActionInput>>(
id,
ProcedureName::RunAction(action_id),
input,
ProcedureName::GetActionInput(action_id),
Value::Null,
Some(Duration::from_secs(30)),
)
.await
@@ -45,16 +41,139 @@ impl Handler<Action> for ServiceActor {
}
impl Service {
pub async fn action(
pub async fn get_action_input(
&self,
id: Guid,
action_id: ActionId,
) -> Result<Option<ActionInput>, Error> {
if !self
.seed
.ctx
.db
.peek()
.await
.as_public()
.as_package_data()
.as_idx(&self.seed.id)
.or_not_found(&self.seed.id)?
.as_actions()
.as_idx(&action_id)
.or_not_found(&action_id)?
.as_has_input()
.de()?
{
return Ok(None);
}
self.actor
.send(id, GetActionInput { id: action_id })
.await?
}
}
pub fn update_requested_actions(
requested_actions: &mut BTreeMap<ReplayId, ActionRequestEntry>,
package_id: &PackageId,
action_id: &ActionId,
input: &Value,
was_run: bool,
) {
requested_actions.retain(|_, v| {
if &v.request.package_id != package_id || &v.request.action_id != action_id {
return true;
}
if let Some(when) = &v.request.when {
match &when.condition {
ActionRequestCondition::InputNotMatches => match &v.request.input {
Some(ActionRequestInput::Partial { value }) => {
if is_partial_of(value, input) {
if when.once {
return !was_run;
} else {
v.active = false;
}
} else {
v.active = true;
}
}
None => {
tracing::error!(
"action request exists in an invalid state {:?}",
v.request
);
}
},
}
true
} else {
!was_run
}
})
}
pub(super) struct RunAction {
id: ActionId,
input: Value,
}
impl Handler<RunAction> for ServiceActor {
type Response = Result<Option<ActionResult>, Error>;
fn conflicts_with(_: &RunAction) -> ConflictBuilder<Self> {
ConflictBuilder::everything().except::<GetActionInput>()
}
async fn handle(
&mut self,
id: Guid,
RunAction {
id: action_id,
input,
}: RunAction,
_: &BackgroundJobQueue,
) -> Self::Response {
let container = &self.0.persistent_container;
let result = container
.execute::<Option<ActionResult>>(
id,
ProcedureName::RunAction(action_id.clone()),
json!({
"input": input,
}),
Some(Duration::from_secs(30)),
)
.await
.with_kind(ErrorKind::Action)?;
let package_id = &self.0.id;
self.0
.ctx
.db
.mutate(|db| {
for (_, pde) in db.as_public_mut().as_package_data_mut().as_entries_mut()? {
pde.as_requested_actions_mut().mutate(|requested_actions| {
Ok(update_requested_actions(
requested_actions,
package_id,
&action_id,
&input,
true,
))
})?;
}
Ok(())
})
.await?;
Ok(result)
}
}
impl Service {
pub async fn run_action(
&self,
id: Guid,
action_id: ActionId,
input: Value,
) -> Result<ActionResult, Error> {
) -> Result<Option<ActionResult>, Error> {
self.actor
.send(
id,
Action {
RunAction {
id: action_id,
input,
},

View File

@@ -1,78 +0,0 @@
use std::time::Duration;
use models::ProcedureName;
use crate::config::action::ConfigRes;
use crate::config::ConfigureContext;
use crate::prelude::*;
use crate::rpc_continuations::Guid;
use crate::service::dependencies::DependencyConfig;
use crate::service::{Service, ServiceActor};
use crate::util::actor::background::BackgroundJobQueue;
use crate::util::actor::{ConflictBuilder, Handler};
use crate::util::serde::NoOutput;
pub(super) struct Configure(ConfigureContext);
impl Handler<Configure> for ServiceActor {
type Response = Result<(), Error>;
fn conflicts_with(_: &Configure) -> ConflictBuilder<Self> {
ConflictBuilder::everything().except::<DependencyConfig>()
}
async fn handle(
&mut self,
id: Guid,
Configure(ConfigureContext { timeout, config }): Configure,
_: &BackgroundJobQueue,
) -> Self::Response {
let container = &self.0.persistent_container;
let package_id = &self.0.id;
container
.execute::<NoOutput>(id, ProcedureName::SetConfig, to_value(&config)?, timeout)
.await
.with_kind(ErrorKind::ConfigRulesViolation)?;
self.0
.ctx
.db
.mutate(move |db| {
db.as_public_mut()
.as_package_data_mut()
.as_idx_mut(package_id)
.or_not_found(package_id)?
.as_status_mut()
.as_configured_mut()
.ser(&true)
})
.await?;
Ok(())
}
}
pub(super) struct GetConfig;
impl Handler<GetConfig> for ServiceActor {
type Response = Result<ConfigRes, Error>;
fn conflicts_with(_: &GetConfig) -> ConflictBuilder<Self> {
ConflictBuilder::nothing().except::<Configure>()
}
async fn handle(&mut self, id: Guid, _: GetConfig, _: &BackgroundJobQueue) -> Self::Response {
let container = &self.0.persistent_container;
container
.execute::<ConfigRes>(
id,
ProcedureName::GetConfig,
Value::Null,
Some(Duration::from_secs(30)), // TODO timeout
)
.await
.with_kind(ErrorKind::ConfigRulesViolation)
}
}
impl Service {
pub async fn configure(&self, id: Guid, ctx: ConfigureContext) -> Result<(), Error> {
self.actor.send(id, Configure(ctx)).await?
}
pub async fn get_config(&self, id: Guid) -> Result<ConfigRes, Error> {
self.actor.send(id, GetConfig).await?
}
}

View File

@@ -1,7 +1,6 @@
use crate::prelude::*;
use crate::rpc_continuations::Guid;
use crate::service::config::GetConfig;
use crate::service::dependencies::DependencyConfig;
use crate::service::action::RunAction;
use crate::service::start_stop::StartStop;
use crate::service::transition::TransitionKind;
use crate::service::{Service, ServiceActor};
@@ -12,9 +11,7 @@ pub(super) struct Start;
impl Handler<Start> for ServiceActor {
type Response = ();
fn conflicts_with(_: &Start) -> ConflictBuilder<Self> {
ConflictBuilder::everything()
.except::<GetConfig>()
.except::<DependencyConfig>()
ConflictBuilder::everything().except::<RunAction>()
}
async fn handle(&mut self, _: Guid, _: Start, _: &BackgroundJobQueue) -> Self::Response {
self.0.persistent_container.state.send_modify(|x| {
@@ -33,9 +30,7 @@ struct Stop;
impl Handler<Stop> for ServiceActor {
type Response = ();
fn conflicts_with(_: &Stop) -> ConflictBuilder<Self> {
ConflictBuilder::everything()
.except::<GetConfig>()
.except::<DependencyConfig>()
ConflictBuilder::everything().except::<RunAction>()
}
async fn handle(&mut self, _: Guid, _: Stop, _: &BackgroundJobQueue) -> Self::Response {
let mut transition_state = None;

View File

@@ -1,86 +0,0 @@
use std::time::Duration;
use imbl_value::json;
use models::{PackageId, ProcedureName};
use crate::prelude::*;
use crate::rpc_continuations::Guid;
use crate::service::{Service, ServiceActor, ServiceActorSeed};
use crate::util::actor::background::BackgroundJobQueue;
use crate::util::actor::{ConflictBuilder, Handler};
use crate::Config;
impl ServiceActorSeed {
async fn dependency_config(
&self,
id: Guid,
dependency_id: PackageId,
remote_config: Option<Config>,
) -> Result<Option<Config>, Error> {
let container = &self.persistent_container;
container
.sanboxed::<Option<Config>>(
id.clone(),
ProcedureName::UpdateDependency(dependency_id.clone()),
json!({
"queryResults": container
.execute::<Value>(
id,
ProcedureName::QueryDependency(dependency_id),
Value::Null,
Some(Duration::from_secs(30)),
)
.await
.with_kind(ErrorKind::Dependency)?,
"remoteConfig": remote_config,
}),
Some(Duration::from_secs(30)),
)
.await
.with_kind(ErrorKind::Dependency)
.map(|res| res.filter(|c| !c.is_empty() && Some(c) != remote_config.as_ref()))
}
}
pub(super) struct DependencyConfig {
dependency_id: PackageId,
remote_config: Option<Config>,
}
impl Handler<DependencyConfig> for ServiceActor {
type Response = Result<Option<Config>, Error>;
fn conflicts_with(_: &DependencyConfig) -> ConflictBuilder<Self> {
ConflictBuilder::nothing()
}
async fn handle(
&mut self,
id: Guid,
DependencyConfig {
dependency_id,
remote_config,
}: DependencyConfig,
_: &BackgroundJobQueue,
) -> Self::Response {
self.0
.dependency_config(id, dependency_id, remote_config)
.await
}
}
impl Service {
pub async fn dependency_config(
&self,
id: Guid,
dependency_id: PackageId,
remote_config: Option<Config>,
) -> Result<Option<Config>, Error> {
self.actor
.send(
id,
DependencyConfig {
dependency_id,
remote_config,
},
)
.await?
}
}

View File

@@ -1,22 +1,59 @@
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use models::{ActionId, PackageId};
use models::{ActionId, PackageId, ReplayId};
use rpc_toolkit::{from_fn_async, Context, HandlerExt, ParentHandler};
use crate::action::ActionResult;
use crate::db::model::package::ActionMetadata;
use crate::action::{display_action_result, ActionInput, ActionResult};
use crate::db::model::package::{
ActionMetadata, ActionRequest, ActionRequestCondition, ActionRequestEntry, ActionRequestTrigger,
};
use crate::rpc_continuations::Guid;
use crate::service::cli::ContainerCliContext;
use crate::service::effects::prelude::*;
use crate::util::serde::HandlerExtSerde;
pub fn action_api<C: Context>() -> ParentHandler<C> {
ParentHandler::new()
.subcommand("export", from_fn_async(export_action).no_cli())
.subcommand(
"clear",
from_fn_async(clear_actions)
.no_display()
.with_call_remote::<ContainerCliContext>(),
)
.subcommand(
"get-input",
from_fn_async(get_action_input)
.with_display_serializable()
.with_call_remote::<ContainerCliContext>(),
)
.subcommand(
"run",
from_fn_async(run_action)
.with_display_serializable()
.with_custom_display_fn(|args, res| Ok(display_action_result(args.params, res)))
.with_call_remote::<ContainerCliContext>(),
)
.subcommand("request", from_fn_async(request_action).no_cli())
.subcommand(
"clear-requests",
from_fn_async(clear_action_requests)
.no_display()
.with_call_remote::<ContainerCliContext>(),
)
}
#[derive(Debug, Clone, Serialize, Deserialize, TS)]
#[ts(export)]
#[serde(rename_all = "camelCase")]
pub struct ExportActionParams {
#[ts(optional)]
package_id: Option<PackageId>,
id: ActionId,
metadata: ActionMetadata,
}
pub async fn export_action(context: EffectContext, data: ExportActionParams) -> Result<(), Error> {
pub async fn export_action(
context: EffectContext,
ExportActionParams { id, metadata }: ExportActionParams,
) -> Result<(), Error> {
let context = context.deref()?;
let package_id = context.seed.id.clone();
context
@@ -31,17 +68,26 @@ pub async fn export_action(context: EffectContext, data: ExportActionParams) ->
.or_not_found(&package_id)?
.as_actions_mut();
let mut value = model.de()?;
value
.insert(data.id, data.metadata)
.map(|_| ())
.unwrap_or_default();
value.insert(id, metadata);
model.ser(&value)
})
.await?;
Ok(())
}
pub async fn clear_actions(context: EffectContext) -> Result<(), Error> {
#[derive(Debug, Clone, Serialize, Deserialize, TS, Parser)]
#[ts(export)]
#[serde(rename_all = "camelCase")]
pub struct ClearActionsParams {
#[arg(long)]
pub except: Vec<ActionId>,
}
async fn clear_actions(
context: EffectContext,
ClearActionsParams { except }: ClearActionsParams,
) -> Result<(), Error> {
let except: BTreeSet<_> = except.into_iter().collect();
let context = context.deref()?;
let package_id = context.seed.id.clone();
context
@@ -54,34 +100,32 @@ pub async fn clear_actions(context: EffectContext) -> Result<(), Error> {
.as_idx_mut(&package_id)
.or_not_found(&package_id)?
.as_actions_mut()
.ser(&BTreeMap::new())
.mutate(|a| Ok(a.retain(|e, _| except.contains(e))))
})
.await?;
Ok(())
}
#[derive(Debug, Clone, Serialize, Deserialize, TS)]
#[derive(Debug, Clone, Serialize, Deserialize, TS, Parser)]
#[serde(rename_all = "camelCase")]
#[ts(export)]
pub struct ExecuteAction {
pub struct GetActionInputParams {
#[serde(default)]
#[ts(skip)]
#[arg(skip)]
procedure_id: Guid,
#[ts(optional)]
package_id: Option<PackageId>,
action_id: ActionId,
#[ts(type = "any")]
input: Value,
}
pub async fn execute_action(
async fn get_action_input(
context: EffectContext,
ExecuteAction {
GetActionInputParams {
procedure_id,
package_id,
action_id,
input,
}: ExecuteAction,
) -> Result<ActionResult, Error> {
}: GetActionInputParams,
) -> Result<Option<ActionInput>, Error> {
let context = context.deref()?;
if let Some(package_id) = package_id {
@@ -93,9 +137,179 @@ pub async fn execute_action(
.await
.as_ref()
.or_not_found(&package_id)?
.action(procedure_id, action_id, input)
.get_action_input(procedure_id, action_id)
.await
} else {
context.action(procedure_id, action_id, input).await
context.get_action_input(procedure_id, action_id).await
}
}
#[derive(Debug, Clone, Serialize, Deserialize, TS, Parser)]
#[serde(rename_all = "camelCase")]
#[ts(export)]
pub struct RunActionParams {
#[serde(default)]
#[ts(skip)]
#[arg(skip)]
procedure_id: Guid,
#[ts(optional)]
package_id: Option<PackageId>,
action_id: ActionId,
#[ts(type = "any")]
input: Value,
}
async fn run_action(
context: EffectContext,
RunActionParams {
procedure_id,
package_id,
action_id,
input,
}: RunActionParams,
) -> Result<Option<ActionResult>, Error> {
let context = context.deref()?;
let package_id = package_id.as_ref().unwrap_or(&context.seed.id);
if package_id != &context.seed.id {
return Err(Error::new(
eyre!("calling actions on other packages is unsupported at this time"),
ErrorKind::InvalidRequest,
));
context
.seed
.ctx
.services
.get(&package_id)
.await
.as_ref()
.or_not_found(&package_id)?
.run_action(procedure_id, action_id, input)
.await
} else {
context.run_action(procedure_id, action_id, input).await
}
}
#[derive(Clone, Debug, Deserialize, Serialize, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export)]
pub struct RequestActionParams {
#[serde(default)]
#[ts(skip)]
procedure_id: Guid,
replay_id: ReplayId,
#[serde(flatten)]
request: ActionRequest,
}
async fn request_action(
context: EffectContext,
RequestActionParams {
procedure_id,
replay_id,
request,
}: RequestActionParams,
) -> Result<(), Error> {
let context = context.deref()?;
let src_id = &context.seed.id;
let active = match &request.when {
Some(ActionRequestTrigger { once, condition }) => match condition {
ActionRequestCondition::InputNotMatches => {
let Some(input) = request.input.as_ref() else {
return Err(Error::new(
eyre!("input-not-matches trigger requires input to be specified"),
ErrorKind::InvalidRequest,
));
};
if let Some(service) = context
.seed
.ctx
.services
.get(&request.package_id)
.await
.as_ref()
{
let Some(prev) = service
.get_action_input(procedure_id, request.action_id.clone())
.await?
else {
return Err(Error::new(
eyre!(
"action {} of {} has no input",
request.action_id,
request.package_id
),
ErrorKind::InvalidRequest,
));
};
if input.matches(prev.value.as_ref()) {
if *once {
return Ok(());
} else {
false
}
} else {
true
}
} else {
true // update when service is installed
}
}
},
None => true,
};
context
.seed
.ctx
.db
.mutate(|db| {
db.as_public_mut()
.as_package_data_mut()
.as_idx_mut(src_id)
.or_not_found(src_id)?
.as_requested_actions_mut()
.insert(&replay_id, &ActionRequestEntry { active, request })
})
.await?;
Ok(())
}
#[derive(Debug, Clone, Serialize, Deserialize, TS, Parser)]
#[ts(type = "{ only: string[] } | { except: string[] }")]
#[ts(export)]
pub struct ClearActionRequestsParams {
#[arg(long, conflicts_with = "except")]
pub only: Option<Vec<ReplayId>>,
#[arg(long, conflicts_with = "only")]
pub except: Option<Vec<ReplayId>>,
}
async fn clear_action_requests(
context: EffectContext,
ClearActionRequestsParams { only, except }: ClearActionRequestsParams,
) -> Result<(), Error> {
let context = context.deref()?;
let package_id = context.seed.id.clone();
let only = only.map(|only| only.into_iter().collect::<BTreeSet<_>>());
let except = except.map(|except| except.into_iter().collect::<BTreeSet<_>>());
context
.seed
.ctx
.db
.mutate(|db| {
db.as_public_mut()
.as_package_data_mut()
.as_idx_mut(&package_id)
.or_not_found(&package_id)?
.as_requested_actions_mut()
.mutate(|a| {
Ok(a.retain(|e, _| {
only.as_ref().map_or(true, |only| !only.contains(e))
&& except.as_ref().map_or(true, |except| except.contains(e))
}))
})
})
.await?;
Ok(())
}

View File

@@ -3,19 +3,22 @@ use std::collections::{BTreeMap, BTreeSet};
use std::sync::{Arc, Mutex, Weak};
use std::time::{Duration, SystemTime};
use clap::Parser;
use futures::future::join_all;
use helpers::NonDetachingJoinHandle;
use imbl::{vector, Vector};
use imbl_value::InternedString;
use models::{HostId, PackageId, ServiceInterfaceId};
use patch_db::json_ptr::JsonPointer;
use serde::{Deserialize, Serialize};
use tracing::warn;
use ts_rs::TS;
use crate::net::ssl::FullchainCertData;
use crate::prelude::*;
use crate::service::effects::context::EffectContext;
use crate::service::effects::net::ssl::Algorithm;
use crate::service::rpc::CallbackHandle;
use crate::service::rpc::{CallbackHandle, CallbackId};
use crate::service::{Service, ServiceActorSeed};
use crate::util::collections::EqMap;
@@ -272,6 +275,7 @@ impl CallbackHandler {
}
}
pub async fn call(mut self, args: Vector<Value>) -> Result<(), Error> {
dbg!(eyre!("callback fired: {}", self.handle.is_active()));
if let Some(seed) = self.seed.upgrade() {
seed.persistent_container
.callback(self.handle.take(), args)
@@ -299,13 +303,29 @@ impl CallbackHandlers {
}
}
pub(super) fn clear_callbacks(context: EffectContext) -> Result<(), Error> {
#[derive(Debug, Clone, Serialize, Deserialize, TS, Parser)]
#[ts(type = "{ only: number[] } | { except: number[] }")]
#[ts(export)]
pub struct ClearCallbacksParams {
#[arg(long, conflicts_with = "except")]
pub only: Option<Vec<CallbackId>>,
#[arg(long, conflicts_with = "only")]
pub except: Option<Vec<CallbackId>>,
}
pub(super) fn clear_callbacks(
context: EffectContext,
ClearCallbacksParams { only, except }: ClearCallbacksParams,
) -> Result<(), Error> {
let context = context.deref()?;
context
.seed
.persistent_container
.state
.send_if_modified(|s| !std::mem::take(&mut s.callbacks).is_empty());
let only = only.map(|only| only.into_iter().collect::<BTreeSet<_>>());
let except = except.map(|except| except.into_iter().collect::<BTreeSet<_>>());
context.seed.persistent_container.state.send_modify(|s| {
s.callbacks.retain(|cb| {
only.as_ref().map_or(true, |only| !only.contains(cb))
&& except.as_ref().map_or(true, |except| except.contains(cb))
})
});
context.seed.ctx.callbacks.gc();
Ok(())
}

View File

@@ -1,53 +0,0 @@
use models::PackageId;
use crate::service::effects::prelude::*;
#[derive(Debug, Clone, Serialize, Deserialize, Parser, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export)]
pub struct GetConfiguredParams {
#[ts(optional)]
package_id: Option<PackageId>,
}
pub async fn get_configured(context: EffectContext) -> Result<bool, Error> {
let context = context.deref()?;
let peeked = context.seed.ctx.db.peek().await;
let package_id = &context.seed.id;
peeked
.as_public()
.as_package_data()
.as_idx(package_id)
.or_not_found(package_id)?
.as_status()
.as_configured()
.de()
}
#[derive(Debug, Clone, Serialize, Deserialize, Parser, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export)]
pub struct SetConfigured {
configured: bool,
}
pub async fn set_configured(
context: EffectContext,
SetConfigured { configured }: SetConfigured,
) -> Result<(), Error> {
let context = context.deref()?;
let package_id = &context.seed.id;
context
.seed
.ctx
.db
.mutate(|db| {
db.as_public_mut()
.as_package_data_mut()
.as_idx_mut(package_id)
.or_not_found(package_id)?
.as_status_mut()
.as_configured_mut()
.ser(&configured)
})
.await?;
Ok(())
}

View File

@@ -1,9 +1,9 @@
use std::str::FromStr;
use clap::builder::ValueParserFactory;
use models::FromStrParser;
use crate::service::effects::prelude::*;
use crate::util::clap::FromStrParser;
pub async fn restart(
context: EffectContext,

View File

@@ -6,13 +6,13 @@ use clap::builder::ValueParserFactory;
use exver::VersionRange;
use imbl::OrdMap;
use imbl_value::InternedString;
use itertools::Itertools;
use models::{HealthCheckId, PackageId, VersionString, VolumeId};
use models::{FromStrParser, HealthCheckId, PackageId, ReplayId, VersionString, VolumeId};
use patch_db::json_ptr::JsonPointer;
use tokio::process::Command;
use crate::db::model::package::{
CurrentDependencies, CurrentDependencyInfo, CurrentDependencyKind, ManifestPreference,
ActionRequestEntry, CurrentDependencies, CurrentDependencyInfo, CurrentDependencyKind,
ManifestPreference,
};
use crate::disk::mount::filesystem::bind::Bind;
use crate::disk::mount::filesystem::idmapped::IdMapped;
@@ -20,7 +20,6 @@ use crate::disk::mount::filesystem::{FileSystem, MountType};
use crate::rpc_continuations::Guid;
use crate::service::effects::prelude::*;
use crate::status::health_check::NamedHealthCheckResult;
use crate::util::clap::FromStrParser;
use crate::util::Invoke;
use crate::volume::data_dir;
@@ -113,6 +112,7 @@ pub async fn expose_for_dependents(
context: EffectContext,
ExposeForDependentsParams { paths }: ExposeForDependentsParams,
) -> Result<(), Error> {
// TODO
Ok(())
}
@@ -192,16 +192,11 @@ impl ValueParserFactory for DependencyRequirement {
#[command(rename_all = "camelCase")]
#[ts(export)]
pub struct SetDependenciesParams {
#[serde(default)]
procedure_id: Guid,
dependencies: Vec<DependencyRequirement>,
}
pub async fn set_dependencies(
context: EffectContext,
SetDependenciesParams {
procedure_id,
dependencies,
}: SetDependenciesParams,
SetDependenciesParams { dependencies }: SetDependenciesParams,
) -> Result<(), Error> {
let context = context.deref()?;
let id = &context.seed.id;
@@ -222,19 +217,6 @@ pub async fn set_dependencies(
version_range,
),
};
let config_satisfied =
if let Some(dep_service) = &*context.seed.ctx.services.get(&dep_id).await {
context
.dependency_config(
procedure_id.clone(),
dep_id.clone(),
dep_service.get_config(procedure_id.clone()).await?.config,
)
.await?
.is_none()
} else {
true
};
let info = CurrentDependencyInfo {
title: context
.seed
@@ -251,7 +233,6 @@ pub async fn set_dependencies(
.await?,
kind,
version_range,
config_satisfied,
};
deps.insert(dep_id, info);
}
@@ -282,7 +263,8 @@ pub async fn get_dependencies(context: EffectContext) -> Result<Vec<DependencyRe
.as_current_dependencies()
.de()?;
data.0
Ok(data
.0
.into_iter()
.map(|(id, current_dependency_info)| {
let CurrentDependencyInfo {
@@ -290,7 +272,7 @@ pub async fn get_dependencies(context: EffectContext) -> Result<Vec<DependencyRe
kind,
..
} = current_dependency_info;
Ok::<_, Error>(match kind {
match kind {
CurrentDependencyKind::Exists => {
DependencyRequirement::Exists { id, version_range }
}
@@ -301,9 +283,9 @@ pub async fn get_dependencies(context: EffectContext) -> Result<Vec<DependencyRe
version_range,
}
}
})
}
})
.try_collect()
.collect())
}
#[derive(Debug, Clone, Serialize, Deserialize, Parser, TS)]
@@ -320,12 +302,10 @@ pub struct CheckDependenciesResult {
package_id: PackageId,
#[ts(type = "string | null")]
title: Option<InternedString>,
#[ts(type = "string | null")]
installed_version: Option<exver::ExtendedVersion>,
#[ts(type = "string[]")]
installed_version: Option<VersionString>,
satisfies: BTreeSet<VersionString>,
is_running: bool,
config_satisfied: bool,
requested_actions: BTreeMap<ReplayId, ActionRequestEntry>,
#[ts(as = "BTreeMap::<HealthCheckId, NamedHealthCheckResult>")]
health_checks: OrdMap<HealthCheckId, NamedHealthCheckResult>,
}
@@ -335,14 +315,14 @@ pub async fn check_dependencies(
) -> Result<Vec<CheckDependenciesResult>, Error> {
let context = context.deref()?;
let db = context.seed.ctx.db.peek().await;
let current_dependencies = db
let pde = db
.as_public()
.as_package_data()
.as_idx(&context.seed.id)
.or_not_found(&context.seed.id)?
.as_current_dependencies()
.de()?;
let package_ids: Vec<_> = package_ids
.or_not_found(&context.seed.id)?;
let current_dependencies = pde.as_current_dependencies().de()?;
let requested_actions = pde.as_requested_actions().de()?;
let package_dependency_info: Vec<_> = package_ids
.unwrap_or_else(|| current_dependencies.0.keys().cloned().collect())
.into_iter()
.filter_map(|x| {
@@ -350,18 +330,23 @@ pub async fn check_dependencies(
Some((x, info))
})
.collect();
let mut results = Vec::with_capacity(package_ids.len());
let mut results = Vec::with_capacity(package_dependency_info.len());
for (package_id, dependency_info) in package_ids {
for (package_id, dependency_info) in package_dependency_info {
let title = dependency_info.title.clone();
let Some(package) = db.as_public().as_package_data().as_idx(&package_id) else {
let requested_actions = requested_actions
.iter()
.filter(|(_, v)| v.request.package_id == package_id)
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
results.push(CheckDependenciesResult {
package_id,
title,
installed_version: None,
satisfies: BTreeSet::new(),
is_running: false,
config_satisfied: false,
requested_actions,
health_checks: Default::default(),
});
continue;
@@ -369,22 +354,27 @@ pub async fn check_dependencies(
let manifest = package.as_state_info().as_manifest(ManifestPreference::New);
let installed_version = manifest.as_version().de()?.into_version();
let satisfies = manifest.as_satisfies().de()?;
let installed_version = Some(installed_version.clone());
let installed_version = Some(installed_version.clone().into());
let is_installed = true;
let status = package.as_status().as_main().de()?;
let status = package.as_status().de()?;
let is_running = if is_installed {
status.running()
} else {
false
};
let health_checks = status.health().cloned().unwrap_or_default();
let requested_actions = requested_actions
.iter()
.filter(|(_, v)| v.request.package_id == package_id)
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
results.push(CheckDependenciesResult {
package_id,
title,
installed_version,
satisfies,
is_running,
config_satisfied: dependency_info.config_satisfied,
requested_actions,
health_checks,
});
}

View File

@@ -29,7 +29,6 @@ pub async fn set_health(
.as_idx_mut(package_id)
.or_not_found(package_id)?
.as_status_mut()
.as_main_mut()
.mutate(|main| {
match main {
MainStatus::Running { ref mut health, .. }

View File

@@ -7,7 +7,6 @@ use crate::service::effects::context::EffectContext;
mod action;
pub mod callbacks;
mod config;
pub mod context;
mod control;
mod dependency;
@@ -26,34 +25,12 @@ pub fn handler<C: Context>() -> ParentHandler<C> {
from_fn(echo::<EffectContext>).with_call_remote::<ContainerCliContext>(),
)
// action
.subcommand(
"execute-action",
from_fn_async(action::execute_action).no_cli(),
)
.subcommand(
"export-action",
from_fn_async(action::export_action).no_cli(),
)
.subcommand(
"clear-actions",
from_fn_async(action::clear_actions).no_cli(),
)
.subcommand("action", action::action_api::<C>())
// callbacks
.subcommand(
"clear-callbacks",
from_fn(callbacks::clear_callbacks).no_cli(),
)
// config
.subcommand(
"get-configured",
from_fn_async(config::get_configured).no_cli(),
)
.subcommand(
"set-configured",
from_fn_async(config::set_configured)
.no_display()
.with_call_remote::<ContainerCliContext>(),
)
// control
.subcommand(
"restart",

View File

@@ -1,6 +1,6 @@
use models::{HostId, PackageId};
use crate::net::host::binding::{BindOptions, LanInfo};
use crate::net::host::binding::{BindId, BindOptions, LanInfo};
use crate::net::host::HostKind;
use crate::service::effects::prelude::*;
@@ -28,10 +28,20 @@ pub async fn bind(
svc.bind(kind, id, internal_port, options).await
}
pub async fn clear_bindings(context: EffectContext) -> Result<(), Error> {
#[derive(Debug, Clone, Serialize, Deserialize, TS, Parser)]
#[ts(export)]
#[serde(rename_all = "camelCase")]
pub struct ClearBindingsParams {
pub except: Vec<BindId>,
}
pub async fn clear_bindings(
context: EffectContext,
ClearBindingsParams { except }: ClearBindingsParams,
) -> Result<(), Error> {
let context = context.deref()?;
let mut svc = context.seed.persistent_container.net_service.lock().await;
svc.clear_bindings().await?;
svc.clear_bindings(except.into_iter().collect()).await?;
Ok(())
}

View File

@@ -165,7 +165,17 @@ pub async fn list_service_interfaces(
Ok(res)
}
pub async fn clear_service_interfaces(context: EffectContext) -> Result<(), Error> {
#[derive(Debug, Clone, Serialize, Deserialize, TS, Parser)]
#[ts(export)]
#[serde(rename_all = "camelCase")]
pub struct ClearServiceInterfacesParams {
pub except: Vec<ServiceInterfaceId>,
}
pub async fn clear_service_interfaces(
context: EffectContext,
ClearServiceInterfacesParams { except }: ClearServiceInterfacesParams,
) -> Result<(), Error> {
let context = context.deref()?;
let package_id = context.seed.id.clone();
@@ -179,7 +189,7 @@ pub async fn clear_service_interfaces(context: EffectContext) -> Result<(), Erro
.as_idx_mut(&package_id)
.or_not_found(&package_id)?
.as_service_interfaces_mut()
.ser(&Default::default())
.mutate(|s| Ok(s.retain(|id, _| except.contains(id))))
})
.await
}

View File

@@ -26,6 +26,7 @@ pub async fn get_store(
callback,
}: GetStoreParams,
) -> Result<Value, Error> {
dbg!(&callback);
let context = context.deref()?;
let peeked = context.seed.ctx.db.peek().await;
let package_id = package_id.unwrap_or(context.seed.id.clone());
@@ -33,8 +34,9 @@ pub async fn get_store(
.as_private()
.as_package_stores()
.as_idx(&package_id)
.or_not_found(&package_id)?
.de()?;
.map(|s| s.de())
.transpose()?
.unwrap_or_default();
if let Some(callback) = callback {
let callback = callback.register(&context.seed.persistent_container);
@@ -45,10 +47,7 @@ pub async fn get_store(
);
}
Ok(path
.get(&value)
.ok_or_else(|| Error::new(eyre!("Did not find value at path"), ErrorKind::NotFound))?
.clone())
Ok(path.get(&value).cloned().unwrap_or_default())
}
#[derive(Debug, Clone, Serialize, Deserialize, TS)]

View File

@@ -1,11 +1,12 @@
use std::collections::{BTreeMap, BTreeSet};
use std::ffi::OsString;
use std::io::IsTerminal;
use std::ops::Deref;
use std::os::unix::process::ExitStatusExt;
use std::path::Path;
use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::sync::{Arc, Weak};
use std::time::Duration;
use std::{ffi::OsString, path::PathBuf};
use axum::extract::ws::WebSocket;
use chrono::{DateTime, Utc};
@@ -15,7 +16,7 @@ use futures::stream::FusedStream;
use futures::{SinkExt, StreamExt, TryStreamExt};
use imbl_value::{json, InternedString};
use itertools::Itertools;
use models::{ImageId, PackageId, ProcedureName};
use models::{ActionId, ImageId, PackageId, ProcedureName};
use nix::sys::signal::Signal;
use persistent_container::{PersistentContainer, Subcontainer};
use rpc_toolkit::{from_fn_async, CallRemoteHandler, Empty, HandlerArgs, HandlerFor};
@@ -39,6 +40,7 @@ use crate::prelude::*;
use crate::progress::{NamedProgress, Progress};
use crate::rpc_continuations::{Guid, RpcContinuation};
use crate::s9pk::S9pk;
use crate::service::action::update_requested_actions;
use crate::service::service_map::InstallProgressHandles;
use crate::util::actor::concurrent::ConcurrentActor;
use crate::util::io::{create_file, AsyncReadStream};
@@ -48,11 +50,9 @@ use crate::util::Never;
use crate::volume::data_dir;
use crate::CAP_1_KiB;
mod action;
pub mod action;
pub mod cli;
mod config;
mod control;
mod dependencies;
pub mod effects;
pub mod persistent_container;
mod properties;
@@ -97,7 +97,7 @@ impl ServiceRef {
.persistent_container
.execute::<NoOutput>(
Guid::new(),
ProcedureName::Uninit,
ProcedureName::PackageUninit,
to_value(&target_version)?,
None,
) // TODO timeout
@@ -257,7 +257,7 @@ impl Service {
tokio::fs::create_dir_all(&path).await?;
}
}
let start_stop = if i.as_status().as_main().de()?.running() {
let start_stop = if i.as_status().de()?.running() {
StartStop::Start
} else {
StartStop::Stop
@@ -429,12 +429,13 @@ impl Service {
.clone(),
);
}
let procedure_id = Guid::new();
service
.seed
.persistent_container
.execute::<NoOutput>(
Guid::new(),
ProcedureName::Init,
procedure_id.clone(),
ProcedureName::PackageInit,
to_value(&src_version)?,
None,
) // TODO timeout
@@ -445,16 +446,60 @@ impl Service {
progress.progress.complete();
tokio::task::yield_now().await;
}
let peek = ctx.db.peek().await;
let mut action_input: BTreeMap<ActionId, Value> = BTreeMap::new();
let requested_actions: BTreeSet<_> = peek
.as_public()
.as_package_data()
.as_entries()?
.into_iter()
.map(|(_, pde)| {
Ok(pde
.as_requested_actions()
.as_entries()?
.into_iter()
.map(|(_, r)| {
Ok::<_, Error>(if r.as_request().as_package_id().de()? == manifest.id {
Some(r.as_request().as_action_id().de()?)
} else {
None
})
})
.filter_map_ok(|a| a))
})
.flatten_ok()
.map(|a| a.and_then(|a| a))
.try_collect()?;
for action_id in requested_actions {
if let Some(input) = service
.get_action_input(procedure_id.clone(), action_id.clone())
.await?
.and_then(|i| i.value)
{
action_input.insert(action_id, input);
}
}
ctx.db
.mutate(|d| {
let entry = d
.mutate(|db| {
for (action_id, input) in &action_input {
for (_, pde) in db.as_public_mut().as_package_data_mut().as_entries_mut()? {
pde.as_requested_actions_mut().mutate(|requested_actions| {
Ok(update_requested_actions(
requested_actions,
&manifest.id,
action_id,
input,
false,
))
})?;
}
}
let entry = db
.as_public_mut()
.as_package_data_mut()
.as_idx_mut(&manifest.id)
.or_not_found(&manifest.id)?;
if !manifest.has_config {
entry.as_status_mut().as_configured_mut().ser(&true)?;
}
entry
.as_state_info_mut()
.ser(&PackageState::Installed(InstalledState { manifest }))?;

View File

@@ -379,7 +379,11 @@ impl PersistentContainer {
));
}
self.rpc_client.request(rpc::Init, Empty {}).await?;
self.rpc_client
.request(rpc::Init, Empty {})
.await
.map_err(Error::from)
.log_err();
self.state.send_modify(|s| s.rt_initialized = true);
@@ -548,7 +552,7 @@ impl PersistentContainer {
impl Drop for PersistentContainer {
fn drop(&mut self) {
if let Some(destroy) = self.destroy() {
tokio::spawn(async move { destroy.await.unwrap() });
tokio::spawn(async move { destroy.await.log_err() });
}
}
}

View File

@@ -1,10 +1,12 @@
use std::collections::BTreeSet;
use std::str::FromStr;
use std::sync::{Arc, Weak};
use std::time::Duration;
use clap::builder::ValueParserFactory;
use imbl::Vector;
use imbl_value::Value;
use models::ProcedureName;
use models::{FromStrParser, ProcedureName};
use rpc_toolkit::yajrc::RpcMethod;
use rpc_toolkit::Empty;
use ts_rs::TS;
@@ -153,6 +155,11 @@ impl serde::Serialize for Sandbox {
pub struct CallbackId(u64);
impl CallbackId {
pub fn register(self, container: &PersistentContainer) -> CallbackHandle {
dbg!(eyre!(
"callback {} registered for {}",
self.0,
container.s9pk.as_manifest().id
));
let this = Arc::new(self);
let res = Arc::downgrade(&this);
container
@@ -161,6 +168,18 @@ impl CallbackId {
CallbackHandle(res)
}
}
impl FromStr for CallbackId {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
u64::from_str(s).map_err(Error::from).map(Self)
}
}
impl ValueParserFactory for CallbackId {
type Parser = FromStrParser<Self>;
fn value_parser() -> Self::Parser {
FromStrParser::new()
}
}
pub struct CallbackHandle(Weak<CallbackId>);
impl CallbackHandle {

View File

@@ -49,7 +49,7 @@ async fn service_actor_loop(
.db
.mutate(|d| {
if let Some(i) = d.as_public_mut().as_package_data_mut().as_idx_mut(&id) {
let previous = i.as_status().as_main().de()?;
let previous = i.as_status().de()?;
let main_status = match &kinds {
ServiceStateKinds {
transition_state: Some(TransitionKind::Restarting),
@@ -89,7 +89,7 @@ async fn service_actor_loop(
..
} => MainStatus::Stopped,
};
i.as_status_mut().as_main_mut().ser(&main_status)?;
i.as_status_mut().ser(&main_status)?;
}
Ok(())
})

View File

@@ -23,7 +23,7 @@ use crate::s9pk::manifest::PackageId;
use crate::s9pk::merkle_archive::source::FileSource;
use crate::s9pk::S9pk;
use crate::service::{LoadDisposition, Service, ServiceRef};
use crate::status::{MainStatus, Status};
use crate::status::MainStatus;
use crate::util::serde::Pem;
pub type DownloadInstallFuture = BoxFuture<'static, Result<InstallFuture, Error>>;
@@ -174,16 +174,14 @@ impl ServiceMap {
PackageState::Installing(installing)
},
data_version: None,
status: Status {
configured: false,
main: MainStatus::Stopped,
},
status: MainStatus::Stopped,
registry: None,
developer_key: Pem::new(developer_key),
icon,
last_backup: None,
current_dependencies: Default::default(),
actions: Default::default(),
requested_actions: Default::default(),
service_interfaces: Default::default(),
hosts: Default::default(),
store_exposed_dependents: Default::default(),

View File

@@ -9,8 +9,7 @@ use super::TempDesiredRestore;
use crate::disk::mount::filesystem::ReadWrite;
use crate::prelude::*;
use crate::rpc_continuations::Guid;
use crate::service::config::GetConfig;
use crate::service::dependencies::DependencyConfig;
use crate::service::action::GetActionInput;
use crate::service::transition::{TransitionKind, TransitionState};
use crate::service::ServiceActor;
use crate::util::actor::background::BackgroundJobQueue;
@@ -23,9 +22,7 @@ pub(in crate::service) struct Backup {
impl Handler<Backup> for ServiceActor {
type Response = Result<BoxFuture<'static, Result<(), Error>>, Error>;
fn conflicts_with(_: &Backup) -> ConflictBuilder<Self> {
ConflictBuilder::everything()
.except::<GetConfig>()
.except::<DependencyConfig>()
ConflictBuilder::everything().except::<GetActionInput>()
}
async fn handle(
&mut self,

View File

@@ -3,8 +3,7 @@ use futures::FutureExt;
use super::TempDesiredRestore;
use crate::prelude::*;
use crate::rpc_continuations::Guid;
use crate::service::config::GetConfig;
use crate::service::dependencies::DependencyConfig;
use crate::service::action::GetActionInput;
use crate::service::transition::{TransitionKind, TransitionState};
use crate::service::{Service, ServiceActor};
use crate::util::actor::background::BackgroundJobQueue;
@@ -15,9 +14,7 @@ pub(super) struct Restart;
impl Handler<Restart> for ServiceActor {
type Response = ();
fn conflicts_with(_: &Restart) -> ConflictBuilder<Self> {
ConflictBuilder::everything()
.except::<GetConfig>()
.except::<DependencyConfig>()
ConflictBuilder::everything().except::<GetActionInput>()
}
async fn handle(&mut self, _: Guid, _: Restart, jobs: &BackgroundJobQueue) -> Self::Response {
// So Need a handle to just a single field in the state

View File

@@ -5,6 +5,7 @@ use clap::builder::ValueParserFactory;
use clap::Parser;
use color_eyre::eyre::eyre;
use imbl_value::InternedString;
use models::FromStrParser;
use rpc_toolkit::{from_fn_async, Context, Empty, HandlerExt, ParentHandler};
use serde::{Deserialize, Serialize};
use tracing::instrument;
@@ -12,7 +13,6 @@ use ts_rs::TS;
use crate::context::{CliContext, RpcContext};
use crate::prelude::*;
use crate::util::clap::FromStrParser;
use crate::util::io::create_file;
use crate::util::serde::{display_serializable, HandlerExtSerde, WithIoFormat};

View File

@@ -1,12 +1,11 @@
use std::str::FromStr;
use clap::builder::ValueParserFactory;
use models::FromStrParser;
pub use models::HealthCheckId;
use serde::{Deserialize, Serialize};
use ts_rs::TS;
use crate::util::clap::FromStrParser;
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq, TS)]
#[serde(rename_all = "camelCase")]
pub struct NamedHealthCheckResult {

View File

@@ -11,17 +11,9 @@ use crate::service::start_stop::StartStop;
use crate::status::health_check::NamedHealthCheckResult;
pub mod health_check;
#[derive(Clone, Debug, Deserialize, Serialize, HasModel, TS)]
#[serde(rename_all = "camelCase")]
#[model = "Model<Self>"]
#[ts(export)]
pub struct Status {
pub configured: bool,
pub main: MainStatus,
}
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, TS)]
#[serde(tag = "status")]
#[serde(tag = "main")]
#[serde(rename_all = "camelCase")]
#[serde(rename_all_fields = "camelCase")]
pub enum MainStatus {

View File

@@ -1,36 +0,0 @@
use std::marker::PhantomData;
use std::str::FromStr;
use clap::builder::TypedValueParser;
use crate::prelude::*;
pub struct FromStrParser<T>(PhantomData<T>);
impl<T> FromStrParser<T> {
pub fn new() -> Self {
Self(PhantomData)
}
}
impl<T> Clone for FromStrParser<T> {
fn clone(&self) -> Self {
Self(PhantomData)
}
}
impl<T> TypedValueParser for FromStrParser<T>
where
T: FromStr + Clone + Send + Sync + 'static,
T::Err: std::fmt::Display,
{
type Value = T;
fn parse_ref(
&self,
_: &clap::Command,
_: Option<&clap::Arg>,
value: &std::ffi::OsStr,
) -> Result<Self::Value, clap::Error> {
value
.to_string_lossy()
.parse()
.map_err(|e| clap::Error::raw(clap::error::ErrorKind::ValueValidation, e))
}
}

View File

@@ -442,6 +442,13 @@ impl<T> BackTrackingIO<T> {
},
}
}
pub fn read_buffer(&self) -> &[u8] {
match &self.buffer {
BTBuffer::NotBuffering => &[],
BTBuffer::Buffering { read, .. } => read,
BTBuffer::Rewound { read } => read.remaining_slice(),
}
}
#[must_use]
pub fn stop_buffering(&mut self) -> Vec<u8> {
match std::mem::take(&mut self.buffer) {
@@ -512,6 +519,28 @@ impl<T: AsyncRead> AsyncRead for BackTrackingIO<T> {
}
}
}
impl<T: std::io::Read> std::io::Read for BackTrackingIO<T> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
match &mut self.buffer {
BTBuffer::Buffering { read, .. } => {
let n = self.io.read(buf)?;
read.extend_from_slice(&buf[..n]);
Ok(n)
}
BTBuffer::NotBuffering => self.io.read(buf),
BTBuffer::Rewound { read } => {
let mut ready = false;
if (read.position() as usize) < read.get_ref().len() {
let n = std::io::Read::read(read, buf)?;
if n != 0 {
return Ok(n);
}
}
self.io.read(buf)
}
}
}
}
impl<T: AsyncWrite> AsyncWrite for BackTrackingIO<T> {
fn is_write_vectored(&self) -> bool {
@@ -869,7 +898,7 @@ impl Drop for TmpDir {
if self.path.exists() {
let path = std::mem::take(&mut self.path);
tokio::spawn(async move {
tokio::fs::remove_dir_all(&path).await.unwrap();
tokio::fs::remove_dir_all(&path).await.log_err();
});
}
}

View File

@@ -36,7 +36,6 @@ use crate::util::serde::{deserialize_from_str, serialize_display};
use crate::{Error, ErrorKind, ResultExt as _};
pub mod actor;
pub mod clap;
pub mod collections;
pub mod cpupower;
pub mod crypto;
@@ -568,7 +567,7 @@ pub struct FileLock(#[allow(unused)] OwnedMutexGuard<()>, Option<FdLock<File>>);
impl Drop for FileLock {
fn drop(&mut self) {
if let Some(fd_lock) = self.1.take() {
tokio::task::spawn_blocking(|| fd_lock.unlock(true).map_err(|(_, e)| e).unwrap());
tokio::task::spawn_blocking(|| fd_lock.unlock(true).map_err(|(_, e)| e).log_err());
}
}
}

View File

@@ -1,4 +1,3 @@
use std::any::Any;
use std::collections::VecDeque;
use std::marker::PhantomData;
use std::ops::Deref;
@@ -9,6 +8,7 @@ use clap::builder::ValueParserFactory;
use clap::{ArgMatches, CommandFactory, FromArgMatches};
use color_eyre::eyre::eyre;
use imbl::OrdMap;
use models::FromStrParser;
use openssl::pkey::{PKey, Private};
use openssl::x509::X509;
use rpc_toolkit::{
@@ -17,12 +17,10 @@ use rpc_toolkit::{
use serde::de::DeserializeOwned;
use serde::ser::{SerializeMap, SerializeSeq};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_json::Value;
use ts_rs::TS;
use super::IntoDoubleEndedIterator;
use crate::prelude::*;
use crate::util::clap::FromStrParser;
use crate::util::Apply;
pub fn deserialize_from_str<
@@ -272,7 +270,7 @@ impl std::fmt::Display for IoFormat {
impl std::str::FromStr for IoFormat {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
serde_json::from_value(Value::String(s.to_owned()))
serde_json::from_value(serde_json::Value::String(s.to_owned()))
.with_kind(crate::ErrorKind::Deserialization)
}
}
@@ -566,7 +564,7 @@ where
}
}
#[derive(Deserialize, Serialize, TS)]
#[derive(Deserialize, Serialize, TS, Clone)]
pub struct StdinDeserializable<T>(pub T);
impl<T> Default for StdinDeserializable<T>
where
@@ -1358,3 +1356,19 @@ impl Serialize for MaybeUtf8String {
}
}
}
pub fn is_partial_of(partial: &Value, full: &Value) -> bool {
match (partial, full) {
(Value::Object(partial), Value::Object(full)) => partial.iter().all(|(k, v)| {
if let Some(v_full) = full.get(k) {
is_partial_of(v, v_full)
} else {
false
}
}),
(Value::Array(partial), Value::Array(full)) => partial
.iter()
.all(|v| full.iter().any(|v_full| is_partial_of(v, v_full))),
(_, _) => partial == full,
}
}