chore: Simplify the state into one

This commit is contained in:
J H
2024-03-06 09:38:55 -07:00
parent 88028412bd
commit 093a5d4ddf
8 changed files with 174 additions and 153 deletions

View File

@@ -9,7 +9,9 @@ struct Start;
impl Handler<Start> for ServiceActor {
type Response = ();
async fn handle(&mut self, _: Start, _: &mut BackgroundJobs) -> Self::Response {
self.0.desired_state.send_replace(StartStop::Start);
self.0.persistent_container.state.send_modify(|x| {
x.desired_state = StartStop::Start;
});
self.0.synchronized.notified().await
}
}
@@ -24,16 +26,15 @@ struct Stop;
impl Handler<Stop> for ServiceActor {
type Response = ();
async fn handle(&mut self, _: Stop, _: &mut BackgroundJobs) -> Self::Response {
self.0.desired_state.send_replace(StartStop::Stop);
if self.0.transition_state.borrow().as_ref().map(|t| t.kind())
== Some(TransitionKind::Restarting)
{
if let Some(restart) = self.0.transition_state.send_replace(None) {
restart.abort().await;
} else {
#[cfg(feature = "unstable")]
unreachable!()
let mut transition_state = None;
self.0.persistent_container.state.send_modify(|x| {
x.desired_state = StartStop::Stop;
if x.transition_state.as_ref().map(|x| x.kind()) == Some(TransitionKind::Restarting) {
transition_state = std::mem::take(&mut x.transition_state);
}
});
if let Some(restart) = transition_state {
restart.abort().await;
}
self.0.synchronized.notified().await
}

View File

@@ -1,5 +1,5 @@
use std::sync::Arc;
use std::time::Duration;
use std::{ops::Deref, sync::Arc};
use chrono::{DateTime, Utc};
use clap::Parser;
@@ -70,23 +70,17 @@ impl Service {
#[instrument(skip_all)]
async fn new(ctx: RpcContext, s9pk: S9pk, start: StartStop) -> Result<Self, Error> {
let id = s9pk.as_manifest().id.clone();
let desired_state = watch::channel(start).0;
let temp_desired_state = TempDesiredState(Arc::new(watch::channel(None).0));
let persistent_container = PersistentContainer::new(
&ctx,
s9pk,
&ctx, s9pk,
start,
// desired_state.subscribe(),
// temp_desired_state.subscribe(),
)
.await?;
let seed = Arc::new(ServiceActorSeed {
id,
running_status: persistent_container.running_status.subscribe(),
persistent_container,
ctx,
desired_state,
temp_desired_state,
transition_state: Arc::new(watch::channel(None).0),
synchronized: Arc::new(Notify::new()),
});
seed.persistent_container
@@ -383,7 +377,7 @@ impl Service {
.await?;
self.shutdown().await
}
pub async fn backup(&self, guard: impl GenericMountGuard) -> Result<BackupReturn, Error> {
pub async fn backup(&self, _guard: impl GenericMountGuard) -> Result<BackupReturn, Error> {
// TODO
Err(Error::new(eyre!("not yet implemented"), ErrorKind::Unknown))
}
@@ -395,45 +389,36 @@ struct RunningStatus {
started: DateTime<Utc>,
}
pub(self) struct ServiceActorSeed {
struct ServiceActorSeed {
ctx: RpcContext,
id: PackageId,
// Needed to interact with the container for the service
/// Needed to interact with the container for the service
persistent_container: PersistentContainer,
// Setting this value causes the service actor to try to bring the service to the specified state. This is done in the background job created in ServiceActor::init
desired_state: watch::Sender<StartStop>,
// Override the current desired state for the service during a transition (this is protected by a guard that sets this value to null on drop)
temp_desired_state: TempDesiredState,
// This represents a currently running task that affects the service's shown state, such as BackingUp or Restarting.
transition_state: Arc<watch::Sender<Option<TransitionState>>>,
// This contains the start time and health check information for when the service is running. Note: Will be overwritting to the db,
running_status: watch::Receiver<Option<RunningStatus>>,
// This is notified every time the background job created in ServiceActor::init responds to a change
/// This is notified every time the background job created in ServiceActor::init responds to a change
synchronized: Arc<Notify>,
}
impl ServiceActorSeed {
/// Used to indicate that we have finished the task of starting the service
pub fn started(&self) {
self.persistent_container
.current_state
.send_replace(StartStop::Start);
self.persistent_container
.running_status
.send_modify(|running_status| {
*running_status =
Some(
std::mem::take(running_status).unwrap_or_else(|| RunningStatus {
self.persistent_container.state.send_modify(|state| {
state.running_status =
Some(
state
.running_status
.take()
.unwrap_or_else(|| RunningStatus {
health: Default::default(),
started: Utc::now(),
}),
);
})
);
});
}
/// Used to indicate that we have finished the task of stopping the service
pub fn stopped(&self) {
self.persistent_container
.current_state
.send_replace(StartStop::Stop);
self.persistent_container.running_status.send_replace(None);
self.persistent_container.state.send_modify(|state| {
state.running_status = None;
});
}
}
struct ServiceActor(Arc<ServiceActorSeed>);
@@ -443,20 +428,41 @@ impl Actor for ServiceActor {
let seed = self.0.clone();
jobs.add_job(async move {
let id = seed.id.clone();
let mut current = seed.persistent_container.current_state.subscribe();
let mut desired = seed.desired_state.subscribe();
let mut temp_desired = seed.temp_desired_state.subscribe();
let mut transition = seed.transition_state.subscribe();
let mut running = seed.running_status.clone();
let mut current = seed.persistent_container.state.subscribe();
loop {
let (desired_state, current_state, transition_kind, running_status) = dbg!(
temp_desired.borrow().unwrap_or(*desired.borrow()),
*current.borrow(),
transition.borrow().as_ref().map(|t| t.kind()),
running.borrow().clone(),
);
let kinds = dbg!(current.borrow().kinds());
if let Err(e) = async {
let main_status = match (
kinds.transition_state,
kinds.desired_state,
kinds.running_status,
) {
(Some(TransitionKind::Restarting), _, _) => MainStatus::Restarting,
(Some(TransitionKind::BackingUp), _, Some(status)) => {
MainStatus::BackingUp {
started: Some(status.started),
health: status.health.clone(),
}
}
(Some(TransitionKind::BackingUp), _, None) => MainStatus::BackingUp {
started: None,
health: OrdMap::new(),
},
(None, StartStop::Stop, None) => MainStatus::Stopped,
(None, StartStop::Stop, Some(_)) => MainStatus::Stopping {
timeout: seed.persistent_container.stop().await?.into(),
},
(None, StartStop::Start, Some(status)) => MainStatus::Running {
started: status.started,
health: status.health.clone(),
},
(None, StartStop::Start, None) => {
seed.persistent_container.start().await?;
MainStatus::Starting
}
};
seed.ctx
.db
.mutate(|d| {
@@ -466,61 +472,13 @@ impl Actor for ServiceActor {
.as_idx_mut(&id)
.and_then(|p| p.as_installed_mut())
{
i.as_status_mut().as_main_mut().ser(&match (
transition_kind,
desired_state,
current_state,
running_status,
) {
(Some(TransitionKind::Restarting), _, _, _) => {
MainStatus::Restarting
}
(Some(TransitionKind::BackingUp), _, _, Some(status)) => {
MainStatus::BackingUp {
started: Some(status.started),
health: status.health.clone(),
}
}
(Some(TransitionKind::BackingUp), _, _, None) => {
MainStatus::BackingUp {
started: None,
health: OrdMap::new(),
}
}
(None, StartStop::Stop, StartStop::Stop, _) => {
MainStatus::Stopped
}
(None, StartStop::Stop, StartStop::Start, _) => {
MainStatus::Stopping {
timeout: todo!("sigterm timeout"),
}
}
(None, StartStop::Start, StartStop::Stop, _)
| (None, StartStop::Start, StartStop::Start, None) => {
MainStatus::Starting
}
(None, StartStop::Start, StartStop::Start, Some(status)) => {
MainStatus::Running {
started: status.started,
health: status.health.clone(),
}
}
})?;
i.as_status_mut().as_main_mut().ser(&main_status)?;
}
Ok(())
})
.await?;
match (desired_state, current_state) {
(StartStop::Start, StartStop::Stop) => {
seed.persistent_container.start().await
}
(StartStop::Stop, StartStop::Start) => {
seed.persistent_container
.stop(todo!("s9pk sigterm timeout"))
.await
}
_ => Ok(()),
}
Ok::<_, Error>(())
}
.await
{
@@ -538,10 +496,6 @@ impl Actor for ServiceActor {
tokio::select! {
_ = current.changed() => (),
_ = desired.changed() => (),
_ = temp_desired.changed() => (),
_ = transition.changed() => (),
_ = running.changed() => (),
}
}
})

View File

@@ -15,8 +15,11 @@ use tokio::process::Command;
use tokio::sync::{oneshot, watch, Mutex, OnceCell};
use tracing::instrument;
use super::service_effect_handler::{service_effect_handler, EffectContext};
use super::ServiceActorSeed;
use super::{
service_effect_handler::{service_effect_handler, EffectContext},
transition::{TempDesiredState, TransitionKind},
};
use super::{transition::TransitionState, ServiceActorSeed};
use crate::context::RpcContext;
use crate::disk::mount::filesystem::bind::Bind;
use crate::disk::mount::filesystem::idmapped::IdMapped;
@@ -39,6 +42,43 @@ const RPC_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
struct ProcedureId(u64);
#[derive(Debug)]
pub struct ServiceState {
// This contains the start time and health check information for when the service is running. Note: Will be overwritting to the db,
pub(super) running_status: Option<RunningStatus>,
/// Setting this value causes the service actor to try to bring the service to the specified state. This is done in the background job created in ServiceActor::init
pub(super) desired_state: StartStop,
/// Override the current desired state for the service during a transition (this is protected by a guard that sets this value to null on drop)
pub(super) temp_desired_state: Option<StartStop>,
/// This represents a currently running task that affects the service's shown state, such as BackingUp or Restarting.
pub(super) transition_state: Option<TransitionState>,
}
#[derive(Debug)]
pub struct ServiceStateKinds {
pub transition_state: Option<TransitionKind>,
pub running_status: Option<RunningStatus>,
pub desired_state: StartStop,
}
impl ServiceState {
pub fn new(desired_state: StartStop) -> Self {
Self {
running_status: Default::default(),
temp_desired_state: Default::default(),
transition_state: Default::default(),
desired_state,
}
}
pub fn kinds(&self) -> ServiceStateKinds {
ServiceStateKinds {
transition_state: self.transition_state.as_ref().map(|x| x.kind()),
desired_state: self.temp_desired_state.unwrap_or(self.desired_state),
running_status: self.running_status.clone(),
}
}
}
// @DRB On top of this we need to also have the procedures to have the effects and get the results back for them, maybe lock them to the running instance?
/// This contains the LXC container running the javascript init system
/// that can be used via a JSON RPC Client connected to a unix domain
@@ -53,20 +93,12 @@ pub struct PersistentContainer {
volumes: BTreeMap<VolumeId, MountGuard>,
assets: BTreeMap<VolumeId, MountGuard>,
pub(super) overlays: Arc<Mutex<BTreeMap<InternedString, OverlayGuard>>>,
pub(super) current_state: watch::Sender<StartStop>,
// pub(super) desired_state: watch::Receiver<StartStop>,
// pub(super) temp_desired_state: watch::Receiver<Option<StartStop>>,
pub(super) running_status: watch::Sender<Option<RunningStatus>>,
pub(super) state: Arc<watch::Sender<ServiceState>>,
}
impl PersistentContainer {
#[instrument(skip_all)]
pub async fn new(
ctx: &RpcContext,
s9pk: S9pk,
// desired_state: watch::Receiver<StartStop>,
// temp_desired_state: watch::Receiver<Option<StartStop>>,
) -> Result<Self, Error> {
pub async fn new(ctx: &RpcContext, s9pk: S9pk, start: StartStop) -> Result<Self, Error> {
let lxc_container = ctx.lxc_manager.create(LxcConfig::default()).await?;
let rpc_client = lxc_container.connect_rpc(Some(RPC_CONNECT_TIMEOUT)).await?;
let js_mount = MountGuard::mount(
@@ -156,10 +188,7 @@ impl PersistentContainer {
volumes,
assets,
overlays: Arc::new(Mutex::new(BTreeMap::new())),
current_state: watch::channel(StartStop::Stop).0,
// desired_state,
// temp_desired_state,
running_status: watch::channel(None).0,
state: Arc::new(watch::channel(ServiceState::new(start)).0),
})
}
@@ -280,10 +309,11 @@ impl PersistentContainer {
}
#[instrument(skip_all)]
pub async fn stop(&self, timeout: Option<Duration>) -> Result<(), Error> {
self.execute(ProcedureName::StopMain, Value::Null, timeout)
pub async fn stop(&self) -> Result<Duration, Error> {
let timeout: Option<crate::util::serde::Duration> = self
.execute(ProcedureName::StopMain, Value::Null, None)
.await?;
Ok(())
Ok(timeout.map(|a| *a).unwrap_or(Duration::from_secs(30)))
}
#[instrument(skip_all)]

View File

@@ -1,5 +1,5 @@
use std::ops::Deref;
use std::sync::Arc;
use std::{fmt::Display, ops::Deref};
use futures::{Future, FutureExt};
use tokio::sync::watch;
@@ -8,6 +8,8 @@ use crate::service::start_stop::StartStop;
use crate::util::actor::BackgroundJobs;
use crate::util::future::{CancellationHandle, RemoteCancellable};
use super::persistent_container::ServiceState;
pub mod backup;
pub mod restart;
@@ -23,6 +25,13 @@ pub struct TransitionState {
cancel_handle: CancellationHandle,
kind: TransitionKind,
}
impl ::std::fmt::Debug for TransitionState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TransitionState")
.field("kind", &self.kind)
.finish_non_exhaustive()
}
}
impl TransitionState {
pub fn kind(&self) -> TransitionKind {
@@ -52,23 +61,28 @@ impl Drop for TransitionState {
}
#[derive(Debug, Clone)]
pub struct TempDesiredState(pub(super) Arc<watch::Sender<Option<StartStop>>>);
pub struct TempDesiredState(pub(super) Arc<watch::Sender<ServiceState>>);
impl TempDesiredState {
pub fn new(state: &Arc<watch::Sender<ServiceState>>) -> Self {
Self(state.clone())
}
pub fn stop(&self) {
self.0.send_replace(Some(StartStop::Stop));
self.0
.send_modify(|s| s.temp_desired_state = Some(StartStop::Stop));
}
pub fn start(&self) {
self.0.send_replace(Some(StartStop::Start));
self.0
.send_modify(|s| s.temp_desired_state = Some(StartStop::Start));
}
}
impl Drop for TempDesiredState {
fn drop(&mut self) {
self.0.send_replace(None);
}
}
impl Deref for TempDesiredState {
type Target = watch::Sender<Option<StartStop>>;
fn deref(&self) -> &Self::Target {
&*self.0
self.0.send_modify(|s| s.temp_desired_state = None);
}
}
// impl Deref for TempDesiredState {
// type Target = watch::Sender<Option<StartStop>>;
// fn deref(&self) -> &Self::Target {
// &*self.0
// }
// }

View File

@@ -1,32 +1,45 @@
use std::sync::Arc;
use futures::FutureExt;
use crate::prelude::*;
use crate::service::start_stop::StartStop;
use crate::service::transition::{TransitionKind, TransitionState};
use crate::service::{Service, ServiceActor};
use crate::util::actor::{BackgroundJobs, Handler};
use crate::util::future::RemoteCancellable;
use super::TempDesiredState;
struct Restart;
#[async_trait::async_trait]
impl Handler<Restart> for ServiceActor {
type Response = ();
async fn handle(&mut self, _: Restart, jobs: &mut BackgroundJobs) -> Self::Response {
let temp = self.0.temp_desired_state.clone();
let mut current = self.0.persistent_container.current_state.subscribe();
// So Need a handle to just a single field in the state
let temp = TempDesiredState::new(&self.0.persistent_container.state);
let mut current = self.0.persistent_container.state.subscribe();
let transition = RemoteCancellable::new(async move {
temp.stop();
current.wait_for(|s| *s == StartStop::Stop).await;
current.wait_for(|s| s.running_status.is_none()).await;
temp.start();
current.wait_for(|s| *s == StartStop::Start).await;
current.wait_for(|s| s.running_status.is_some()).await;
drop(temp);
});
let cancel_handle = transition.cancellation_handle();
jobs.add_job(transition.map(|_| ()));
let notified = self.0.synchronized.notified();
if let Some(t) = self.0.transition_state.send_replace(Some(TransitionState {
kind: TransitionKind::Restarting,
cancel_handle,
})) {
let mut old = None;
self.0.persistent_container.state.send_modify(|s| {
old = std::mem::replace(
&mut s.transition_state,
Some(TransitionState {
kind: TransitionKind::Restarting,
cancel_handle,
}),
)
});
if let Some(t) = old {
t.abort().await;
}
notified.await