switch to managers: wip 2

This commit is contained in:
Aiden McClelland
2021-07-19 11:38:04 -06:00
committed by Aiden McClelland
parent 34e4c12af3
commit a14820087d
10 changed files with 315 additions and 210 deletions

View File

@@ -1,12 +1,12 @@
use std::collections::HashMap;
use std::future::Future;
use std::net::Ipv4Addr;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::task::Poll;
use std::time::Duration;
use anyhow::anyhow;
use bollard::container::StopContainerOptions;
use bollard::Docker;
use patch_db::{DbHandle, PatchDbHandle};
use sqlx::{Executor, Sqlite};
@@ -14,17 +14,13 @@ use tokio::sync::watch::error::RecvError;
use tokio::sync::watch::{channel, Receiver, Sender};
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use tokio_stream::wrappers::WatchStream;
use torut::onion::TorSecretKeyV3;
use crate::action::docker::DockerAction;
use crate::context::RpcContext;
use crate::net::interface::InterfaceId;
use crate::net::mdns::MdnsController;
use crate::net::tor::TorController;
use crate::net::NetController;
use crate::s9pk::manifest::{Manifest, PackageId};
use crate::util::Version;
use crate::util::{Container, Version};
use crate::{Error, ResultExt};
pub struct ManagerMap(RwLock<HashMap<(PackageId, Version), Arc<Manager>>>);
@@ -49,8 +45,10 @@ impl ManagerMap {
) -> Result<(), Error> {
let mut lock = self.0.write().await;
let id = (manifest.id.clone(), manifest.version.clone());
if lock.contains_key(&id) {
return Ok(());
if let Some(man) = lock.get(&id) {
if !man.thread.is_empty().await {
return Ok(());
}
}
lock.insert(
id,
@@ -60,7 +58,11 @@ impl ManagerMap {
}
pub async fn remove(&self, id: &(PackageId, Version)) {
self.0.write().await.remove(id);
if let Some(man) = self.0.write().await.remove(id) {
if let Err(e) = man.exit().await {
log::error!("Error shutting down manager: {}", e);
}
}
}
pub async fn get(&self, id: &(PackageId, Version)) -> Option<Arc<Manager>> {
@@ -69,8 +71,24 @@ impl ManagerMap {
}
pub struct Manager {
shared: Arc<ManagerSharedState>,
thread: Container<JoinHandle<()>>,
}
pub enum Status {
Running = 0,
Stopped = 1,
Paused = 2,
}
struct ManagerSharedState {
status: AtomicUsize,
on_stop: Sender<OnStop>,
thread: JoinHandle<()>,
docker: Docker,
net_ctl: Arc<NetController>,
manifest: Manifest,
container_name: String,
tor_keys: HashMap<InterfaceId, TorSecretKeyV3>,
}
#[derive(Clone, Copy)]
@@ -80,21 +98,17 @@ pub enum OnStop {
Exit,
}
async fn run_main(
docker: &Docker,
net_ctl: &NetController,
manifest: &Manifest,
tor_keys: &HashMap<InterfaceId, TorSecretKeyV3>,
) -> Result<Result<(), (i32, String)>, Error> {
let rt_manifest = manifest.clone();
async fn run_main(state: &Arc<ManagerSharedState>) -> Result<Result<(), (i32, String)>, Error> {
let rt_state = state.clone();
let mut runtime = tokio::spawn(async move {
rt_manifest
rt_state
.manifest
.main
.execute::<(), ()>(
&rt_manifest.id,
&rt_manifest.version,
&rt_state.manifest.id,
&rt_state.manifest.version,
None,
&rt_manifest.volumes,
&rt_state.manifest.volumes,
None,
false,
)
@@ -102,8 +116,9 @@ async fn run_main(
});
let mut ip = None::<Ipv4Addr>;
loop {
match docker
.inspect_container(&DockerAction::container_name(&manifest.id, None), None)
match state
.docker
.inspect_container(&state.container_name, None)
.await
{
Ok(res) => {
@@ -140,11 +155,13 @@ async fn run_main(
)
})?;
net_ctl
state
.net_ctl
.add(
&manifest.id,
&state.manifest.id,
ip,
manifest
state
.manifest
.interfaces
.0
.iter()
@@ -152,7 +169,8 @@ async fn run_main(
Ok((
id.clone(),
info,
tor_keys
state
.tor_keys
.get(id)
.ok_or_else(|| {
Error::new(
@@ -175,7 +193,10 @@ async fn run_main(
)
})
.and_then(|a| a);
net_ctl.remove(&manifest.id, manifest.interfaces.0.keys().cloned());
state.net_ctl.remove(
&state.manifest.id,
state.manifest.interfaces.0.keys().cloned(),
);
res
}
@@ -187,6 +208,16 @@ impl Manager {
tor_keys: HashMap<InterfaceId, TorSecretKeyV3>,
) -> Result<Self, Error> {
let (on_stop, mut recv) = channel(OnStop::Sleep);
let shared = Arc::new(ManagerSharedState {
status: AtomicUsize::new(Status::Stopped as usize),
on_stop,
docker,
net_ctl,
container_name: DockerAction::container_name(&manifest.id, None),
manifest,
tor_keys,
});
let thread_shared = shared.clone();
let thread = tokio::spawn(async move {
loop {
fn handle_stop_action<'a>(
@@ -205,17 +236,36 @@ impl Manager {
match stop_action {
OnStop::Sleep => {
if let Some(fut) = fut {
thread_shared.status.store(
Status::Stopped as usize,
std::sync::atomic::Ordering::SeqCst,
);
fut.await.unwrap();
continue;
}
}
OnStop::Exit => {
thread_shared.status.store(
Status::Stopped as usize,
std::sync::atomic::Ordering::SeqCst,
);
break;
}
OnStop::Restart => (),
OnStop::Restart => {
thread_shared.status.store(
Status::Running as usize,
std::sync::atomic::Ordering::SeqCst,
);
}
}
match run_main(&docker, &*net_ctl, &manifest, &tor_keys).await {
Ok(Ok(())) => break,
match run_main(&thread_shared).await {
Ok(Ok(())) => {
thread_shared
.on_stop
.send(OnStop::Sleep)
.map_err(|_| ())
.unwrap(); // recv is still in scope, cannot fail
}
Ok(Err(e)) => {
todo!("application crashed")
}
@@ -225,12 +275,115 @@ impl Manager {
}
}
});
Ok(Manager { on_stop, thread })
Ok(Manager {
shared,
thread: Container::new(Some(thread)),
})
}
}
impl Drop for Manager {
fn drop(&mut self) {
let _ = self.on_stop.send(OnStop::Exit);
pub fn status(&self) -> Status {
match self.shared.status.load(std::sync::atomic::Ordering::SeqCst) {
0 => Status::Running,
1 => Status::Stopped,
2 => Status::Paused,
_ => unreachable!(),
}
}
pub async fn stop(&self) -> Result<(), Error> {
self.shared.on_stop.send(OnStop::Sleep).map_err(|_| {
Error::new(
anyhow!("Manager has already been shutdown"),
crate::ErrorKind::Docker,
)
})?;
if matches!(self.status(), Status::Paused) {
self.resume().await?;
}
match self
.shared
.docker
.stop_container(
&self.shared.container_name,
Some(StopContainerOptions { t: 30 }),
)
.await
{
Err(bollard::errors::Error::DockerResponseNotFoundError { .. })
| Err(bollard::errors::Error::DockerResponseConflictError { .. }) => (), // Already stopped
a => a?,
};
self.shared.status.store(
Status::Stopped as usize,
std::sync::atomic::Ordering::SeqCst,
);
Ok(())
}
pub async fn start(&self) -> Result<(), Error> {
self.shared.on_stop.send(OnStop::Restart).map_err(|_| {
Error::new(
anyhow!("Manager has already been shutdown"),
crate::ErrorKind::Docker,
)
})?;
self.shared.status.store(
Status::Running as usize,
std::sync::atomic::Ordering::SeqCst,
);
Ok(())
}
pub async fn pause(&self) -> Result<(), Error> {
self.shared
.docker
.pause_container(&self.shared.container_name)
.await?;
self.shared
.status
.store(Status::Paused as usize, std::sync::atomic::Ordering::SeqCst);
Ok(())
}
pub async fn resume(&self) -> Result<(), Error> {
self.shared
.docker
.unpause_container(&self.shared.container_name)
.await?;
self.shared.status.store(
Status::Running as usize,
std::sync::atomic::Ordering::SeqCst,
);
Ok(())
}
async fn exit(&self) -> Result<(), Error> {
let _ = self.shared.on_stop.send(OnStop::Exit);
match self
.shared
.docker
.stop_container(
&self.shared.container_name,
Some(StopContainerOptions { t: 30 }),
)
.await
{
Err(bollard::errors::Error::DockerResponseNotFoundError { .. })
| Err(bollard::errors::Error::DockerResponseConflictError { .. }) => (),
a => a?,
};
self.shared.status.store(
Status::Stopped as usize,
std::sync::atomic::Ordering::SeqCst,
);
if let Some(thread) = self.thread.take().await {
thread.await.map_err(|e| {
Error::new(
anyhow!("Manager thread panicked: {}", e),
crate::ErrorKind::Docker,
)
})?;
}
Ok(())
}
}