feat: Add in the ability to get configs

config hooks
This commit is contained in:
BluJ
2023-02-09 11:16:55 -07:00
committed by Aiden McClelland
parent 9366dbb96e
commit f77a208e2c
11 changed files with 330 additions and 118 deletions

View File

@@ -0,0 +1,53 @@
use helpers::Callback;
use itertools::Itertools;
use jsonpath_lib::Compiled;
use models::PackageId;
use serde_json::Value;
use crate::context::RpcContext;
pub struct ConfigHook {
pub path: Compiled,
pub prev: Vec<Value>,
pub callback: Callback,
}
impl RpcContext {
pub async fn add_config_hook(&self, id: PackageId, hook: ConfigHook) {
let mut hooks = self.config_hooks.lock().await;
let prev = hooks.remove(&id).unwrap_or_default();
hooks.insert(
id,
prev.into_iter()
.filter(|h| h.callback.is_listening())
.chain(std::iter::once(hook))
.collect(),
);
}
pub async fn call_config_hooks(&self, id: PackageId, config: &Value) {
let mut hooks = self.config_hooks.lock().await;
let mut prev = hooks.remove(&id).unwrap_or_default();
for hook in &mut prev {
let new = hook
.path
.select(config)
.unwrap_or_default()
.into_iter()
.cloned()
.collect_vec();
if new != hook.prev {
hook.callback
.call(vec![Value::Array(new.clone())])
.unwrap_or_default();
hook.prev = new;
}
}
hooks.insert(
id,
prev.into_iter()
.filter(|h| h.callback.is_listening())
.collect(),
);
}
}

View File

@@ -1,29 +1,24 @@
use std::collections::{BTreeMap, BTreeSet};
use std::collections::BTreeMap;
use std::path::PathBuf;
use std::time::Duration;
use color_eyre::eyre::eyre;
use futures::future::{BoxFuture, FutureExt};
use indexmap::IndexSet;
use itertools::Itertools;
use models::ErrorKind;
use patch_db::{
DbHandle, LockReceipt, LockTarget, LockTargetId, LockType, PatchDbHandle, Transaction, Verifier,
};
use rand::SeedableRng;
use patch_db::{DbHandle, LockReceipt, LockTarget, LockTargetId, LockType, Verifier};
use regex::Regex;
use rpc_toolkit::command;
use serde_json::Value;
use tracing::instrument;
use crate::context::RpcContext;
use crate::db::model::{CurrentDependencies, CurrentDependencyInfo, CurrentDependents};
use crate::db::model::{CurrentDependencies, CurrentDependents};
use crate::dependencies::{
add_dependent_to_current_dependents_lists, break_transitive, heal_all_dependents_transitive,
BreakTransitiveReceipts, BreakageRes, Dependencies, DependencyConfig, DependencyError,
DependencyErrors, DependencyReceipt, TaggedDependencyError, TryHealReceipts,
BreakTransitiveReceipts, BreakageRes, Dependencies, DependencyConfig, DependencyErrors,
DependencyReceipt, TaggedDependencyError, TryHealReceipts,
};
use crate::install::cleanup::{remove_from_current_dependents_lists, UpdateDependencyReceipts};
use crate::install::cleanup::UpdateDependencyReceipts;
use crate::procedure::docker::DockerContainers;
use crate::s9pk::manifest::{Manifest, PackageId};
use crate::util::display_none;
@@ -31,6 +26,7 @@ use crate::util::serde::{display_serializable, parse_stdin_deserializable, IoFor
use crate::Error;
pub mod action;
pub mod hook;
pub mod spec;
pub mod util;
@@ -38,7 +34,7 @@ pub use spec::{ConfigSpec, Defaultable};
use util::NumRange;
use self::action::{ConfigActions, ConfigRes};
use self::spec::{ConfigPointerReceipts, PackagePointerSpec, ValueSpecPointer};
use self::spec::{ConfigPointerReceipts, ValueSpecPointer};
pub type Config = serde_json::Map<String, Value>;
pub trait TypeOf {

View File

@@ -81,7 +81,7 @@ impl CliContext {
.chain(std::iter::once(Path::new(crate::util::config::CONFIG_PATH))),
)?;
let mut url = if let Some(host) = matches.value_of("host") {
host.parse()?
Url::parse host.parse()?
} else if let Some(host) = base.host {
host
} else {

View File

@@ -9,6 +9,7 @@ use std::time::Duration;
use bollard::Docker;
use helpers::to_tmp_path;
use josekit::jwk::Jwk;
use models::PackageId;
use patch_db::json_ptr::JsonPointer;
use patch_db::{DbHandle, LockReceipt, LockType, PatchDb};
use reqwest::Url;
@@ -21,6 +22,7 @@ use tracing::instrument;
use super::setup::CURRENT_SECRET;
use crate::account::AccountInfo;
use crate::config::hook::ConfigHook;
use crate::core::rpc_continuations::{RequestGuid, RestHandler, RpcContinuation};
use crate::db::model::{CurrentDependents, Database, InstalledPackageDataEntry, PackageDataEntry};
use crate::disk::OsPartitionInfo;
@@ -120,6 +122,7 @@ pub struct RpcContextSeed {
pub rpc_stream_continuations: Mutex<BTreeMap<RequestGuid, RpcContinuation>>,
pub wifi_manager: Option<Arc<RwLock<WpaCli>>>,
pub current_secret: Arc<Jwk>,
pub config_hooks: Mutex<BTreeMap<PackageId, Vec<ConfigHook>>>,
}
pub struct RpcCleanReceipts {
@@ -235,6 +238,7 @@ impl RpcContext {
)
})?,
),
config_hooks: Mutex::new(BTreeMap::new()),
});
let res = Self(seed);

