mirror of
https://github.com/Start9Labs/start-os.git
synced 2026-03-31 04:23:40 +00:00
fix race condition
This commit is contained in:
@@ -43,6 +43,8 @@ const RPC_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
|
|||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct ServiceState {
|
pub struct ServiceState {
|
||||||
|
// indicates whether the service container runtime has been initialized yet
|
||||||
|
pub(super) rt_initialized: bool,
|
||||||
// This contains the start time and health check information for when the service is running. Note: Will be overwritting to the db,
|
// This contains the start time and health check information for when the service is running. Note: Will be overwritting to the db,
|
||||||
pub(super) running_status: Option<RunningStatus>,
|
pub(super) running_status: Option<RunningStatus>,
|
||||||
// This tracks references to callbacks registered by the running service:
|
// This tracks references to callbacks registered by the running service:
|
||||||
@@ -65,6 +67,7 @@ pub struct ServiceStateKinds {
|
|||||||
impl ServiceState {
|
impl ServiceState {
|
||||||
pub fn new(desired_state: StartStop) -> Self {
|
pub fn new(desired_state: StartStop) -> Self {
|
||||||
Self {
|
Self {
|
||||||
|
rt_initialized: false,
|
||||||
running_status: Default::default(),
|
running_status: Default::default(),
|
||||||
callbacks: Default::default(),
|
callbacks: Default::default(),
|
||||||
temp_desired_state: Default::default(),
|
temp_desired_state: Default::default(),
|
||||||
@@ -369,6 +372,8 @@ impl PersistentContainer {
|
|||||||
|
|
||||||
self.rpc_client.request(rpc::Init, Empty {}).await?;
|
self.rpc_client.request(rpc::Init, Empty {}).await?;
|
||||||
|
|
||||||
|
self.state.send_modify(|s| s.rt_initialized = true);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -2,10 +2,9 @@ use std::sync::Arc;
|
|||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use imbl::OrdMap;
|
use imbl::OrdMap;
|
||||||
use models::PackageId;
|
|
||||||
|
|
||||||
use super::start_stop::StartStop;
|
use super::start_stop::StartStop;
|
||||||
|
use super::ServiceActorSeed;
|
||||||
use crate::prelude::*;
|
use crate::prelude::*;
|
||||||
use crate::service::transition::TransitionKind;
|
use crate::service::transition::TransitionKind;
|
||||||
use crate::service::SYNC_RETRY_COOLDOWN_SECONDS;
|
use crate::service::SYNC_RETRY_COOLDOWN_SECONDS;
|
||||||
@@ -13,8 +12,6 @@ use crate::status::MainStatus;
|
|||||||
use crate::util::actor::background::BackgroundJobQueue;
|
use crate::util::actor::background::BackgroundJobQueue;
|
||||||
use crate::util::actor::Actor;
|
use crate::util::actor::Actor;
|
||||||
|
|
||||||
use super::ServiceActorSeed;
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub(super) struct ServiceActor(pub(super) Arc<ServiceActorSeed>);
|
pub(super) struct ServiceActor(pub(super) Arc<ServiceActorSeed>);
|
||||||
|
|
||||||
@@ -26,12 +23,12 @@ enum ServiceActorLoopNext {
|
|||||||
impl Actor for ServiceActor {
|
impl Actor for ServiceActor {
|
||||||
fn init(&mut self, jobs: &BackgroundJobQueue) {
|
fn init(&mut self, jobs: &BackgroundJobQueue) {
|
||||||
let seed = self.0.clone();
|
let seed = self.0.clone();
|
||||||
|
let mut current = seed.persistent_container.state.subscribe();
|
||||||
jobs.add_job(async move {
|
jobs.add_job(async move {
|
||||||
let id = seed.id.clone();
|
let _ = current.wait_for(|s| s.rt_initialized).await;
|
||||||
let mut current = seed.persistent_container.state.subscribe();
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
match service_actor_loop(¤t, &seed, &id).await {
|
match service_actor_loop(¤t, &seed).await {
|
||||||
ServiceActorLoopNext::Wait => tokio::select! {
|
ServiceActorLoopNext::Wait => tokio::select! {
|
||||||
_ = current.changed() => (),
|
_ = current.changed() => (),
|
||||||
},
|
},
|
||||||
@@ -45,8 +42,8 @@ impl Actor for ServiceActor {
|
|||||||
async fn service_actor_loop(
|
async fn service_actor_loop(
|
||||||
current: &tokio::sync::watch::Receiver<super::persistent_container::ServiceState>,
|
current: &tokio::sync::watch::Receiver<super::persistent_container::ServiceState>,
|
||||||
seed: &Arc<ServiceActorSeed>,
|
seed: &Arc<ServiceActorSeed>,
|
||||||
id: &PackageId,
|
|
||||||
) -> ServiceActorLoopNext {
|
) -> ServiceActorLoopNext {
|
||||||
|
let id = &seed.id;
|
||||||
let kinds = current.borrow().kinds();
|
let kinds = current.borrow().kinds();
|
||||||
if let Err(e) = async {
|
if let Err(e) = async {
|
||||||
let main_status = match (
|
let main_status = match (
|
||||||
|
|||||||
Reference in New Issue
Block a user