fix some causes of start wonkiness on update (#2458)

* fix some causes of start wonkiness on update

* fix race condition with manager

Co-authored-by: J H <Blu-J@users.noreply.github.com>

* only restart if running

* fix start function

* clean up code

* fix restart logic

---------

Co-authored-by: J H <Blu-J@users.noreply.github.com>
This commit is contained in:
Aiden McClelland
2023-10-16 12:34:12 -06:00
committed by GitHub
parent 5164c21923
commit 78faf888af
7 changed files with 108 additions and 91 deletions

View File

@@ -27,7 +27,8 @@ pub async fn start(#[context] ctx: RpcContext, #[arg] id: PackageId) -> Result<(
.get(&(id, version)) .get(&(id, version))
.await .await
.ok_or_else(|| Error::new(eyre!("Manager not found"), crate::ErrorKind::InvalidRequest))? .ok_or_else(|| Error::new(eyre!("Manager not found"), crate::ErrorKind::InvalidRequest))?
.start(); .start()
.await;
Ok(()) Ok(())
} }
@@ -62,7 +63,8 @@ pub async fn stop(#[context] ctx: RpcContext, #[arg] id: PackageId) -> Result<Ma
.get(&(id, version)) .get(&(id, version))
.await .await
.ok_or_else(|| Error::new(eyre!("Manager not found"), crate::ErrorKind::InvalidRequest))? .ok_or_else(|| Error::new(eyre!("Manager not found"), crate::ErrorKind::InvalidRequest))?
.stop(); .stop()
.await;
Ok(last_statuts) Ok(last_statuts)
} }
@@ -83,7 +85,8 @@ pub async fn restart(#[context] ctx: RpcContext, #[arg] id: PackageId) -> Result
.get(&(id, version)) .get(&(id, version))
.await .await
.ok_or_else(|| Error::new(eyre!("Manager not found"), crate::ErrorKind::InvalidRequest))? .ok_or_else(|| Error::new(eyre!("Manager not found"), crate::ErrorKind::InvalidRequest))?
.restart(); .restart()
.await;
Ok(()) Ok(())
} }

View File

