Refactor/networking (#2189)

* refactor networking and account

* add interfaces from manifest automatically

* use nistp256 to satisfy firefox

* use ed25519 if available

* fix ip signing

* fix SQL error

* update prettytable to fix segfault

* fix migration

* fix migration

* bump welcome-ack

* add redirect if connecting to https over http

* misc rebase fixes

* fix compression

* bump rustc version
This commit is contained in:
Aiden McClelland
2023-03-08 19:30:46 -07:00
committed by GitHub
parent da55d6f7cd
commit bbb9980941
79 changed files with 3577 additions and 3587 deletions

View File

@@ -12,23 +12,22 @@ use embassy_container_init::{ProcessGroupId, SignalGroupParams};
use helpers::UnixRpcClient;
use nix::sys::signal::Signal;
use patch_db::DbHandle;
use sqlx::{Executor, Postgres};
use sqlx::{Connection, Executor, Postgres};
use tokio::sync::watch::error::RecvError;
use tokio::sync::watch::{channel, Receiver, Sender};
use tokio::sync::{oneshot, Notify, RwLock};
use torut::onion::TorSecretKeyV3;
use tracing::instrument;
use crate::context::RpcContext;
use crate::manager::sync::synchronizer;
use crate::net::interface::InterfaceId;
use crate::net::GeneratedCertificateMountPoint;
use crate::net::net_controller::NetService;
use crate::procedure::docker::{DockerContainer, DockerProcedure, LongRunning};
#[cfg(feature = "js_engine")]
use crate::procedure::js_scripts::JsProcedure;
use crate::procedure::{NoOutput, PackageProcedure, ProcedureName};
use crate::s9pk::manifest::{Manifest, PackageId};
use crate::util::{ApplyRef, Container, NonDetachingJoinHandle, Version};
use crate::volume::Volume;
use crate::Error;
pub mod health;
@@ -70,10 +69,9 @@ impl ManagerMap {
continue;
};
let tor_keys = man.interfaces.tor_keys(secrets, &package).await?;
res.insert(
(package, man.version.clone()),
Arc::new(Manager::create(ctx.clone(), man, tor_keys).await?),
Arc::new(Manager::create(ctx.clone(), man).await?),
);
}
*self.0.write().await = res;
@@ -81,12 +79,7 @@ impl ManagerMap {
}
#[instrument(skip(self, ctx))]
pub async fn add(
&self,
ctx: RpcContext,
manifest: Manifest,
tor_keys: BTreeMap<InterfaceId, TorSecretKeyV3>,
) -> Result<(), Error> {
pub async fn add(&self, ctx: RpcContext, manifest: Manifest) -> Result<(), Error> {
let mut lock = self.0.write().await;
let id = (manifest.id.clone(), manifest.version.clone());
if let Some(man) = lock.remove(&id) {
@@ -94,10 +87,7 @@ impl ManagerMap {
man.exit().await?;
}
}
lock.insert(
id,
Arc::new(Manager::create(ctx, manifest, tor_keys).await?),
);
lock.insert(id, Arc::new(Manager::create(ctx, manifest).await?));
Ok(())
}
@@ -162,7 +152,6 @@ struct ManagerSeed {
ctx: RpcContext,
manifest: Manifest,
container_name: String,
tor_keys: BTreeMap<InterfaceId, TorSecretKeyV3>,
}
pub struct ManagerSharedState {
@@ -190,13 +179,8 @@ async fn run_main(
state: &Arc<ManagerSharedState>,
) -> Result<Result<NoOutput, (i32, String)>, Error> {
let rt_state = state.clone();
let interfaces = main_interfaces(&*state.seed)?;
let generated_certificate = generate_certificate(&*state.seed, &interfaces).await?;
let mut runtime = NonDetachingJoinHandle::from(tokio::spawn(start_up_image(
rt_state,
generated_certificate,
)));
let mut runtime = NonDetachingJoinHandle::from(tokio::spawn(start_up_image(rt_state)));
let ip = match state.persistent_container.is_some() {
false => Some(match get_running_ip(state, &mut runtime).await {
GetRunningIp::Ip(x) => x,
@@ -206,9 +190,11 @@ async fn run_main(
true => None,
};
if let Some(ip) = ip {
add_network_for_main(&*state.seed, ip, interfaces, generated_certificate).await?;
}
let svc = if let Some(ip) = ip {
Some(add_network_for_main(&*state.seed, ip).await?)
} else {
None
};
set_commit_health_true(state);
let health = main_health_check_daemon(state.clone());
@@ -218,8 +204,8 @@ async fn run_main(
_ = health => Err(Error::new(eyre!("Health check daemon exited!"), crate::ErrorKind::Unknown)),
_ = state.killer.notified() => Ok(Err((137, "Killed".to_string())))
};
if let Some(ip) = ip {
remove_network_for_main(&*state.seed, ip).await?;
if let Some(svc) = svc {
remove_network_for_main(svc).await?;
}
res
}
@@ -228,7 +214,6 @@ async fn run_main(
/// Note for _generated_certificate: Needed to know that before we start the state we have generated the certificate
async fn start_up_image(
rt_state: Arc<ManagerSharedState>,
_generated_certificate: GeneratedCertificateMountPoint,
) -> Result<Result<NoOutput, (i32, String)>, Error> {
rt_state
.seed
@@ -248,17 +233,12 @@ async fn start_up_image(
impl Manager {
#[instrument(skip(ctx))]
async fn create(
ctx: RpcContext,
manifest: Manifest,
tor_keys: BTreeMap<InterfaceId, TorSecretKeyV3>,
) -> Result<Self, Error> {
async fn create(ctx: RpcContext, manifest: Manifest) -> Result<Self, Error> {
let (on_stop, recv) = channel(OnStop::Sleep);
let seed = Arc::new(ManagerSeed {
ctx,
container_name: DockerProcedure::container_name(&manifest.id, None),
manifest,
tor_keys,
});
let persistent_container = PersistentContainer::init(&seed).await?;
let shared = Arc::new(ManagerSharedState {
@@ -479,8 +459,6 @@ async fn spawn_persistent_container(
let mut send_inserter: Option<oneshot::Sender<Receiver<Arc<UnixRpcClient>>>> = Some(send_inserter);
loop {
if let Err(e) = async {
let interfaces = main_interfaces(&*seed)?;
let generated_certificate = generate_certificate(&*seed, &interfaces).await?;
let (mut runtime, inserter) =
long_running_docker(&seed, &container).await?;
@@ -493,7 +471,7 @@ async fn spawn_persistent_container(
return Ok(());
}
};
add_network_for_main(&*seed, ip, interfaces, generated_certificate).await?;
let svc = add_network_for_main(&*seed, ip).await?;
if let Some(inserter_send) = inserter_send.as_mut() {
let _ = inserter_send.send(Arc::new(inserter));
@@ -509,7 +487,7 @@ async fn spawn_persistent_container(
a = runtime.running_output => a.map_err(|_| Error::new(eyre!("Manager runtime panicked!"), crate::ErrorKind::Docker)).map(|_| ()),
};
remove_network_for_main(&*seed, ip).await?;
remove_network_for_main(svc).await?;
res
}.await {
@@ -540,16 +518,8 @@ async fn long_running_docker(
.await
}
async fn remove_network_for_main(seed: &ManagerSeed, ip: std::net::Ipv4Addr) -> Result<(), Error> {
seed.ctx
.net_controller
.remove(
&seed.manifest.id,
ip,
seed.manifest.interfaces.0.keys().cloned(),
)
.await?;
Ok(())
async fn remove_network_for_main(svc: NetService) -> Result<(), Error> {
svc.remove_all().await
}
fn fetch_starting_to_running(state: &Arc<ManagerSharedState>) {
@@ -592,18 +562,32 @@ fn set_commit_health_true(state: &Arc<ManagerSharedState>) {
async fn add_network_for_main(
seed: &ManagerSeed,
ip: std::net::Ipv4Addr,
interfaces: Vec<(
InterfaceId,
&crate::net::interface::Interface,
TorSecretKeyV3,
)>,
generated_certificate: GeneratedCertificateMountPoint,
) -> Result<(), Error> {
seed.ctx
) -> Result<NetService, Error> {
let mut svc = seed
.ctx
.net_controller
.add(&seed.manifest.id, ip, interfaces, generated_certificate)
.create_service(seed.manifest.id.clone(), ip)
.await?;
Ok(())
// DEPRECATED
let mut secrets = seed.ctx.secret_store.acquire().await?;
let mut tx = secrets.begin().await?;
for (id, interface) in &seed.manifest.interfaces.0 {
for (external, internal) in interface.lan_config.iter().flatten() {
svc.add_lan(&mut tx, id.clone(), external.0, internal.internal, false)
.await?;
}
for (external, internal) in interface.tor_config.iter().flat_map(|t| &t.port_mapping) {
svc.add_tor(&mut tx, id.clone(), external.0, internal.0)
.await?;
}
}
for volume in seed.manifest.volumes.values() {
if let Volume::Certificate { interface_id } = volume {
svc.export_cert(&mut tx, interface_id, ip.into()).await?;
}
}
tx.commit().await?;
Ok(svc)
}
enum GetRunningIp {
@@ -716,49 +700,6 @@ async fn container_inspect(
.await
}
async fn generate_certificate(
seed: &ManagerSeed,
interfaces: &Vec<(
InterfaceId,
&crate::net::interface::Interface,
TorSecretKeyV3,
)>,
) -> Result<GeneratedCertificateMountPoint, Error> {
seed.ctx
.net_controller
.generate_certificate_mountpoint(&seed.manifest.id, interfaces)
.await
}
fn main_interfaces(
seed: &ManagerSeed,
) -> Result<
Vec<(
InterfaceId,
&crate::net::interface::Interface,
TorSecretKeyV3,
)>,
Error,
> {
seed.manifest
.interfaces
.0
.iter()
.map(|(id, info)| {
Ok((
id.clone(),
info,
seed.tor_keys
.get(id)
.ok_or_else(|| {
Error::new(eyre!("interface {} missing key", id), crate::ErrorKind::Tor)
})?
.clone(),
))
})
.collect::<Result<Vec<_>, Error>>()
}
async fn wait_for_status(shared: &ManagerSharedState, status: Status) {
let mut recv = shared.status.0.subscribe();
while {