mirror of
https://github.com/Start9Labs/start-os.git
synced 2026-04-02 05:23:14 +00:00
js effect for subscribing to config
fix errors chore: Fix some things in the manager for clippy
This commit is contained in:
@@ -1,6 +1,5 @@
|
||||
use std::collections::{BTreeMap, BTreeSet};
|
||||
use std::net::Ipv4Addr;
|
||||
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::task::Poll;
|
||||
use std::time::Duration;
|
||||
@@ -18,12 +17,11 @@ use rand::SeedableRng;
|
||||
use serde_json::Value;
|
||||
use sqlx::Connection;
|
||||
use start_stop::StartStop;
|
||||
use tokio::sync::{oneshot, Notify};
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::sync::{
|
||||
watch::{self, Receiver, Sender},
|
||||
watch::{self, Sender},
|
||||
Mutex,
|
||||
};
|
||||
use torut::onion::TorSecretKeyV3;
|
||||
use tracing::instrument;
|
||||
use transition_state::TransitionState;
|
||||
|
||||
@@ -41,10 +39,9 @@ use crate::dependencies::{
|
||||
use crate::disk::mount::backup::BackupMountGuard;
|
||||
use crate::disk::mount::guard::TmpMountGuard;
|
||||
use crate::install::cleanup::remove_from_current_dependents_lists;
|
||||
use crate::net::interface::InterfaceId;
|
||||
use crate::net::net_controller::NetService;
|
||||
use crate::procedure::docker::{DockerContainer, DockerProcedure, LongRunning};
|
||||
use crate::procedure::{NoOutput, PackageProcedure, ProcedureName};
|
||||
use crate::procedure::{NoOutput, ProcedureName};
|
||||
use crate::s9pk::manifest::Manifest;
|
||||
use crate::status::MainStatus;
|
||||
use crate::util::NonDetachingJoinHandle;
|
||||
@@ -69,7 +66,6 @@ pub const HEALTH_CHECK_GRACE_PERIOD_SECONDS: u64 = 5;
|
||||
|
||||
type ManagerPersistentContainer = Arc<Option<PersistentContainer>>;
|
||||
type BackupGuard = Arc<Mutex<BackupMountGuard<TmpMountGuard>>>;
|
||||
type BackupTask = Result<PackageBackupInfo, Error>;
|
||||
pub enum BackupReturn {
|
||||
Error(Error),
|
||||
AlreadyRunning(PackageBackupReport),
|
||||
@@ -94,7 +90,7 @@ impl Default for Gid {
|
||||
}
|
||||
impl Gid {
|
||||
pub fn new_gid(&self) -> ProcessGroupId {
|
||||
let previous;
|
||||
let mut previous = 0;
|
||||
self.next_gid.send_modify(|x| {
|
||||
previous = *x;
|
||||
*x = previous + 1;
|
||||
@@ -104,13 +100,13 @@ impl Gid {
|
||||
|
||||
pub fn new_main_gid(&self) -> ProcessGroupId {
|
||||
let gid = self.new_gid();
|
||||
self.main_gid.send(gid);
|
||||
self.main_gid.send(gid).unwrap();
|
||||
gid
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct Manager {
|
||||
pub struct Manager {
|
||||
seed: Arc<ManagerSeed>,
|
||||
|
||||
manage_container: Arc<manager_container::ManageContainer>,
|
||||
@@ -120,16 +116,11 @@ struct Manager {
|
||||
pub gid: Arc<Gid>,
|
||||
}
|
||||
impl Manager {
|
||||
pub async fn new(
|
||||
ctx: RpcContext,
|
||||
manifest: Manifest,
|
||||
tor_keys: BTreeMap<InterfaceId, TorSecretKeyV3>,
|
||||
) -> Result<Self, Error> {
|
||||
pub async fn new(ctx: RpcContext, manifest: Manifest) -> Result<Self, Error> {
|
||||
let seed = Arc::new(ManagerSeed {
|
||||
ctx,
|
||||
container_name: DockerProcedure::container_name(&manifest.id, None),
|
||||
manifest,
|
||||
tor_keys,
|
||||
});
|
||||
|
||||
let persistent_container = Arc::new(PersistentContainer::init(&seed).await?);
|
||||
@@ -148,15 +139,13 @@ impl Manager {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn start(&self) -> Result<(), Error> {
|
||||
pub fn start(&self) {
|
||||
self._transition_abort();
|
||||
self.manage_container.to_desired(StartStop::Start);
|
||||
Ok(())
|
||||
}
|
||||
pub fn stop(&self) -> Result<(), Error> {
|
||||
pub fn stop(&self) {
|
||||
self._transition_abort();
|
||||
self.manage_container.to_desired(StartStop::Stop);
|
||||
Ok(())
|
||||
}
|
||||
pub async fn restart(&self) {
|
||||
if self._is_transition_restart() {
|
||||
@@ -176,17 +165,15 @@ impl Manager {
|
||||
|
||||
let (transition_state, done) = configure(context, id, configure_context).remote_handle();
|
||||
self._transition_replace({
|
||||
let seed = self.seed.clone();
|
||||
let manage_container = self.manage_container.clone();
|
||||
|
||||
TransitionState::Configuring(tokio::spawn(async move {
|
||||
let desired_state = manage_container.desired_state();
|
||||
let state_reverter = DesiredStateReverter::new(manage_container.clone());
|
||||
let starting_desired = desired_state.borrow().clone();
|
||||
let mut current_state = manage_container.current_state();
|
||||
manage_container.to_desired(StartStop::Stop);
|
||||
while current_state.borrow().is_start() {
|
||||
current_state.changed().await;
|
||||
current_state.changed().await.unwrap();
|
||||
}
|
||||
|
||||
transition_state.await;
|
||||
@@ -211,7 +198,7 @@ impl Manager {
|
||||
let mut current_status = self.manage_container.current_state();
|
||||
|
||||
while current_status.borrow().is_start() {
|
||||
current_status.changed().await;
|
||||
current_status.changed().await.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -223,7 +210,9 @@ impl Manager {
|
||||
}
|
||||
|
||||
pub fn rpc_client(&self) -> Option<Arc<UnixRpcClient>> {
|
||||
self.persistent_container.clone().map(|x| x.rpc_client())
|
||||
(*self.persistent_container)
|
||||
.as_ref()
|
||||
.map(|x| x.rpc_client())
|
||||
}
|
||||
|
||||
fn _transition_abort(&self) {
|
||||
@@ -244,34 +233,30 @@ impl Manager {
|
||||
let transition = self.transition.clone();
|
||||
let manage_container = self.manage_container.clone();
|
||||
TransitionState::Restarting(tokio::spawn(async move {
|
||||
let mut desired_state = manage_container.desired_state();
|
||||
let mut current_state = manage_container.current_state();
|
||||
let _ = manage_container.set_override(Some(MainStatus::Restarting));
|
||||
manage_container.to_desired(StartStop::Stop);
|
||||
while current_state.borrow().is_start() {
|
||||
current_state.changed().await;
|
||||
current_state.changed().await.unwrap();
|
||||
}
|
||||
manage_container.to_desired(StartStop::Start);
|
||||
while current_state.borrow().is_stop() {
|
||||
current_state.changed().await;
|
||||
current_state.changed().await.unwrap();
|
||||
}
|
||||
transition.send_replace(Default::default());
|
||||
}))
|
||||
}
|
||||
fn _transition_backup(
|
||||
&self,
|
||||
mut backup_guard: BackupGuard,
|
||||
backup_guard: BackupGuard,
|
||||
) -> (TransitionState, BoxFuture<BackupReturn>) {
|
||||
let transition = self.transition.clone();
|
||||
let manage_container = self.manage_container.clone();
|
||||
let seed = self.seed.clone();
|
||||
let (send, done) = oneshot::channel();
|
||||
(
|
||||
TransitionState::BackingUp(tokio::spawn(
|
||||
async move {
|
||||
let mut desired_state = manage_container.desired_state();
|
||||
let state_reverter = DesiredStateReverter::new(manage_container.clone());
|
||||
let starting_desired = desired_state.borrow().clone();
|
||||
let mut current_state = manage_container.current_state();
|
||||
let mut tx = seed.ctx.db.handle();
|
||||
let _ = manage_container.set_override(Some(
|
||||
@@ -279,7 +264,7 @@ impl Manager {
|
||||
));
|
||||
manage_container.to_desired(StartStop::Stop);
|
||||
while current_state.borrow().is_start() {
|
||||
current_state.changed().await;
|
||||
current_state.changed().await.unwrap();
|
||||
}
|
||||
|
||||
let backup_guard = backup_guard.lock().await;
|
||||
@@ -333,8 +318,8 @@ fn configure(
|
||||
mut configure_context: ConfigureContext,
|
||||
) -> BoxFuture<'static, Result<BTreeMap<PackageId, TaggedDependencyError>, Error>> {
|
||||
async move {
|
||||
let db = ctx.db.handle();
|
||||
let tx = db.begin().await?;
|
||||
let mut db = ctx.db.handle();
|
||||
let mut tx = db.begin().await?;
|
||||
let db = &mut tx;
|
||||
|
||||
let receipts = ConfigReceipts::new(db).await?;
|
||||
@@ -353,11 +338,6 @@ fn configure(
|
||||
.await?
|
||||
.ok_or_else(not_found)?;
|
||||
let volumes = receipts.volumes.get(db, id).await?.ok_or_else(not_found)?;
|
||||
let is_needs_config = !receipts
|
||||
.configured
|
||||
.get(db, id)
|
||||
.await?
|
||||
.ok_or_else(not_found)?;
|
||||
let version = receipts.version.get(db, id).await?.ok_or_else(not_found)?;
|
||||
|
||||
// get current config and current spec
|
||||
@@ -394,7 +374,7 @@ fn configure(
|
||||
// create backreferences to pointers
|
||||
let mut sys = receipts
|
||||
.system_pointers
|
||||
.get(db, &id)
|
||||
.get(db, id)
|
||||
.await?
|
||||
.ok_or_else(not_found)?;
|
||||
sys.truncate(0);
|
||||
@@ -431,7 +411,7 @@ fn configure(
|
||||
ValueSpecPointer::System(s) => sys.push(s),
|
||||
}
|
||||
}
|
||||
receipts.system_pointers.set(db, sys, &id).await?;
|
||||
receipts.system_pointers.set(db, sys, id).await?;
|
||||
|
||||
let signal = if !configure_context.dry_run {
|
||||
// run config action
|
||||
@@ -475,7 +455,7 @@ fn configure(
|
||||
// update dependencies
|
||||
let prev_current_dependencies = receipts
|
||||
.current_dependencies
|
||||
.get(db, &id)
|
||||
.get(db, id)
|
||||
.await?
|
||||
.unwrap_or_default();
|
||||
remove_from_current_dependents_lists(
|
||||
@@ -495,7 +475,7 @@ fn configure(
|
||||
current_dependencies.0.remove(id);
|
||||
receipts
|
||||
.current_dependencies
|
||||
.set(db, current_dependencies.clone(), &id)
|
||||
.set(db, current_dependencies.clone(), id)
|
||||
.await?;
|
||||
|
||||
let errs = receipts
|
||||
@@ -512,7 +492,7 @@ fn configure(
|
||||
&receipts.dependency_receipt.try_heal,
|
||||
)
|
||||
.await?;
|
||||
receipts.dependency_errors.set(db, errs, &id).await?;
|
||||
receipts.dependency_errors.set(db, errs, id).await?;
|
||||
|
||||
// cache current config for dependents
|
||||
configure_context
|
||||
@@ -525,22 +505,18 @@ fn configure(
|
||||
.get(db, id)
|
||||
.await?
|
||||
.ok_or_else(not_found)?;
|
||||
let prev = if is_needs_config { None } else { old_config }
|
||||
.map(Value::Object)
|
||||
.unwrap_or_default();
|
||||
let next = Value::Object(config.clone());
|
||||
for (dependent, dep_info) in dependents.0.iter().filter(|(dep_id, _)| dep_id != &id) {
|
||||
let dependent_container = receipts.docker_containers.get(db, &dependent).await?;
|
||||
for (dependent, _dep_info) in dependents.0.iter().filter(|(dep_id, _)| dep_id != &id) {
|
||||
let dependent_container = receipts.docker_containers.get(db, dependent).await?;
|
||||
let dependent_container = &dependent_container;
|
||||
// check if config passes dependent check
|
||||
if let Some(cfg) = receipts
|
||||
.manifest_dependencies_config
|
||||
.get(db, (&dependent, &id))
|
||||
.get(db, (dependent, id))
|
||||
.await?
|
||||
{
|
||||
let manifest = receipts
|
||||
.manifest
|
||||
.get(db, &dependent)
|
||||
.get(db, dependent)
|
||||
.await?
|
||||
.ok_or_else(not_found)?;
|
||||
if let Err(error) = cfg
|
||||
@@ -605,7 +581,7 @@ struct DesiredStateReverter {
|
||||
}
|
||||
impl DesiredStateReverter {
|
||||
fn new(manage_container: Arc<ManageContainer>) -> Self {
|
||||
let starting_state = manage_container.desired_state().borrow().clone();
|
||||
let starting_state = *manage_container.desired_state().borrow();
|
||||
let manage_container = Some(manage_container);
|
||||
Self {
|
||||
starting_state,
|
||||
@@ -614,8 +590,8 @@ impl DesiredStateReverter {
|
||||
}
|
||||
async fn revert(mut self) {
|
||||
if let Some(mut current_state) = self._revert() {
|
||||
while &*current_state.borrow() != &self.starting_state {
|
||||
current_state.changed().await;
|
||||
while *current_state.borrow() != self.starting_state {
|
||||
current_state.changed().await.unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -646,7 +622,8 @@ fn finnish_up_backup_task(
|
||||
send.send(match result {
|
||||
Ok(a) => a,
|
||||
Err(e) => Err(e),
|
||||
});
|
||||
})
|
||||
.unwrap_or_default();
|
||||
}
|
||||
.boxed()
|
||||
}
|
||||
@@ -659,7 +636,7 @@ fn response_to_report(response: &Result<PackageBackupInfo, Error>) -> PackageBac
|
||||
}
|
||||
fn flatten_backup_error(input: Result<Result<PackageBackupInfo, Error>, Error>) -> BackupReturn {
|
||||
match input {
|
||||
Ok(a) | Ok(a) => BackupReturn::Ran {
|
||||
Ok(a) => BackupReturn::Ran {
|
||||
report: response_to_report(&a),
|
||||
res: a,
|
||||
},
|
||||
@@ -675,18 +652,6 @@ pub enum Status {
|
||||
Paused,
|
||||
Shutdown,
|
||||
}
|
||||
pub struct ManagerSharedState {
|
||||
seed: Arc<ManagerSeed>,
|
||||
persistent_container: Option<PersistentContainer>,
|
||||
status: (Sender<Status>, Receiver<Status>),
|
||||
killer: Notify,
|
||||
on_stop: Sender<OnStop>,
|
||||
synchronized: Notify,
|
||||
synchronize_now: Notify,
|
||||
commit_health_check_results: AtomicBool,
|
||||
next_gid: AtomicU32,
|
||||
main_gid: (Sender<ProcessGroupId>, Receiver<ProcessGroupId>),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub enum OnStop {
|
||||
@@ -703,8 +668,6 @@ async fn run_main(
|
||||
persistent_container: ManagerPersistentContainer,
|
||||
started: Arc<impl Fn()>,
|
||||
) -> RunMainResult {
|
||||
let interfaces = main_interfaces(&seed)?;
|
||||
|
||||
let mut runtime = NonDetachingJoinHandle::from(tokio::spawn(start_up_image(seed.clone())));
|
||||
let ip = match persistent_container.is_some() {
|
||||
false => Some(match get_running_ip(&seed, &mut runtime).await {
|
||||
@@ -749,35 +712,6 @@ async fn start_up_image(seed: Arc<ManagerSeed>) -> Result<Result<NoOutput, (i32,
|
||||
.await
|
||||
}
|
||||
|
||||
fn main_interfaces(
|
||||
seed: &ManagerSeed,
|
||||
) -> Result<
|
||||
Vec<(
|
||||
InterfaceId,
|
||||
&crate::net::interface::Interface,
|
||||
TorSecretKeyV3,
|
||||
)>,
|
||||
Error,
|
||||
> {
|
||||
seed.manifest
|
||||
.interfaces
|
||||
.0
|
||||
.iter()
|
||||
.map(|(id, info)| {
|
||||
Ok((
|
||||
id.clone(),
|
||||
info,
|
||||
seed.tor_keys
|
||||
.get(id)
|
||||
.ok_or_else(|| {
|
||||
Error::new(eyre!("interface {} missing key", id), crate::ErrorKind::Tor)
|
||||
})?
|
||||
.clone(),
|
||||
))
|
||||
})
|
||||
.collect::<Result<Vec<_>, Error>>()
|
||||
}
|
||||
|
||||
async fn long_running_docker(
|
||||
seed: &ManagerSeed,
|
||||
container: &DockerContainer,
|
||||
@@ -896,17 +830,11 @@ async fn main_health_check_daemon(seed: Arc<ManagerSeed>) {
|
||||
}
|
||||
}
|
||||
|
||||
fn set_commit_health_true(state: &Arc<ManagerSharedState>) {
|
||||
state
|
||||
.commit_health_check_results
|
||||
.store(true, Ordering::SeqCst);
|
||||
}
|
||||
|
||||
type RuntimeOfCommand = NonDetachingJoinHandle<Result<Result<NoOutput, (i32, String)>, Error>>;
|
||||
|
||||
async fn get_running_ip(seed: &ManagerSeed, mut runtime: &mut RuntimeOfCommand) -> GetRunningIp {
|
||||
loop {
|
||||
match container_inspect(&seed).await {
|
||||
match container_inspect(seed).await {
|
||||
Ok(res) => {
|
||||
match res
|
||||
.network_settings
|
||||
@@ -968,7 +896,7 @@ async fn send_signal(
|
||||
// .store(false, Ordering::SeqCst);
|
||||
|
||||
if let Some(rpc_client) = rpc_client {
|
||||
let main_gid = gid.main_gid.borrow().clone();
|
||||
let main_gid = *gid.main_gid.borrow();
|
||||
let next_gid = gid.new_gid();
|
||||
#[cfg(feature = "js_engine")]
|
||||
if let Err(e) = crate::procedure::js_scripts::JsProcedure::default()
|
||||
@@ -985,6 +913,7 @@ async fn send_signal(
|
||||
None, // TODO
|
||||
next_gid,
|
||||
Some(rpc_client),
|
||||
Arc::new(seed.ctx.clone()),
|
||||
)
|
||||
.await?
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user