@@ -22,7 +22,7 @@ use serde_json::{json, Value};
use tokio::fs::{File, OpenOptions}; use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncRead, AsyncSeek, AsyncSeekExt, AsyncWriteExt}; use tokio::io::{AsyncRead, AsyncSeek, AsyncSeekExt, AsyncWriteExt};
use tokio::process::Command; use tokio::process::Command;
use tokio::sync::{oneshot, Mutex}; use tokio::sync::oneshot;
use tokio_stream::wrappers::ReadDirStream; use tokio_stream::wrappers::ReadDirStream;
use tracing::instrument; use tracing::instrument;
@@ -666,23 +666,11 @@ pub async fn download_install_s9pk(
) -> Result<(), Error> { ) -> Result<(), Error> {
let pkg_id = &temp_manifest.id; let pkg_id = &temp_manifest.id;
let version = &temp_manifest.version; let version = &temp_manifest.version;
let previous_state: Arc<Mutex<Option<MainStatus>>> = Default::default();
let db = ctx.db.peek().await?; let db = ctx.db.peek().await?;
let after_previous_state = previous_state.clone();
if let Result::<(), Error>::Err(e) = { if let Result::<(), Error>::Err(e) = {
let ctx = ctx.clone(); let ctx = ctx.clone();
async move { async move {
if db
.as_package_data()
.as_idx(&pkg_id)
.or_not_found(&pkg_id)?
.as_installed()
.is_some()
{
*previous_state.lock().await =
crate::control::stop(ctx.clone(), pkg_id.clone()).await.ok();
}
// // Build set of existing manifests // // Build set of existing manifests
let mut manifests = Vec::new(); let mut manifests = Vec::new();
for (_id, pkg) in db.as_package_data().as_entries()? { for (_id, pkg) in db.as_package_data().as_entries()? {
@@ -779,15 +767,6 @@ pub async fn download_install_s9pk(
tracing::debug!("{:?}", e); tracing::debug!("{:?}", e);
} }
let previous_state = after_previous_state.lock().await;
if previous_state
.as_ref()
.map(|x| x.running())
.unwrap_or(false)
{
crate::control::start(ctx.clone(), pkg_id.clone()).await?;
}
Err(e) Err(e)
} else { } else {
Ok::<_, Error>(()) Ok::<_, Error>(())
@@ -845,7 +824,12 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin + Send + Sync>(
.await .await
.with_kind(crate::ErrorKind::Deserialization)?, .with_kind(crate::ErrorKind::Deserialization)?,
)), )),
Err(e) if e.status() == Some(StatusCode::BAD_REQUEST) => Ok(None), Err(e)
if e.status() == Some(StatusCode::BAD_REQUEST)
|| e.status() == Some(StatusCode::NOT_FOUND) =>
{
Ok(None)
}
Err(e) => Err(e), Err(e) => Err(e),
} }
.with_kind(crate::ErrorKind::Registry)? .with_kind(crate::ErrorKind::Registry)?
@@ -1033,6 +1017,12 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin + Send + Sync>(
) )
.await?; .await?;
let peek = ctx.db.peek().await?;
let prev = peek
.as_package_data()
.as_idx(pkg_id)
.or_not_found(pkg_id)?
.de()?;
let mut sql_tx = ctx.secret_store.begin().await?; let mut sql_tx = ctx.secret_store.begin().await?;
tracing::info!("Install {}@{}: Creating volumes", pkg_id, version); tracing::info!("Install {}@{}: Creating volumes", pkg_id, version);
@@ -1095,12 +1085,6 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin + Send + Sync>(
CurrentDependents(deps) CurrentDependents(deps)
}; };
let peek = ctx.db.peek().await?;
let prev = peek
.as_package_data()
.as_idx(pkg_id)
.or_not_found(pkg_id)?
.de()?;
let installed = InstalledPackageInfo { let installed = InstalledPackageInfo {
status: Status { status: Status {
configured: manifest.config.is_none(), configured: manifest.config.is_none(),
@@ -1240,11 +1224,7 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin + Send + Sync>(
manager.configure(configure_context).await?; manager.configure(configure_context).await?;
} }
if auto_start { for to_configure in to_configure.into_iter().filter(|(dep, _)| dep != pkg_id) {
manager.start();
}
for to_configure in to_configure {
if let Err(e) = async { if let Err(e) = async {
ctx.managers ctx.managers
.get(&to_configure) .get(&to_configure)
@@ -1266,6 +1246,10 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin + Send + Sync>(
} }
} }
if auto_start {
manager.start().await;
}
tracing::info!("Install {}@{}: Complete", pkg_id, version); tracing::info!("Install {}@{}: Complete", pkg_id, version);
Ok(()) Ok(())

View File

@@ -111,7 +111,7 @@ pub struct Manager {
seed: Arc<ManagerSeed>, seed: Arc<ManagerSeed>,
manage_container: Arc<manager_container::ManageContainer>, manage_container: Arc<manager_container::ManageContainer>,
transition: Arc<watch::Sender<Arc<TransitionState>>>, transition: Arc<watch::Sender<TransitionState>>,
persistent_container: ManagerPersistentContainer, persistent_container: ManagerPersistentContainer,
pub gid: Arc<Gid>, pub gid: Arc<Gid>,
@@ -140,60 +140,67 @@ impl Manager {
}) })
} }
pub fn start(&self) { /// awaiting this does not wait for the start to complete
self._transition_abort(); pub async fn start(&self) {
self.manage_container.to_desired(StartStop::Start);
}
pub fn stop(&self) {
self._transition_abort();
self.manage_container.to_desired(StartStop::Stop);
}
pub fn restart(&self) {
if self._is_transition_restart() { if self._is_transition_restart() {
return; return;
} }
self._transition_replace(self._transition_restart()); self._transition_abort().await;
self.manage_container.to_desired(StartStop::Start);
} }
/// awaiting this does not wait for the stop to complete
pub async fn stop(&self) {
self._transition_abort().await;
self.manage_container.to_desired(StartStop::Stop);
}
/// awaiting this does not wait for the restart to complete
pub async fn restart(&self) {
if self._is_transition_restart()
&& *self.manage_container.desired_state().borrow() == StartStop::Stop
{
return;
}
if self.manage_container.desired_state().borrow().is_start() {
self._transition_replace(self._transition_restart()).await;
}
}
/// awaiting this does not wait for the restart to complete
pub async fn configure( pub async fn configure(
&self, &self,
configure_context: ConfigureContext, configure_context: ConfigureContext,
) -> Result<BTreeMap<PackageId, String>, Error> { ) -> Result<BTreeMap<PackageId, String>, Error> {
if self._is_transition_configure() { if self._is_transition_restart() {
return Ok(configure_context.breakages); self._transition_abort().await;
} else if self._is_transition_backup() {
return Err(Error::new(
eyre!("Can't configure because service is backing up"),
ErrorKind::InvalidRequest,
));
} }
let context = self.seed.ctx.clone(); let context = self.seed.ctx.clone();
let id = self.seed.manifest.id.clone(); let id = self.seed.manifest.id.clone();
let breakages = configure(context, id, configure_context).await?; let breakages = configure(context, id, configure_context).await?;
self._transition_replace({
let manage_container = self.manage_container.clone();
let state_reverter = DesiredStateReverter::new(manage_container.clone());
let transition = self.transition.clone(); self.restart().await;
TransitionState::Configuring(
tokio::spawn(async move {
manage_container.wait_for_desired(StartStop::Stop).await;
state_reverter.revert().await;
transition.send_replace(Default::default());
})
.into(),
)
});
Ok(breakages) Ok(breakages)
} }
/// awaiting this does not wait for the backup to complete
pub async fn backup(&self, backup_guard: BackupGuard) -> BackupReturn { pub async fn backup(&self, backup_guard: BackupGuard) -> BackupReturn {
if self._is_transition_backup() { if self._is_transition_backup() {
return BackupReturn::AlreadyRunning(PackageBackupReport { return BackupReturn::AlreadyRunning(PackageBackupReport {
error: Some("Can't do backup because service is in a backing up state".to_owned()), error: Some("Can't do backup because service is already backing up".to_owned()),
}); });
} }
let (transition_state, done) = self._transition_backup(backup_guard); let (transition_state, done) = self._transition_backup(backup_guard);
self._transition_replace(transition_state); self._transition_replace(transition_state).await;
done.await done.await
} }
pub async fn exit(&self) { pub async fn exit(&self) {
self._transition_abort(); self._transition_abort().await;
self.manage_container self.manage_container
.wait_for_desired(StartStop::Stop) .wait_for_desired(StartStop::Stop)
.await; .await;
@@ -220,19 +227,14 @@ impl Manager {
.map(|x| x.rpc_client()) .map(|x| x.rpc_client())
} }
fn _transition_abort(&self) { async fn _transition_abort(&self) {
if let Some(transition) = self
.transition
.send_replace(Default::default())
.join_handle()
{
(**transition).abort();
}
}
fn _transition_replace(&self, transition_state: TransitionState) {
self.transition self.transition
.send_replace(Arc::new(transition_state)) .send_replace(Default::default())
.abort(); .abort()
.await;
}
async fn _transition_replace(&self, transition_state: TransitionState) {
self.transition.send_replace(transition_state).abort().await;
} }
pub(super) fn perform_restart(&self) -> impl Future<Output = Result<(), Error>> + 'static { pub(super) fn perform_restart(&self) -> impl Future<Output = Result<(), Error>> + 'static {
@@ -322,15 +324,11 @@ impl Manager {
} }
fn _is_transition_restart(&self) -> bool { fn _is_transition_restart(&self) -> bool {
let transition = self.transition.borrow(); let transition = self.transition.borrow();
matches!(**transition, TransitionState::Restarting(_)) matches!(*transition, TransitionState::Restarting(_))
} }
fn _is_transition_backup(&self) -> bool { fn _is_transition_backup(&self) -> bool {
let transition = self.transition.borrow(); let transition = self.transition.borrow();
matches!(**transition, TransitionState::BackingUp(_)) matches!(*transition, TransitionState::BackingUp(_))
}
fn _is_transition_configure(&self) -> bool {
let transition = self.transition.borrow();
matches!(**transition, TransitionState::Configuring(_))
} }
} }
@@ -602,7 +600,7 @@ impl Drop for DesiredStateReverter {
type BackupDoneSender = oneshot::Sender<Result<PackageBackupInfo, Error>>; type BackupDoneSender = oneshot::Sender<Result<PackageBackupInfo, Error>>;
fn finish_up_backup_task( fn finish_up_backup_task(
transition: Arc<Sender<Arc<TransitionState>>>, transition: Arc<Sender<TransitionState>>,
send: BackupDoneSender, send: BackupDoneSender,
) -> impl FnOnce(Result<Result<PackageBackupInfo, Error>, Error>) -> BoxFuture<'static, ()> { ) -> impl FnOnce(Result<Result<PackageBackupInfo, Error>, Error>) -> BoxFuture<'static, ()> {
move |result| { move |result| {

View File

@@ -5,21 +5,26 @@ use helpers::NonDetachingJoinHandle;
pub(super) enum TransitionState { pub(super) enum TransitionState {
BackingUp(NonDetachingJoinHandle<()>), BackingUp(NonDetachingJoinHandle<()>),
Restarting(NonDetachingJoinHandle<()>), Restarting(NonDetachingJoinHandle<()>),
Configuring(NonDetachingJoinHandle<()>),
None, None,
} }
impl TransitionState { impl TransitionState {
pub(super) fn join_handle(&self) -> Option<&NonDetachingJoinHandle<()>> { pub(super) fn take(&mut self) -> Self {
std::mem::take(self)
}
pub(super) fn into_join_handle(self) -> Option<NonDetachingJoinHandle<()>> {
Some(match self { Some(match self {
TransitionState::BackingUp(a) => a, TransitionState::BackingUp(a) => a,
TransitionState::Restarting(a) => a, TransitionState::Restarting(a) => a,
TransitionState::Configuring(a) => a,
TransitionState::None => return None, TransitionState::None => return None,
}) })
} }
pub(super) fn abort(&self) { pub(super) async fn abort(&mut self) {
self.join_handle().map(|transition| transition.abort()); if let Some(s) = self.take().into_join_handle() {
if s.wait_for_abort().await.is_ok() {
tracing::trace!("transition completed before abort");
}
}
} }
} }

View File

@@ -6,6 +6,7 @@ use std::pin::Pin;
use std::process::Stdio; use std::process::Stdio;
use std::sync::Arc; use std::sync::Arc;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::time::Duration;
use async_trait::async_trait; use async_trait::async_trait;
use clap::ArgMatches; use clap::ArgMatches;
@@ -50,13 +51,31 @@ impl std::error::Error for Never {}
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait Invoke { pub trait Invoke {
async fn invoke(&mut self, error_kind: crate::ErrorKind) -> Result<Vec<u8>, Error>; async fn invoke(&mut self, error_kind: crate::ErrorKind) -> Result<Vec<u8>, Error>;
async fn invoke_timeout(
&mut self,
error_kind: crate::ErrorKind,
timeout: Option<Duration>,
) -> Result<Vec<u8>, Error>;
} }
#[async_trait::async_trait] #[async_trait::async_trait]
impl Invoke for tokio::process::Command { impl Invoke for tokio::process::Command {
async fn invoke(&mut self, error_kind: crate::ErrorKind) -> Result<Vec<u8>, Error> { async fn invoke(&mut self, error_kind: crate::ErrorKind) -> Result<Vec<u8>, Error> {
self.invoke_timeout(error_kind, None).await
}
async fn invoke_timeout(
&mut self,
error_kind: crate::ErrorKind,
timeout: Option<Duration>,
) -> Result<Vec<u8>, Error> {
self.kill_on_drop(true);
self.stdout(Stdio::piped()); self.stdout(Stdio::piped());
self.stderr(Stdio::piped()); self.stderr(Stdio::piped());
let res = self.output().await?; let res = match timeout {
None => self.output().await?,
Some(t) => tokio::time::timeout(t, self.output())
.await
.with_kind(ErrorKind::Timeout)??,
};
crate::ensure_code!( crate::ensure_code!(
res.status.success(), res.status.success(),
error_kind, error_kind,

View File

@@ -70,6 +70,12 @@ pub async fn canonicalize(
#[pin_project::pin_project(PinnedDrop)] #[pin_project::pin_project(PinnedDrop)]
pub struct NonDetachingJoinHandle<T>(#[pin] JoinHandle<T>); pub struct NonDetachingJoinHandle<T>(#[pin] JoinHandle<T>);
impl<T> NonDetachingJoinHandle<T> {
/// Calls `abort` on this handle and then awaits it, consuming the handle.
///
/// Returns whatever awaiting the handle yields.
/// NOTE(review): if the wrapped `JoinHandle` is tokio's, this is presumably
/// `Ok(T)` when the task finished before the abort took effect and a
/// cancellation `JoinError` otherwise — confirm against the `Future` impl.
pub async fn wait_for_abort(self) -> Result<T, JoinError> {
// Abort first, then await, so the caller resumes only once the handle resolves.
// NOTE(review): `abort` presumably forwards to the wrapped `JoinHandle` — confirm.
self.abort();
self.await
}
}
impl<T> From<JoinHandle<T>> for NonDetachingJoinHandle<T> { impl<T> From<JoinHandle<T>> for NonDetachingJoinHandle<T> {
fn from(t: JoinHandle<T>) -> Self { fn from(t: JoinHandle<T>) -> Self {
NonDetachingJoinHandle(t) NonDetachingJoinHandle(t)

View File

@@ -80,6 +80,7 @@ pub enum ErrorKind {
Lshw = 68, Lshw = 68,
CpuSettings = 69, CpuSettings = 69,
Firmware = 70, Firmware = 70,
Timeout = 71,
} }
impl ErrorKind { impl ErrorKind {
pub fn as_str(&self) -> &'static str { pub fn as_str(&self) -> &'static str {
@@ -155,6 +156,7 @@ impl ErrorKind {
Lshw => "LSHW Error", Lshw => "LSHW Error",
CpuSettings => "CPU Settings Error", CpuSettings => "CPU Settings Error",
Firmware => "Firmware Error", Firmware => "Firmware Error",
Timeout => "Timeout Error",
} }
} }
} }