fix: shutdown order (#3073)

* fix: race condition in Daemon.stop()

* fix: do not stop Daemon on context leave

* fix: remove duplicate Daemons.term calls

* feat: honor dependency order when shutting terminating Daemons

* fixes, and remove started

---------

Co-authored-by: Aiden McClelland <me@drbonez.dev>
This commit is contained in:
Remco Ros
2025-12-15 23:21:23 +01:00
committed by GitHub
parent 0430e0f930
commit 9c43c43a46
14 changed files with 131 additions and 100 deletions

View File

@@ -289,6 +289,7 @@ export function makeEffects(context: EffectContext): Effects {
getStatus(...[o]: Parameters<T.Effects["getStatus"]>) { getStatus(...[o]: Parameters<T.Effects["getStatus"]>) {
return rpcRound("get-status", o) as ReturnType<T.Effects["getStatus"]> return rpcRound("get-status", o) as ReturnType<T.Effects["getStatus"]>
}, },
/// DEPRECATED
setMainStatus(o: { status: "running" | "stopped" }): Promise<null> { setMainStatus(o: { status: "running" | "stopped" }): Promise<null> {
return rpcRound("set-main-status", o) as ReturnType< return rpcRound("set-main-status", o) as ReturnType<
T.Effects["setHealth"] T.Effects["setHealth"]

View File

@@ -292,10 +292,13 @@ export class RpcListener {
) )
}) })
.when(stopType, async ({ id }) => { .when(stopType, async ({ id }) => {
this.callbacks?.removeChild("main")
return handleRpc( return handleRpc(
id, id,
this.system.stop().then((result) => ({ result })), this.system.stop().then((result) => {
this.callbacks?.removeChild("main")
return { result }
}),
) )
}) })
.when(exitType, async ({ id, params }) => { .when(exitType, async ({ id, params }) => {

View File

@@ -70,20 +70,13 @@ export class SystemForStartOs implements System {
this.starting = true this.starting = true
effects.constRetry = utils.once(() => effects.restart()) effects.constRetry = utils.once(() => effects.restart())
let mainOnTerm: () => Promise<void> | undefined let mainOnTerm: () => Promise<void> | undefined
const started = async (onTerm: () => Promise<void>) => {
await effects.setMainStatus({ status: "running" })
mainOnTerm = onTerm
return null
}
const daemons = await ( const daemons = await (
await this.abi.main({ await this.abi.main({
effects, effects,
started,
}) })
).build() ).build()
this.runningMain = { this.runningMain = {
stop: async () => { stop: async () => {
if (mainOnTerm) await mainOnTerm()
await daemons.term() await daemons.term()
}, },
} }

View File

@@ -53,16 +53,22 @@ pub fn kill_init(procfs: &Path, chroot: &Path) -> Result<(), Error> {
) )
})?; })?;
if pids.0.len() == 2 && pids.0[1] == 1 { if pids.0.len() == 2 && pids.0[1] == 1 {
nix::sys::signal::kill(Pid::from_raw(pid), nix::sys::signal::SIGKILL) match nix::sys::signal::kill(
.with_ctx(|_| { Pid::from_raw(pid),
( Some(nix::sys::signal::SIGKILL),
ErrorKind::Filesystem, ) {
lazy_format!( Err(Errno::ESRCH) => Ok(()),
"kill pid {} (determined to be pid 1 in subcontainer)", a => a,
pid }
), .with_ctx(|_| {
) (
})?; ErrorKind::Filesystem,
lazy_format!(
"kill pid {} (determined to be pid 1 in subcontainer)",
pid
),
)
})?;
} }
} }
} }
@@ -510,10 +516,13 @@ pub fn exec(
std::thread::spawn(move || { std::thread::spawn(move || {
if let Ok(pid) = recv_pid.blocking_recv() { if let Ok(pid) = recv_pid.blocking_recv() {
for sig in sig.forever() { for sig in sig.forever() {
nix::sys::signal::kill( match nix::sys::signal::kill(
Pid::from_raw(pid), Pid::from_raw(pid),
Some(nix::sys::signal::Signal::try_from(sig).unwrap()), Some(nix::sys::signal::Signal::try_from(sig).unwrap()),
) ) {
Err(Errno::ESRCH) => Ok(()),
a => a,
}
.unwrap(); .unwrap();
} }
} }

View File

@@ -29,7 +29,25 @@ impl ServiceActorSeed {
pub fn start(&self) -> Transition<'_> { pub fn start(&self) -> Transition<'_> {
Transition { Transition {
kind: TransitionKind::Starting, kind: TransitionKind::Starting,
future: self.persistent_container.start().boxed(), future: async {
self.persistent_container.start().await?;
let id = &self.id;
self.ctx
.db
.mutate(|db| {
db.as_public_mut()
.as_package_data_mut()
.as_idx_mut(id)
.or_not_found(id)?
.as_status_info_mut()
.started()
})
.await
.result?;
Ok(())
}
.boxed(),
} }
} }
@@ -47,8 +65,7 @@ impl ServiceActorSeed {
.as_idx_mut(id) .as_idx_mut(id)
.or_not_found(id)? .or_not_found(id)?
.as_status_info_mut() .as_status_info_mut()
.as_started_mut() .stopped()
.ser(&None)
}) })
.await .await
.result?; .result?;

View File

@@ -28,11 +28,24 @@ impl StatusInfo {
} }
} }
impl Model<StatusInfo> { impl Model<StatusInfo> {
pub fn start(&mut self) -> Result<(), Error> {
self.as_desired_mut().map_mutate(|s| Ok(s.start()))?;
Ok(())
}
pub fn started(&mut self) -> Result<(), Error> {
self.as_started_mut()
.map_mutate(|s| Ok(Some(s.unwrap_or_else(|| Utc::now()))))?;
Ok(())
}
pub fn stop(&mut self) -> Result<(), Error> { pub fn stop(&mut self) -> Result<(), Error> {
self.as_desired_mut().map_mutate(|s| Ok(s.stop()))?; self.as_desired_mut().map_mutate(|s| Ok(s.stop()))?;
self.as_health_mut().ser(&Default::default())?; self.as_health_mut().ser(&Default::default())?;
Ok(()) Ok(())
} }
pub fn stopped(&mut self) -> Result<(), Error> {
self.as_started_mut().ser(&None)?;
Ok(())
}
pub fn init(&mut self) -> Result<(), Error> { pub fn init(&mut self) -> Result<(), Error> {
self.as_started_mut().ser(&None)?; self.as_started_mut().ser(&None)?;
self.as_desired_mut().map_mutate(|s| { self.as_desired_mut().map_mutate(|s| {

View File

@@ -67,7 +67,7 @@ export type Effects = {
packageId?: PackageId packageId?: PackageId
callback?: () => void callback?: () => void
}): Promise<StatusInfo> }): Promise<StatusInfo>
/** indicate to the host os what runstate the service is in */ /** DEPRECATED: indicate to the host os what runstate the service is in */
setMainStatus(options: SetMainStatus): Promise<null> setMainStatus(options: SetMainStatus): Promise<null>
// dependency // dependency

View File

@@ -44,10 +44,7 @@ export namespace ExpectedExports {
* This is the entrypoint for the main container. Used to start up something like the service that the * This is the entrypoint for the main container. Used to start up something like the service that the
* package represents, like running a bitcoind in a bitcoind-wrapper. * package represents, like running a bitcoind in a bitcoind-wrapper.
*/ */
export type main = (options: { export type main = (options: { effects: Effects }) => Promise<DaemonBuildable>
effects: Effects
started(onTerm: () => PromiseLike<void>): PromiseLike<null>
}) => Promise<DaemonBuildable>
/** /**
* Every time a service launches (both on startup, and on install) this function is called before packageInit * Every time a service launches (both on startup, and on install) this function is called before packageInit

View File

@@ -634,10 +634,7 @@ export class StartSdk<Manifest extends T.SDKManifest> {
*/ */
setupInterfaces: setupServiceInterfaces, setupInterfaces: setupServiceInterfaces,
setupMain: ( setupMain: (
fn: (o: { fn: (o: { effects: Effects }) => Promise<Daemons<Manifest, any>>,
effects: Effects
started(onTerm: () => PromiseLike<void>): PromiseLike<null>
}) => Promise<Daemons<Manifest, any>>,
) => setupMain<Manifest>(fn), ) => setupMain<Manifest>(fn),
trigger: { trigger: {
defaultTrigger, defaultTrigger,
@@ -690,13 +687,8 @@ export class StartSdk<Manifest extends T.SDKManifest> {
}, },
}, },
Daemons: { Daemons: {
of( of(effects: Effects) {
effects: Effects, return Daemons.of<Manifest>({ effects })
started:
| ((onTerm: () => PromiseLike<void>) => PromiseLike<null>)
| null,
) {
return Daemons.of<Manifest>({ effects, started })
}, },
}, },
SubContainer: { SubContainer: {

View File

@@ -24,6 +24,7 @@ export class Daemon<
private commandController: CommandController<Manifest, C> | null = null private commandController: CommandController<Manifest, C> | null = null
private shouldBeRunning = false private shouldBeRunning = false
protected exitedSuccess = false protected exitedSuccess = false
private exiting: Promise<void> | null = null
private onExitFns: ((success: boolean) => void)[] = [] private onExitFns: ((success: boolean) => void)[] = []
protected constructor( protected constructor(
private subcontainer: C, private subcontainer: C,
@@ -36,7 +37,7 @@ export class Daemon<
return this.oneshot return this.oneshot
} }
static of<Manifest extends T.SDKManifest>() { static of<Manifest extends T.SDKManifest>() {
return async <C extends SubContainer<Manifest> | null>( return <C extends SubContainer<Manifest> | null>(
effects: T.Effects, effects: T.Effects,
subcontainer: C, subcontainer: C,
exec: DaemonCommandType<Manifest, C>, exec: DaemonCommandType<Manifest, C>,
@@ -51,9 +52,7 @@ export class Daemon<
) )
const res = new Daemon(subc, startCommand) const res = new Daemon(subc, startCommand)
effects.onLeaveContext(() => { effects.onLeaveContext(() => {
res res.term({ destroySubcontainer: true }).catch((e) => console.error(e))
.term({ destroySubcontainer: true })
.catch((e) => console.error(asError(e)))
}) })
return res return res
} }
@@ -114,19 +113,26 @@ export class Daemon<
this.shouldBeRunning = false this.shouldBeRunning = false
this.exitedSuccess = false this.exitedSuccess = false
if (this.commandController) { if (this.commandController) {
await this.commandController this.exiting = this.commandController.term({ ...termOptions })
.term({ ...termOptions })
.catch((e) => console.error(asError(e)))
this.commandController = null this.commandController = null
this.onExitFns = [] this.onExitFns = []
}
if (this.exiting) {
await this.exiting.catch(console.error)
if (termOptions?.destroySubcontainer) { if (termOptions?.destroySubcontainer) {
await this.subcontainer?.destroy() await this.subcontainer?.destroy()
} }
this.exiting = null
} }
} }
subcontainerRc(): SubContainerRc<Manifest> | null { subcontainerRc(): SubContainerRc<Manifest> | null {
return this.subcontainer?.rc() ?? null return this.subcontainer?.rc() ?? null
} }
sharesSubcontainerWith(
other: Daemon<Manifest, SubContainer<Manifest> | null>,
): boolean {
return this.subcontainer?.guid === other.subcontainer?.guid
}
onExit(fn: (success: boolean) => void) { onExit(fn: (success: boolean) => void) {
this.onExitFns.push(fn) this.onExitFns.push(fn)
} }

View File

@@ -161,9 +161,6 @@ export class Daemons<Manifest extends T.SDKManifest, Ids extends string>
{ {
private constructor( private constructor(
readonly effects: T.Effects, readonly effects: T.Effects,
readonly started:
| ((onTerm: () => PromiseLike<void>) => PromiseLike<null>)
| null,
readonly ids: Ids[], readonly ids: Ids[],
readonly healthDaemons: HealthDaemon<Manifest>[], readonly healthDaemons: HealthDaemon<Manifest>[],
) {} ) {}
@@ -180,26 +177,13 @@ export class Daemons<Manifest extends T.SDKManifest, Ids extends string>
* @param started * @param started
* @returns * @returns
*/ */
static of<Manifest extends T.SDKManifest>(options: { static of<Manifest extends T.SDKManifest>(options: { effects: T.Effects }) {
effects: T.Effects return new Daemons<Manifest, never>(options.effects, [], [])
/**
* A closure to run once the system is launched. If you are in main, provide the `started` argument you receive from the function arguments
*/
started: ((onTerm: () => PromiseLike<void>) => PromiseLike<null>) | null
}) {
return new Daemons<Manifest, never>(
options.effects,
options.started,
[],
[],
)
} }
private addDaemonImpl<Id extends string>( private addDaemonImpl<Id extends string>(
id: Id, id: Id,
daemon: Promise< daemon: Daemon<Manifest, SubContainer<Manifest, T.Effects> | null> | null,
Daemon<Manifest, SubContainer<Manifest, T.Effects> | null>
> | null,
requires: Ids[], requires: Ids[],
ready: Ready | typeof EXIT_SUCCESS, ready: Ready | typeof EXIT_SUCCESS,
) { ) {
@@ -215,12 +199,7 @@ export class Daemons<Manifest extends T.SDKManifest, Ids extends string>
) )
const ids = [...this.ids, id] as (Ids | Id)[] const ids = [...this.ids, id] as (Ids | Id)[]
const healthDaemons = [...this.healthDaemons, healthDaemon] const healthDaemons = [...this.healthDaemons, healthDaemon]
return new Daemons<Manifest, Ids | Id>( return new Daemons<Manifest, Ids | Id>(this.effects, ids, healthDaemons)
this.effects,
this.started,
ids,
healthDaemons,
)
} }
/** /**
@@ -256,7 +235,7 @@ export class Daemons<Manifest extends T.SDKManifest, Ids extends string>
if (!options) return prev if (!options) return prev
const daemon = const daemon =
"daemon" in options "daemon" in options
? Promise.resolve(options.daemon) ? options.daemon
: Daemon.of<Manifest>()<C>( : Daemon.of<Manifest>()<C>(
this.effects, this.effects,
options.subcontainer, options.subcontainer,
@@ -397,12 +376,10 @@ export class Daemons<Manifest extends T.SDKManifest, Ids extends string>
"EXIT_SUCCESS", "EXIT_SUCCESS",
this.effects, this.effects,
) )
const daemons = await new Daemons<Manifest, Ids>( const daemons = await new Daemons<Manifest, Ids>(this.effects, this.ids, [
this.effects, ...this.healthDaemons,
this.started, healthDaemon,
this.ids, ]).build()
[...this.healthDaemons, healthDaemon],
).build()
try { try {
await res await res
} finally { } finally {
@@ -412,23 +389,51 @@ export class Daemons<Manifest extends T.SDKManifest, Ids extends string>
} }
async term() { async term() {
for (let result of await Promise.allSettled( const remaining = new Set(this.healthDaemons)
this.healthDaemons.map((x) => x.term({ destroySubcontainer: true })),
)) { while (remaining.size > 0) {
if (result.status === "rejected") { // Find daemons with no remaining dependents
console.error(result.reason) const canShutdown = [...remaining].filter(
(daemon) =>
![...remaining].some((other) =>
other.dependencies.some((dep) => dep.id === daemon.id),
),
)
if (canShutdown.length === 0) {
// Dependency cycle that should not happen, just shutdown remaining daemons
console.warn(
"Dependency cycle detected, shutting down remaining daemons",
)
canShutdown.push(...[...remaining].reverse())
} }
// remove from remaining set
canShutdown.forEach((daemon) => remaining.delete(daemon))
// Shutdown daemons with no remaining dependents concurrently
await Promise.allSettled(
canShutdown.map(async (daemon) => {
try {
console.debug(`Terminating daemon ${daemon.id}`)
const destroySubcontainer = daemon.daemon
? ![...remaining].some((d) =>
d.daemon?.sharesSubcontainerWith(daemon.daemon!),
)
: false
await daemon.term({ destroySubcontainer })
} catch (e) {
console.error(e)
}
}),
)
} }
} }
async build() { async build() {
this.effects.onLeaveContext(() => {
this.term().catch((e) => console.error(asError(e)))
})
for (const daemon of this.healthDaemons) { for (const daemon of this.healthDaemons) {
await daemon.init() await daemon.init()
} }
this.started?.(() => this.term())
return this return this
} }
} }

View File

@@ -34,8 +34,8 @@ export class HealthDaemon<Manifest extends SDKManifest> {
private resolvedReady: boolean = false private resolvedReady: boolean = false
private readyPromise: Promise<void> private readyPromise: Promise<void>
constructor( constructor(
private readonly daemon: Promise<Daemon<Manifest>> | null, readonly daemon: Daemon<Manifest> | null,
private readonly dependencies: HealthDaemon<Manifest>[], readonly dependencies: HealthDaemon<Manifest>[],
readonly id: string, readonly id: string,
readonly ready: Ready | typeof EXIT_SUCCESS, readonly ready: Ready | typeof EXIT_SUCCESS,
readonly effects: Effects, readonly effects: Effects,
@@ -60,11 +60,9 @@ export class HealthDaemon<Manifest extends SDKManifest> {
this.running = false this.running = false
this.healthCheckCleanup?.() this.healthCheckCleanup?.()
await this.daemon?.then((d) => await this.daemon?.term({
d.term({ ...termOptions,
...termOptions, })
}),
)
} }
/** Want to add another notifier that the health might have changed */ /** Want to add another notifier that the health might have changed */

View File

@@ -15,7 +15,7 @@ export class Oneshot<
C extends SubContainer<Manifest> | null = SubContainer<Manifest> | null, C extends SubContainer<Manifest> | null = SubContainer<Manifest> | null,
> extends Daemon<Manifest, C> { > extends Daemon<Manifest, C> {
static of<Manifest extends T.SDKManifest>() { static of<Manifest extends T.SDKManifest>() {
return async <C extends SubContainer<Manifest> | null>( return <C extends SubContainer<Manifest> | null>(
effects: T.Effects, effects: T.Effects,
subcontainer: C, subcontainer: C,
exec: DaemonCommandType<Manifest, C>, exec: DaemonCommandType<Manifest, C>,

View File

@@ -15,10 +15,7 @@ export const DEFAULT_SIGTERM_TIMEOUT = 60_000
* @returns * @returns
*/ */
export const setupMain = <Manifest extends T.SDKManifest>( export const setupMain = <Manifest extends T.SDKManifest>(
fn: (o: { fn: (o: { effects: T.Effects }) => Promise<Daemons<Manifest, any>>,
effects: T.Effects
started(onTerm: () => PromiseLike<void>): PromiseLike<null>
}) => Promise<Daemons<Manifest, any>>,
): T.ExpectedExports.main => { ): T.ExpectedExports.main => {
return async (options) => { return async (options) => {
const result = await fn(options) const result = await fn(options)