fix: Fix a lint

chore: remove the limit on the long-running

fix: Starting sometimes.

fix: Make it so the stop of the main works

fix: Bind local and tor with package.

wip: envs

fix TS error

import config types from sdk

update package.json
This commit is contained in:
BluJ
2023-02-20 12:22:57 -07:00
committed by Aiden McClelland
parent f5430f9151
commit 5e9e26fa67
34 changed files with 1425 additions and 1195 deletions

View File

@@ -135,7 +135,7 @@ rust-argon2 = "1.0.0"
scopeguard = "1.1" # because avahi-sys fucks your shit up
serde = { version = "1.0.139", features = ["derive", "rc"] }
serde_cbor = { package = "ciborium", version = "0.2.0" }
serde_json = "1.0.82"
serde_json = "1.0.93"
serde_toml = { package = "toml", version = "0.5.9" }
serde_with = { version = "2.0.1", features = ["macros", "json"] }
serde_yaml = "0.9.11"

View File

@@ -101,16 +101,28 @@ async fn create_service_manager(
let mut running_service: Option<NonDetachingJoinHandle<()>> = None;
let seed = seed.clone();
loop {
let current: StartStop = current_state.borrow().clone();
let desired: StartStop = desired_state_receiver.borrow().clone();
let current: StartStop = *current_state.borrow();
let desired: StartStop = *desired_state_receiver.borrow();
match (current, desired) {
(StartStop::Start, StartStop::Start) => (),
(StartStop::Start, StartStop::Stop) => {
if let Err(err) = seed.stop_container().await {
tracing::error!("Could not stop container");
tracing::debug!("{:?}", err)
};
running_service = None;
if persistent_container.is_none() {
if let Err(err) = seed.stop_container().await {
tracing::error!("Could not stop container");
tracing::debug!("{:?}", err)
}
running_service = None;
} else if let Some(current_service) = running_service.take() {
tokio::select! {
_ = current_service => (),
_ = tokio::time::sleep(Duration::from_secs_f64(seed.manifest
.containers
.as_ref()
.and_then(|c| c.main.sigterm_timeout).map(|x| x.as_secs_f64()).unwrap_or_default())) => {
tracing::error!("Could not stop service");
}
}
}
current_state.send(StartStop::Stop).unwrap_or_default();
}
(StartStop::Stop, StartStop::Start) => starting_service(
@@ -123,7 +135,7 @@ async fn create_service_manager(
(StartStop::Stop, StartStop::Stop) => (),
}
if let Err(_) = desired_state_receiver.changed().await {
if desired_state_receiver.changed().await.is_err() {
tracing::error!("Desired state error");
break;
}
@@ -192,10 +204,7 @@ fn starting_service(
current_state.send(StartStop::Start).unwrap_or_default();
})
};
let set_stopped = {
let current_state = current_state.clone();
move || current_state.send(StartStop::Stop)
};
let set_stopped = { move || current_state.send(StartStop::Stop) };
let running_main_loop = async move {
while desired_state.borrow().is_start() {
let result = run_main(

View File

@@ -77,22 +77,25 @@ pub enum BackupReturn {
}
pub struct Gid {
next_gid: watch::Sender<u32>,
main_gid: watch::Sender<ProcessGroupId>,
next_gid: (watch::Sender<u32>, watch::Receiver<u32>),
main_gid: (
watch::Sender<ProcessGroupId>,
watch::Receiver<ProcessGroupId>,
),
}
impl Default for Gid {
fn default() -> Self {
Self {
next_gid: watch::channel(1).0,
main_gid: watch::channel(ProcessGroupId(1)).0,
next_gid: watch::channel(1),
main_gid: watch::channel(ProcessGroupId(1)),
}
}
}
impl Gid {
pub fn new_gid(&self) -> ProcessGroupId {
let mut previous = 0;
self.next_gid.send_modify(|x| {
self.next_gid.0.send_modify(|x| {
previous = *x;
*x = previous + 1;
});
@@ -101,7 +104,7 @@ impl Gid {
pub fn new_main_gid(&self) -> ProcessGroupId {
let gid = self.new_gid();
self.main_gid.send(gid).unwrap();
self.main_gid.0.send(gid).unwrap_or_default();
gid
}
}
@@ -923,7 +926,7 @@ async fn send_signal(manager: &Manager, gid: Arc<Gid>, signal: Signal) -> Result
// .store(false, Ordering::SeqCst);
if let Some(rpc_client) = manager.rpc_client() {
let main_gid = *gid.main_gid.borrow();
let main_gid = *gid.main_gid.0.borrow();
let next_gid = gid.new_gid();
#[cfg(feature = "js_engine")]
if let Err(e) = crate::procedure::js_scripts::JsProcedure::default()

View File

@@ -9,8 +9,8 @@ use tracing::instrument;
use super::manager_seed::ManagerSeed;
use super::{
add_network_for_main, generate_certificate, get_long_running_ip, long_running_docker,
remove_network_for_main, GetRunningIp,
add_network_for_main, get_long_running_ip, long_running_docker, remove_network_for_main,
GetRunningIp,
};
use crate::procedure::docker::DockerContainer;
use crate::util::NonDetachingJoinHandle;
@@ -56,7 +56,8 @@ pub async fn spawn_persistent_container(
let (mut runtime, inserter) =
long_running_docker(&seed, &container).await?;
let ip = match get_long_running_ip(&*seed, &mut runtime).await {
let ip = match get_long_running_ip(&seed, &mut runtime).await {
GetRunningIp::Ip(x) => x,
GetRunningIp::Error(e) => return Err(e),
GetRunningIp::EarlyExit(e) => {
@@ -65,7 +66,7 @@ pub async fn spawn_persistent_container(
return Ok(());
}
};
add_network_for_main(&*seed, ip, generated_certificate).await?;
let svc = add_network_for_main(&seed, ip).await?;
if let Some(inserter_send) = inserter_send.as_mut() {
let _ = inserter_send.send(Arc::new(inserter));
@@ -81,7 +82,7 @@ pub async fn spawn_persistent_container(
a = runtime.running_output => a.map_err(|_| Error::new(eyre!("Manager runtime panicked!"), crate::ErrorKind::Docker)).map(|_| ()),
};
remove_network_for_main(&*seed, ip).await?;
remove_network_for_main(svc).await?;
res
}.await {

View File

@@ -236,7 +236,7 @@ mod tests {
use tokio::sync::watch;
struct OsApiMock {
config_callbacks: watch::Sender<Vec<Callback>>,
config_callbacks: (watch::Sender<Vec<Callback>>, watch::Sender<Vec<Callback>>),
}
impl Default for OsApiMock {
fn default() -> Self {

View File

@@ -93,13 +93,12 @@ impl PackageProcedure {
ErrorKind::NotFound,
)
})?;
let gid;
let rpc_client = man.rpc_client();
if matches!(name, ProcedureName::Main) {
gid = man.gid.new_main_gid();
let gid = if matches!(name, ProcedureName::Main) {
man.gid.new_main_gid()
} else {
gid = man.gid.new_gid();
}
man.gid.new_gid()
};
procedure
.execute(

View File

@@ -234,16 +234,6 @@ impl<R: AsyncRead + AsyncSeek + Unpin + Send + Sync> S9pkReader<R> {
&validated_image_ids,
)?;
#[cfg(feature = "js_engine")]
if man.containers.is_some()
|| matches!(man.main, crate::procedure::PackageProcedure::Script(_))
{
return Err(Error::new(
eyre!("Right now we don't support the containers and the long running main"),
crate::ErrorKind::ValidateS9pk,
));
}
if man.replaces.len() >= MAX_REPLACES {
return Err(Error::new(
eyre!("Cannot have more than {MAX_REPLACES} replaces"),