View File

@@ -1,22 +1,81 @@
use color_eyre::{eyre::eyre, Report};
use color_eyre::{
eyre::{bail, eyre},
Report,
};
use helpers::{AddressSchemaLocal, AddressSchemaOnion, Callback, OsApi};
use itertools::Itertools;
use jsonpath_lib::Compiled;
use models::{InterfaceId, PackageId};
use serde_json::Value;
use sqlx::Acquire;
use crate::{manager::Manager, net::keys::Key};
use crate::{
config::hook::ConfigHook,
manager::{start_stop::StartStop, Manager},
net::keys::Key,
};
use super::try_get_running_ip;
const NULL_VALUE: &Value = &Value::Null;
#[async_trait::async_trait]
impl OsApi for Manager {
async fn get_service_config(
&self,
id: PackageId,
path: &str,
callback: Callback,
) -> Result<serde_json::Value, Report> {
todo!("BLUJ")
callback: Option<Callback>,
) -> Result<Vec<serde_json::Value>, Report> {
let found = match self
.seed
.manifest
.dependencies
.0
.iter()
.find(|x| x.0 == &id)
{
None => bail!("Cannot get a service that is not part of the dependencies"),
Some(a) => a,
};
let config = match crate::config::get(self.seed.ctx.clone(), id.clone(), None)
.await
.map(|x| x.config)
{
Ok(Some(a)) => a,
Ok(None) => bail!("No current config for the service"),
Err(err) => bail!("Could not fetch the config. {err}"),
};
let path = Compiled::compile(path).map_err(|e| eyre!("{e}"))?;
let filtered_values = path
.select(&Value::Object(config))?
.into_iter()
.cloned()
.collect_vec();
if let Some(callback) = callback {
self.seed
.ctx
.add_config_hook(
id,
ConfigHook {
path,
prev: filtered_values.clone(),
callback,
},
)
.await;
}
Ok(filtered_values)
}
// Get tor key - base 32
// Certificate + Certificate key for interface
async fn bind_local(
&self,
internal_port: u16,
@@ -77,24 +136,6 @@ impl OsApi for Manager {
tx.commit().await?;
Ok(helpers::Address(key))
}
async fn unbind_onion(&self, id: InterfaceId, external: u16) -> Result<(), Report> {
let ip = try_get_running_ip(&self.seed)
.await?
.ok_or_else(|| eyre!("No ip available"))?;
let mut svc = self
.seed
.ctx
.net_controller
.create_service(self.seed.manifest.id.clone(), ip)
.await
.map_err(|e| eyre!("Could not get to net controller: {e:?}"))?;
let mut secrets = self.seed.ctx.secret_store.acquire().await?;
svc.remove_tor(id, external)
.await
.map_err(|e| eyre!("Could not add to tor: {e:?}"))?;
Ok(())
}
async fn unbind_local(&self, id: InterfaceId, external: u16) -> Result<(), Report> {
let ip = try_get_running_ip(&self.seed)
.await?
@@ -113,4 +154,49 @@ impl OsApi for Manager {
.map_err(|e| eyre!("Could not add to local: {e:?}"))?;
Ok(())
}
async fn unbind_onion(&self, id: InterfaceId, external: u16) -> Result<(), Report> {
let ip = try_get_running_ip(&self.seed)
.await?
.ok_or_else(|| eyre!("No ip available"))?;
let mut svc = self
.seed
.ctx
.net_controller
.create_service(self.seed.manifest.id.clone(), ip)
.await
.map_err(|e| eyre!("Could not get to net controller: {e:?}"))?;
let mut secrets = self.seed.ctx.secret_store.acquire().await?;
svc.remove_tor(id, external)
.await
.map_err(|e| eyre!("Could not add to tor: {e:?}"))?;
Ok(())
}
fn set_started(&self) -> Result<(), Report> {
self.manage_container
.current_state
.send(StartStop::Start)
.unwrap_or_default();
Ok(())
}
async fn restart(&self) -> Result<(), Report> {
self.perform_restart().await;
Ok(())
}
async fn start(&self) -> Result<(), Report> {
self.manage_container
.wait_for_desired(StartStop::Start)
.await;
Ok(())
}
async fn stop(&self) -> Result<(), Report> {
self.manage_container
.wait_for_desired(StartStop::Stop)
.await;
Ok(())
}
}

View File

@@ -66,7 +66,15 @@ impl ManageContainer {
}
pub fn to_desired(&self, new_state: StartStop) {
self.desired_state.send(new_state);
self.desired_state.send(new_state).unwrap_or_default();
}
pub async fn wait_for_desired(&self, new_state: StartStop) {
let mut current_state = self.current_state();
self.to_desired(new_state);
while *current_state.borrow() != new_state {
current_state.changed().await.unwrap_or_default();
}
}
pub fn current_state(&self) -> watch::Receiver<StartStop> {

View File

@@ -420,6 +420,9 @@ fn configure(
.set(ctx, id, &version, &dependencies, &volumes, &config)
.await?;
ctx.call_config_hooks(id.clone(), &serde_json::Value::Object(config.clone()))
.await;
// track dependencies with no pointers
for (package_id, health_checks) in res.depends_on.into_iter() {
if let Some(current_dependency) = current_dependencies.0.get_mut(&package_id) {

View File

@@ -8,7 +8,7 @@ use embassy_container_init::{ProcessGroupId, SignalGroup, SignalGroupParams};
use helpers::{Address, AddressSchemaLocal, AddressSchemaOnion, Callback, OsApi, UnixRpcClient};
pub use js_engine::JsError;
use js_engine::{JsExecutionEnvironment, PathForVolumeId};
use models::{ErrorKind, VolumeId};
use models::{ErrorKind, InterfaceId, VolumeId};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use tracing::instrument;
@@ -56,9 +56,9 @@ impl OsApi for SandboxOsApi {
&self,
id: PackageId,
path: &str,
callback: Callback,
) -> Result<serde_json::Value, Report> {
todo!()
callback: Option<Callback>,
) -> Result<Vec<serde_json::Value>, Report> {
Err(eyre!("Operation not permitted"))
}
#[allow(unused_variables)]
async fn bind_local(
@@ -66,7 +66,7 @@ impl OsApi for SandboxOsApi {
internal_port: u16,
address_schema: AddressSchemaLocal,
) -> Result<Address, Report> {
todo!()
Err(eyre!("Operation not permitted"))
}
#[allow(unused_variables)]
async fn bind_onion(
@@ -74,7 +74,27 @@ impl OsApi for SandboxOsApi {
internal_port: u16,
address_schema: AddressSchemaOnion,
) -> Result<Address, Report> {
todo!()
Err(eyre!("Operation not permitted"))
}
#[allow(unused_variables)]
async fn unbind_local(&self, id: InterfaceId, external: u16) -> Result<(), Report> {
Err(eyre!("Operation not permitted"))
}
#[allow(unused_variables)]
async fn unbind_onion(&self, id: InterfaceId, external: u16) -> Result<(), Report> {
Err(eyre!("Operation not permitted"))
}
fn set_started(&self) -> Result<(), Report> {
Err(eyre!("Operation not permitted"))
}
async fn restart(&self) -> Result<(), Report> {
Err(eyre!("Operation not permitted"))
}
async fn start(&self) -> Result<(), Report> {
Err(eyre!("Operation not permitted"))
}
async fn stop(&self) -> Result<(), Report> {
Err(eyre!("Operation not permitted"))
}
}