diff --git a/backend/src/control.rs b/backend/src/control.rs index a84fe7283..6d72a4043 100644 --- a/backend/src/control.rs +++ b/backend/src/control.rs @@ -27,7 +27,8 @@ pub async fn start(#[context] ctx: RpcContext, #[arg] id: PackageId) -> Result<( .get(&(id, version)) .await .ok_or_else(|| Error::new(eyre!("Manager not found"), crate::ErrorKind::InvalidRequest))? - .start(); + .start() + .await; Ok(()) } @@ -62,7 +63,8 @@ pub async fn stop(#[context] ctx: RpcContext, #[arg] id: PackageId) -> Result Result .get(&(id, version)) .await .ok_or_else(|| Error::new(eyre!("Manager not found"), crate::ErrorKind::InvalidRequest))? - .restart(); + .restart() + .await; Ok(()) } diff --git a/backend/src/install/mod.rs b/backend/src/install/mod.rs index c2546e6eb..cb21739cc 100644 --- a/backend/src/install/mod.rs +++ b/backend/src/install/mod.rs @@ -22,7 +22,7 @@ use serde_json::{json, Value}; use tokio::fs::{File, OpenOptions}; use tokio::io::{AsyncRead, AsyncSeek, AsyncSeekExt, AsyncWriteExt}; use tokio::process::Command; -use tokio::sync::{oneshot, Mutex}; +use tokio::sync::oneshot; use tokio_stream::wrappers::ReadDirStream; use tracing::instrument; @@ -666,23 +666,11 @@ pub async fn download_install_s9pk( ) -> Result<(), Error> { let pkg_id = &temp_manifest.id; let version = &temp_manifest.version; - let previous_state: Arc>> = Default::default(); let db = ctx.db.peek().await?; - let after_previous_state = previous_state.clone(); if let Result::<(), Error>::Err(e) = { let ctx = ctx.clone(); async move { - if db - .as_package_data() - .as_idx(&pkg_id) - .or_not_found(&pkg_id)? - .as_installed() - .is_some() - { - *previous_state.lock().await = - crate::control::stop(ctx.clone(), pkg_id.clone()).await.ok(); - } // // Build set of existing manifests let mut manifests = Vec::new(); for (_id, pkg) in db.as_package_data().as_entries()? { @@ -779,15 +767,6 @@ pub async fn download_install_s9pk( tracing::debug!("{:?}", e); } - let previous_state = after_previous_state.lock().await; - if previous_state - .as_ref() - .map(|x| x.running()) - .unwrap_or(false) - { - crate::control::start(ctx.clone(), pkg_id.clone()).await?; - } - Err(e) } else { Ok::<_, Error>(()) @@ -845,7 +824,12 @@ pub async fn install_s9pk( .await .with_kind(crate::ErrorKind::Deserialization)?, )), - Err(e) if e.status() == Some(StatusCode::BAD_REQUEST) => Ok(None), + Err(e) + if e.status() == Some(StatusCode::BAD_REQUEST) + || e.status() == Some(StatusCode::NOT_FOUND) => + { + Ok(None) + } Err(e) => Err(e), } .with_kind(crate::ErrorKind::Registry)? @@ -1033,6 +1017,12 @@ pub async fn install_s9pk( ) .await?; + let peek = ctx.db.peek().await?; + let prev = peek + .as_package_data() + .as_idx(pkg_id) + .or_not_found(pkg_id)? + .de()?; let mut sql_tx = ctx.secret_store.begin().await?; tracing::info!("Install {}@{}: Creating volumes", pkg_id, version); @@ -1095,12 +1085,6 @@ pub async fn install_s9pk( CurrentDependents(deps) }; - let peek = ctx.db.peek().await?; - let prev = peek - .as_package_data() - .as_idx(pkg_id) - .or_not_found(pkg_id)? - .de()?; let installed = InstalledPackageInfo { status: Status { configured: manifest.config.is_none(), @@ -1240,11 +1224,7 @@ pub async fn install_s9pk( manager.configure(configure_context).await?; } - if auto_start { - manager.start(); - } - - for to_configure in to_configure { + for to_configure in to_configure.into_iter().filter(|(dep, _)| dep != pkg_id) { if let Err(e) = async { ctx.managers .get(&to_configure) @@ -1266,6 +1246,10 @@ pub async fn install_s9pk( } } + if auto_start { + manager.start().await; + } + tracing::info!("Install {}@{}: Complete", pkg_id, version); Ok(()) diff --git a/backend/src/manager/mod.rs b/backend/src/manager/mod.rs index ceb535302..85e302cc5 100644 --- a/backend/src/manager/mod.rs +++ b/backend/src/manager/mod.rs @@ -111,7 +111,7 @@ pub struct Manager { seed: Arc, manage_container: Arc, - transition: Arc>>, + transition: Arc>, persistent_container: ManagerPersistentContainer, pub gid: Arc, @@ -140,60 +140,67 @@ impl Manager { }) } - pub fn start(&self) { - self._transition_abort(); - self.manage_container.to_desired(StartStop::Start); - } - pub fn stop(&self) { - self._transition_abort(); - self.manage_container.to_desired(StartStop::Stop); - } - pub fn restart(&self) { + /// awaiting this does not wait for the start to complete + pub async fn start(&self) { if self._is_transition_restart() { return; } - self._transition_replace(self._transition_restart()); + self._transition_abort().await; + self.manage_container.to_desired(StartStop::Start); } + + /// awaiting this does not wait for the stop to complete + pub async fn stop(&self) { + self._transition_abort().await; + self.manage_container.to_desired(StartStop::Stop); + } + /// awaiting this does not wait for the restart to complete + pub async fn restart(&self) { + if self._is_transition_restart() + && *self.manage_container.desired_state().borrow() == StartStop::Stop + { + return; + } + if self.manage_container.desired_state().borrow().is_start() { + self._transition_replace(self._transition_restart()).await; + } + } + /// awaiting this does not wait for the restart to complete pub async fn configure( &self, configure_context: ConfigureContext, ) -> Result, Error> { - if self._is_transition_configure() { - return Ok(configure_context.breakages); + if self._is_transition_restart() { + self._transition_abort().await; + } else if self._is_transition_backup() { + return Err(Error::new( + eyre!("Can't configure because service is backing up"), + ErrorKind::InvalidRequest, + )); } let context = self.seed.ctx.clone(); let id = self.seed.manifest.id.clone(); let breakages = configure(context, id, configure_context).await?; - self._transition_replace({ - let manage_container = self.manage_container.clone(); - let state_reverter = DesiredStateReverter::new(manage_container.clone()); - let transition = self.transition.clone(); - TransitionState::Configuring( - tokio::spawn(async move { - manage_container.wait_for_desired(StartStop::Stop).await; + self.restart().await; - state_reverter.revert().await; - transition.send_replace(Default::default()); - }) - .into(), - ) - }); Ok(breakages) } + + /// awaiting this does not wait for the backup to complete pub async fn backup(&self, backup_guard: BackupGuard) -> BackupReturn { if self._is_transition_backup() { return BackupReturn::AlreadyRunning(PackageBackupReport { - error: Some("Can't do backup because service is in a backing up state".to_owned()), + error: Some("Can't do backup because service is already backing up".to_owned()), }); } let (transition_state, done) = self._transition_backup(backup_guard); - self._transition_replace(transition_state); + self._transition_replace(transition_state).await; done.await } pub async fn exit(&self) { - self._transition_abort(); + self._transition_abort().await; self.manage_container .wait_for_desired(StartStop::Stop) .await; @@ -220,19 +227,14 @@ impl Manager { .map(|x| x.rpc_client()) } - fn _transition_abort(&self) { - if let Some(transition) = self - .transition - .send_replace(Default::default()) - .join_handle() - { - (**transition).abort(); - } - } - fn _transition_replace(&self, transition_state: TransitionState) { + async fn _transition_abort(&self) { self.transition - .send_replace(Arc::new(transition_state)) - .abort(); + .send_replace(Default::default()) + .abort() + .await; + } + async fn _transition_replace(&self, transition_state: TransitionState) { + self.transition.send_replace(transition_state).abort().await; } pub(super) fn perform_restart(&self) -> impl Future> + 'static { @@ -322,15 +324,11 @@ impl Manager { } fn _is_transition_restart(&self) -> bool { let transition = self.transition.borrow(); - matches!(**transition, TransitionState::Restarting(_)) + matches!(*transition, TransitionState::Restarting(_)) } fn _is_transition_backup(&self) -> bool { let transition = self.transition.borrow(); - matches!(**transition, TransitionState::BackingUp(_)) - } - fn _is_transition_configure(&self) -> bool { - let transition = self.transition.borrow(); - matches!(**transition, TransitionState::Configuring(_)) + matches!(*transition, TransitionState::BackingUp(_)) } } @@ -602,7 +600,7 @@ impl Drop for DesiredStateReverter { type BackupDoneSender = oneshot::Sender>; fn finish_up_backup_task( - transition: Arc>>, + transition: Arc>, send: BackupDoneSender, ) -> impl FnOnce(Result, Error>) -> BoxFuture<'static, ()> { move |result| { diff --git a/backend/src/manager/transition_state.rs b/backend/src/manager/transition_state.rs index 6826aef09..122c0f703 100644 --- a/backend/src/manager/transition_state.rs +++ b/backend/src/manager/transition_state.rs @@ -5,21 +5,26 @@ use helpers::NonDetachingJoinHandle; pub(super) enum TransitionState { BackingUp(NonDetachingJoinHandle<()>), Restarting(NonDetachingJoinHandle<()>), - Configuring(NonDetachingJoinHandle<()>), None, } impl TransitionState { - pub(super) fn join_handle(&self) -> Option<&NonDetachingJoinHandle<()>> { + pub(super) fn take(&mut self) -> Self { + std::mem::take(self) + } + pub(super) fn into_join_handle(self) -> Option> { Some(match self { TransitionState::BackingUp(a) => a, TransitionState::Restarting(a) => a, - TransitionState::Configuring(a) => a, TransitionState::None => return None, }) } - pub(super) fn abort(&self) { - self.join_handle().map(|transition| transition.abort()); + pub(super) async fn abort(&mut self) { + if let Some(s) = self.take().into_join_handle() { + if s.wait_for_abort().await.is_ok() { + tracing::trace!("transition completed before abort"); + } + } } } diff --git a/backend/src/util/mod.rs b/backend/src/util/mod.rs index 428bf5c6e..7f2c62b5d 100644 --- a/backend/src/util/mod.rs +++ b/backend/src/util/mod.rs @@ -6,6 +6,7 @@ use std::pin::Pin; use std::process::Stdio; use std::sync::Arc; use std::task::{Context, Poll}; +use std::time::Duration; use async_trait::async_trait; use clap::ArgMatches; @@ -50,13 +51,31 @@ impl std::error::Error for Never {} #[async_trait::async_trait] pub trait Invoke { async fn invoke(&mut self, error_kind: crate::ErrorKind) -> Result, Error>; + async fn invoke_timeout( + &mut self, + error_kind: crate::ErrorKind, + timeout: Option, + ) -> Result, Error>; } #[async_trait::async_trait] impl Invoke for tokio::process::Command { async fn invoke(&mut self, error_kind: crate::ErrorKind) -> Result, Error> { + self.invoke_timeout(error_kind, None).await + } + async fn invoke_timeout( + &mut self, + error_kind: crate::ErrorKind, + timeout: Option, + ) -> Result, Error> { + self.kill_on_drop(true); self.stdout(Stdio::piped()); self.stderr(Stdio::piped()); - let res = self.output().await?; + let res = match timeout { + None => self.output().await?, + Some(t) => tokio::time::timeout(t, self.output()) + .await + .with_kind(ErrorKind::Timeout)??, + }; crate::ensure_code!( res.status.success(), error_kind, diff --git a/libs/helpers/src/lib.rs b/libs/helpers/src/lib.rs index b854f142c..226787590 100644 --- a/libs/helpers/src/lib.rs +++ b/libs/helpers/src/lib.rs @@ -70,6 +70,12 @@ pub async fn canonicalize( #[pin_project::pin_project(PinnedDrop)] pub struct NonDetachingJoinHandle(#[pin] JoinHandle); +impl NonDetachingJoinHandle { + pub async fn wait_for_abort(self) -> Result { + self.abort(); + self.await + } +} impl From> for NonDetachingJoinHandle { fn from(t: JoinHandle) -> Self { NonDetachingJoinHandle(t) diff --git a/libs/models/src/errors.rs b/libs/models/src/errors.rs index c7c733ca6..f22624d36 100644 --- a/libs/models/src/errors.rs +++ b/libs/models/src/errors.rs @@ -80,6 +80,7 @@ pub enum ErrorKind { Lshw = 68, CpuSettings = 69, Firmware = 70, + Timeout = 71, } impl ErrorKind { pub fn as_str(&self) -> &'static str { @@ -155,6 +156,7 @@ impl ErrorKind { Lshw => "LSHW Error", CpuSettings => "CPU Settings Error", Firmware => "Firmware Error", + Timeout => "Timeout Error", } } }