Bugfix/ssl proxy to ssl (#2956)

* fix registry rm command

* fix bind with addSsl on ssl proto

* fix bind with addSsl on ssl proto

* Add pre-release version migrations

* fix os build

* add mime to package deps

* update lockfile

* more ssl fixes

* add waitFor

* improve restart lockup

* beta.26

* fix dependency health check logic

* handle missing health check

* fix port forwards

---------

Co-authored-by: Aiden McClelland <me@drbonez.dev>
This commit is contained in:
Dominion5254
2025-06-04 19:41:21 -06:00
committed by GitHub
parent 02413a4fac
commit ab6ca8e16a
40 changed files with 1240 additions and 816 deletions

View File

@@ -19,6 +19,7 @@ struct ConcurrentRunner<A> {
waiting: Vec<Request<A>>,
recv: mpsc::UnboundedReceiver<Request<A>>,
handlers: Vec<(
&'static str,
Guid,
Arc<ConflictFn<A>>,
oneshot::Sender<Box<dyn Any + Send>>,
@@ -47,13 +48,26 @@ impl<A: Actor + Clone> Future for ConcurrentRunner<A> {
if this
.handlers
.iter()
.any(|(hid, f, _, _)| &id != hid && f(&*msg))
.any(|(_, hid, f, _, _)| &id != hid && f(&*msg))
{
#[cfg(feature = "unstable")]
{
tracing::debug!("{} must wait...", msg.type_name());
tracing::debug!(
"waiting on {:?}",
this.handlers
.iter()
.filter(|h| h.2(&*msg))
.map(|h| (h.0, &h.1))
.collect::<Vec<_>>()
);
}
this.waiting.push((id, msg, reply));
} else {
let mut actor = this.actor.clone();
let queue = this.queue.clone();
this.handlers.push((
msg.type_name(),
id.clone(),
msg.conflicts_with(),
reply,
@@ -69,15 +83,15 @@ impl<A: Actor + Clone> Future for ConcurrentRunner<A> {
.handlers
.iter_mut()
.enumerate()
.filter_map(|(i, (_, _, _, f))| match f.poll_unpin(cx) {
.filter_map(|(i, (_, _, _, _, f))| match f.poll_unpin(cx) {
std::task::Poll::Pending => None,
std::task::Poll::Ready(res) => Some((i, res)),
})
.collect::<Vec<_>>();
for (idx, res) in complete.into_iter().rev() {
#[allow(clippy::let_underscore_future)]
let (_, f, reply, _) = this.handlers.swap_remove(idx);
let _ = reply.send(res);
let (_, _, f, reply, _) = this.handlers.swap_remove(idx);
reply.send(res).ok();
// TODO: replace with Vec::extract_if once stable
if this.shutdown.is_some() {
let mut i = 0;
@@ -86,12 +100,13 @@ impl<A: Actor + Clone> Future for ConcurrentRunner<A> {
&& !this
.handlers
.iter()
.any(|(_, f, _, _)| f(&*this.waiting[i].1))
.any(|(_, _, f, _, _)| f(&*this.waiting[i].1))
{
let (id, msg, reply) = this.waiting.remove(i);
let mut actor = this.actor.clone();
let queue = this.queue.clone();
this.handlers.push((
msg.type_name(),
id.clone(),
msg.conflicts_with(),
reply,
@@ -100,6 +115,18 @@ impl<A: Actor + Clone> Future for ConcurrentRunner<A> {
));
cont = true;
} else {
#[cfg(feature = "unstable")]
{
tracing::debug!("{} must wait...", this.waiting[i].1.type_name());
tracing::debug!(
"waiting on {:?}",
this.handlers
.iter()
.filter(|h| h.2(&*this.waiting[i].1))
.map(|h| (h.0, &h.1))
.collect::<Vec<_>>()
);
}
i += 1;
}
}
@@ -219,3 +246,77 @@ impl<A: Actor + Clone> ConcurrentActor<A> {
}
}
}
#[cfg(test)]
mod test {
use std::time::Duration;
use crate::rpc_continuations::Guid;
use crate::util::actor::background::BackgroundJobQueue;
use crate::util::actor::{Actor, ConflictBuilder, Handler};
#[derive(Clone)]
struct CActor;
impl Actor for CActor {
fn init(&mut self, jobs: &BackgroundJobQueue) {}
}
struct Pending;
impl Handler<Pending> for CActor {
type Response = ();
fn conflicts_with(_: &Pending) -> ConflictBuilder<Self> {
ConflictBuilder::everything().except::<NoConflicts>()
}
async fn handle(&mut self, _: Guid, _: Pending, _: &BackgroundJobQueue) -> Self::Response {
futures::future::pending().await
}
}
struct Conflicts;
impl Handler<Conflicts> for CActor {
type Response = ();
fn conflicts_with(_: &Conflicts) -> ConflictBuilder<Self> {
ConflictBuilder::everything().except::<NoConflicts>()
}
async fn handle(
&mut self,
_: Guid,
_: Conflicts,
_: &BackgroundJobQueue,
) -> Self::Response {
}
}
struct NoConflicts;
impl Handler<NoConflicts> for CActor {
type Response = ();
fn conflicts_with(_: &NoConflicts) -> ConflictBuilder<Self> {
ConflictBuilder::nothing()
}
async fn handle(
&mut self,
_: Guid,
_: NoConflicts,
_: &BackgroundJobQueue,
) -> Self::Response {
}
}
#[tokio::test]
async fn test_conflicts() {
let actor = super::ConcurrentActor::new(CActor);
let guid = Guid::new();
actor.queue(guid.clone(), Pending);
assert!(
tokio::time::timeout(Duration::from_secs(1), actor.send(Guid::new(), Conflicts))
.await
.is_err()
);
assert!(
tokio::time::timeout(Duration::from_secs(1), actor.send(Guid::new(), NoConflicts))
.await
.is_ok()
);
assert!(
tokio::time::timeout(Duration::from_secs(1), actor.send(guid, Conflicts))
.await
.is_ok()
);
}
}

View File

@@ -45,6 +45,9 @@ trait Message<A>: Send + Any {
actor: &'a mut A,
jobs: &'a BackgroundJobQueue,
) -> BoxFuture<'a, Box<dyn Any + Send>>;
fn type_name(&self) -> &'static str {
std::any::type_name_of_val(self)
}
}
impl<M: Send + Any, A: Actor> Message<A> for M
where