fix deadlock

This commit is contained in:
Aiden McClelland
2025-08-21 18:40:53 -06:00
parent 359146f02c
commit 5bee2cef96
6 changed files with 50 additions and 166 deletions

View File

@@ -1,4 +1,5 @@
ls-files = $(shell git ls-files --cached --others --exclude-standard $1)
PROFILE = release
PLATFORM_FILE := $(shell ./check-platform.sh)
ENVIRONMENT_FILE := $(shell ./check-environment.sh)
@@ -23,7 +24,7 @@ WEB_INSTALL_WIZARD_SRC := $(call ls-files, web/projects/install-wizard)
PATCH_DB_CLIENT_SRC := $(shell git ls-files --recurse-submodules patch-db/client)
GZIP_BIN := $(shell which pigz || which gzip)
TAR_BIN := $(shell which gtar || which tar)
COMPILED_TARGETS := core/target/$(ARCH)-unknown-linux-musl/release/startbox core/target/$(ARCH)-unknown-linux-musl/release/containerbox container-runtime/rootfs.$(ARCH).squashfs
COMPILED_TARGETS := core/target/$(ARCH)-unknown-linux-musl/$(PROFILE)/startbox core/target/$(ARCH)-unknown-linux-musl/release/containerbox container-runtime/rootfs.$(ARCH).squashfs
ALL_TARGETS := $(STARTD_SRC) $(ENVIRONMENT_FILE) $(GIT_HASH_FILE) $(VERSION_FILE) $(COMPILED_TARGETS) cargo-deps/$(ARCH)-unknown-linux-musl/release/startos-backup-fs $(PLATFORM_FILE) \
$(shell if [ "$(PLATFORM)" = "raspberrypi" ]; then \
echo cargo-deps/aarch64-unknown-linux-musl/release/pi-beep; \
@@ -133,7 +134,7 @@ results/$(BASENAME).$(IMAGE_TYPE) results/$(BASENAME).squashfs: $(IMAGE_RECIPE_S
install: $(ALL_TARGETS)
$(call mkdir,$(DESTDIR)/usr/bin)
$(call mkdir,$(DESTDIR)/usr/sbin)
$(call cp,core/target/$(ARCH)-unknown-linux-musl/release/startbox,$(DESTDIR)/usr/bin/startbox)
$(call cp,core/target/$(ARCH)-unknown-linux-musl/$(PROFILE)/startbox,$(DESTDIR)/usr/bin/startbox)
$(call ln,/usr/bin/startbox,$(DESTDIR)/usr/bin/startd)
$(call ln,/usr/bin/startbox,$(DESTDIR)/usr/bin/start-cli)
if [ "$(PLATFORM)" = "raspberrypi" ]; then $(call cp,cargo-deps/aarch64-unknown-linux-musl/release/pi-beep,$(DESTDIR)/usr/bin/pi-beep); fi
@@ -169,10 +170,10 @@ update-overlay: $(ALL_TARGETS)
$(MAKE) install REMOTE=$(REMOTE) SSHPASS=$(SSHPASS) PLATFORM=$(PLATFORM)
$(call ssh,"sudo systemctl start startd")
wormhole: core/target/$(ARCH)-unknown-linux-musl/release/startbox
wormhole: core/target/$(ARCH)-unknown-linux-musl/$(PROFILE)/startbox
@echo "Paste the following command into the shell of your StartOS server:"
@echo
@wormhole send core/target/$(ARCH)-unknown-linux-musl/release/startbox 2>&1 | awk -Winteractive '/wormhole receive/ { printf "sudo /usr/lib/startos/scripts/chroot-and-upgrade \"cd /usr/bin && rm startbox && wormhole receive --accept-file %s && chmod +x startbox\"\n", $$3 }'
@wormhole send core/target/$(ARCH)-unknown-linux-musl/$(PROFILE)/startbox 2>&1 | awk -Winteractive '/wormhole receive/ { printf "sudo /usr/lib/startos/scripts/chroot-and-upgrade \"cd /usr/bin && rm startbox && wormhole receive --accept-file %s && chmod +x startbox\"\n", $$3 }'
wormhole-deb: results/$(BASENAME).deb
@echo "Paste the following command into the shell of your StartOS server:"
@@ -192,10 +193,10 @@ update: $(ALL_TARGETS)
$(MAKE) install REMOTE=$(REMOTE) SSHPASS=$(SSHPASS) DESTDIR=/media/startos/next PLATFORM=$(PLATFORM)
$(call ssh,'sudo /media/startos/next/usr/lib/startos/scripts/chroot-and-upgrade --no-sync "apt-get install -y $(shell cat ./build/lib/depends)"')
update-startbox: core/target/$(ARCH)-unknown-linux-musl/release/startbox # only update binary (faster than full update)
update-startbox: core/target/$(ARCH)-unknown-linux-musl/$(PROFILE)/startbox # only update binary (faster than full update)
@if [ -z "$(REMOTE)" ]; then >&2 echo "Must specify REMOTE" && false; fi
$(call ssh,'sudo /usr/lib/startos/scripts/chroot-and-upgrade --create')
$(call cp,core/target/$(ARCH)-unknown-linux-musl/release/startbox,/media/startos/next/usr/bin/startbox)
$(call cp,core/target/$(ARCH)-unknown-linux-musl/$(PROFILE)/startbox,/media/startos/next/usr/bin/startbox)
$(call ssh,'sudo /media/startos/next/usr/lib/startos/scripts/chroot-and-upgrade --no-sync true')
update-deb: results/$(BASENAME).deb # better than update, but only available from debian
@@ -269,9 +270,9 @@ build/lib/depends build/lib/conflicts: build/dpkg-deps/*
$(FIRMWARE_ROMS): build/lib/firmware.json download-firmware.sh $(PLATFORM_FILE)
./download-firmware.sh $(PLATFORM)
core/target/$(ARCH)-unknown-linux-musl/release/startbox: $(CORE_SRC) $(COMPRESSED_WEB_UIS) web/patchdb-ui-seed.json $(ENVIRONMENT_FILE)
ARCH=$(ARCH) ./core/build-startbox.sh
touch core/target/$(ARCH)-unknown-linux-musl/release/startbox
core/target/$(ARCH)-unknown-linux-musl/$(PROFILE)/startbox: $(CORE_SRC) $(COMPRESSED_WEB_UIS) web/patchdb-ui-seed.json $(ENVIRONMENT_FILE)
ARCH=$(ARCH) PROFILE=$(PROFILE) ./core/build-startbox.sh
touch core/target/$(ARCH)-unknown-linux-musl/$(PROFILE)/startbox
core/target/$(ARCH)-unknown-linux-musl/release/containerbox: $(CORE_SRC) $(ENVIRONMENT_FILE)
ARCH=$(ARCH) ./core/build-containerbox.sh

View File

@@ -1,5 +1,10 @@
#!/bin/bash
PROFILE=${PROFILE:-release}
if [ "${PROFILE}" = "release" ]; then
BUILD_FLAGS="--release"
fi
cd "$(dirname "${BASH_SOURCE[0]}")"
set -ea
@@ -30,7 +35,7 @@ alias 'rust-musl-builder'='docker run $USE_TTY --rm -e "RUSTFLAGS=$RUSTFLAGS" -v
echo "FEATURES=\"$FEATURES\""
echo "RUSTFLAGS=\"$RUSTFLAGS\""
rust-musl-builder sh -c "cd core && cargo build --release --no-default-features --features cli,startd,$FEATURES --locked --bin startbox --target=$ARCH-unknown-linux-musl"
if [ "$(ls -nd core/target/$ARCH-unknown-linux-musl/release/startbox | awk '{ print $3 }')" != "$UID" ]; then
rust-musl-builder sh -c "cd core && cargo build $BUILD_FLAGS --no-default-features --features cli,startd,$FEATURES --locked --bin startbox --target=$ARCH-unknown-linux-musl"
if [ "$(ls -nd core/target/$ARCH-unknown-linux-musl/${PROFILE}/startbox | awk '{ print $3 }')" != "$UID" ]; then
rust-musl-builder sh -c "cd core && chown -R $UID:$UID target && chown -R $UID:$UID /root/.cargo"
fi

View File

@@ -144,7 +144,7 @@ pub fn main(args: impl IntoIterator<Item = OsString>) {
let res = {
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(max(4, num_cpus::get()))
.worker_threads(max(1, num_cpus::get()))
.enable_all()
.build()
.expect("failed to initialize runtime");

View File

@@ -22,6 +22,7 @@ use hickory_server::server::{Request, RequestHandler, ResponseHandler, ResponseI
use hickory_server::ServerFuture;
use imbl::OrdMap;
use imbl_value::InternedString;
use itertools::Itertools;
use models::{GatewayId, OptionExt, PackageId};
use rpc_toolkit::{
from_fn_async, from_fn_blocking, Context, HandlerArgs, HandlerExt, ParentHandler,
@@ -164,7 +165,7 @@ impl DnsClient {
if let Err::<(), Error>(e) = async {
let mut stream = file_string_stream("/run/systemd/resolve/resolv.conf")
.filter_map(|a| futures::future::ready(a.transpose())).boxed();
let mut conf = stream
let mut conf: String = stream
.next()
.await
.or_not_found("/run/systemd/resolve/resolv.conf")??;
@@ -175,6 +176,7 @@ impl DnsClient {
.lines()
.map(|l| l.trim())
.filter_map(|l| l.strip_prefix("nameserver "))
.skip(2)
.map(|n| {
n.parse::<SocketAddr>()
.or_else(|_| n.parse::<IpAddr>().map(|a| (a, 53).into()))
@@ -408,10 +410,12 @@ impl RequestHandler for Resolver {
}
} else {
let query = query.original().clone();
let mut streams = self.client.lookup(query, DnsRequestOptions::default());
let mut streams = self
.client
.lookup(dbg!(query), DnsRequestOptions::default());
let mut err = None;
for stream in streams.iter_mut() {
match stream.next().await {
match dbg!(stream.next().await) {
None => (),
Some(Err(e)) => err = Some(e),
Some(Ok(msg)) => {
@@ -433,18 +437,9 @@ impl RequestHandler for Resolver {
tracing::error!("{e}");
tracing::debug!("{e:?}");
}
let res = Header::response_from_request(request.header());
response_handle
.send_response(
MessageResponseBuilder::from_message_request(&*request).build(
res.into(),
[],
[],
[],
[],
),
)
.await
let mut res = Header::response_from_request(request.header());
res.set_response_code(ResponseCode::ServFail);
Ok(res.into())
}
}
.await

View File

@@ -25,6 +25,7 @@ use ts_rs::TS;
use zbus::proxy::{PropertyChanged, PropertyStream, SignalStream};
use zbus::zvariant::{
DeserializeDict, Dict, OwnedObjectPath, OwnedValue, Type as ZType, Value as ZValue,
DICT_ENTRY_SIG_END_STR,
};
use zbus::{proxy, Connection};
@@ -1185,17 +1186,10 @@ impl ListenerMap {
}
pub trait InterfaceFilter: Any + Clone + std::fmt::Debug + Eq + Ord + Send + Sync {
#[cfg_attr(feature = "unstable", inline(never))]
fn filter(&self, id: &GatewayId, info: &NetworkInterfaceInfo) -> bool;
#[cfg_attr(feature = "unstable", inline(never))]
fn simplify(&self) -> &dyn DynInterfaceFilterT {
self
}
#[cfg_attr(feature = "unstable", inline(never))]
fn eq(&self, other: &dyn Any) -> bool {
Some(self) == other.downcast_ref::<Self>()
}
#[cfg_attr(feature = "unstable", inline(never))]
fn cmp(&self, other: &dyn Any) -> std::cmp::Ordering {
match (self as &dyn Any).type_id().cmp(&other.type_id()) {
std::cmp::Ordering::Equal => {
@@ -1204,20 +1198,10 @@ pub trait InterfaceFilter: Any + Clone + std::fmt::Debug + Eq + Ord + Send + Syn
ord => ord,
}
}
#[cfg_attr(feature = "unstable", inline(never))]
fn as_any(&self) -> &dyn Any {
self
}
fn into_dyn(self) -> DynInterfaceFilter {
#[cfg(feature = "unstable")]
{
let res = DynInterfaceFilter::new(self.clone());
if !DynInterfaceFilterT::eq(&self, &res) || !DynInterfaceFilterT::eq(&res, &self) {
panic!("self != self")
}
res
}
#[cfg(not(feature = "unstable"))]
DynInterfaceFilter::new(self)
}
}
@@ -1262,47 +1246,6 @@ impl<A: InterfaceFilter, B: InterfaceFilter> InterfaceFilter for AndFilter<A, B>
fn filter(&self, id: &GatewayId, info: &NetworkInterfaceInfo) -> bool {
self.0.filter(id, info) && self.1.filter(id, info)
}
fn simplify(&self) -> &dyn DynInterfaceFilterT {
if InterfaceFilter::eq(&self.0, &self.1) {
&self.0
} else {
self
}
}
fn eq(&self, other: &dyn Any) -> bool {
if let Some(other) = other.downcast_ref::<Self>() {
(InterfaceFilter::eq(&self.0, other.0.as_any())
&& InterfaceFilter::eq(&self.1, other.1.as_any()))
|| (InterfaceFilter::eq(&self.0, other.1.as_any())
&& InterfaceFilter::eq(&self.1, other.0.as_any()))
} else {
false
}
}
fn cmp(&self, other: &dyn Any) -> std::cmp::Ordering {
if let Some(other) = other.downcast_ref::<Self>() {
let mut lhs: [&dyn DynInterfaceFilterT; 2] = [&self.0, &self.1];
lhs.sort_by(|a, b| a.cmp(b.as_any()));
let mut rhs: [&dyn DynInterfaceFilterT; 2] = [&other.0, &other.1];
rhs.sort_by(|a, b| a.cmp(b.as_any()));
lhs.iter()
.zip_eq(rhs)
.fold_while(std::cmp::Ordering::Equal, |acc, (a, b)| {
match a.cmp(b.as_any()) {
std::cmp::Ordering::Equal => itertools::FoldWhile::Continue(acc),
ord => itertools::FoldWhile::Done(ord),
}
})
.into_inner()
} else {
match (self as &dyn Any).type_id().cmp(&other.type_id()) {
std::cmp::Ordering::Equal => {
std::cmp::Ord::cmp(self, other.downcast_ref::<Self>().unwrap())
}
ord => ord,
}
}
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
@@ -1311,47 +1254,6 @@ impl<A: InterfaceFilter, B: InterfaceFilter> InterfaceFilter for OrFilter<A, B>
fn filter(&self, id: &GatewayId, info: &NetworkInterfaceInfo) -> bool {
self.0.filter(id, info) || self.1.filter(id, info)
}
fn simplify(&self) -> &dyn DynInterfaceFilterT {
if InterfaceFilter::eq(&self.0, &self.1) {
&self.0
} else {
self
}
}
fn eq(&self, other: &dyn Any) -> bool {
if let Some(other) = other.downcast_ref::<Self>() {
(InterfaceFilter::eq(&self.0, other.0.as_any())
&& InterfaceFilter::eq(&self.1, other.1.as_any()))
|| (InterfaceFilter::eq(&self.0, other.1.as_any())
&& InterfaceFilter::eq(&self.1, other.0.as_any()))
} else {
false
}
}
fn cmp(&self, other: &dyn Any) -> std::cmp::Ordering {
if let Some(other) = other.downcast_ref::<Self>() {
let mut lhs: [&dyn DynInterfaceFilterT; 2] = [&self.0, &self.1];
lhs.sort_by(|a, b| a.cmp(b.as_any()));
let mut rhs: [&dyn DynInterfaceFilterT; 2] = [&other.0, &other.1];
rhs.sort_by(|a, b| a.cmp(b.as_any()));
lhs.iter()
.zip_eq(rhs)
.fold_while(std::cmp::Ordering::Equal, |acc, (a, b)| {
match a.cmp(b.as_any()) {
std::cmp::Ordering::Equal => itertools::FoldWhile::Continue(acc),
ord => itertools::FoldWhile::Done(ord),
}
})
.into_inner()
} else {
match (self as &dyn Any).type_id().cmp(&other.type_id()) {
std::cmp::Ordering::Equal => {
std::cmp::Ord::cmp(self, other.downcast_ref::<Self>().unwrap())
}
ord => ord,
}
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
@@ -1360,13 +1262,6 @@ impl InterfaceFilter for AnyFilter {
fn filter(&self, id: &GatewayId, info: &NetworkInterfaceInfo) -> bool {
self.0.iter().any(|f| InterfaceFilter::filter(f, id, info))
}
fn simplify(&self) -> &dyn DynInterfaceFilterT {
match self.0.len() {
0 => &false,
1 => self.0.first().unwrap(),
_ => self,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
@@ -1375,13 +1270,6 @@ impl InterfaceFilter for AllFilter {
fn filter(&self, id: &GatewayId, info: &NetworkInterfaceInfo) -> bool {
self.0.iter().all(|f| InterfaceFilter::filter(f, id, info))
}
fn simplify(&self) -> &dyn DynInterfaceFilterT {
match self.0.len() {
0 => &true,
1 => self.0.first().unwrap(),
_ => self,
}
}
}
pub trait DynInterfaceFilterT: std::fmt::Debug + Any + Send + Sync {
@@ -1395,31 +1283,25 @@ impl<T: InterfaceFilter> DynInterfaceFilterT for T {
InterfaceFilter::filter(self, id, info)
}
fn eq(&self, other: &dyn Any) -> bool {
let simplified = self.simplify();
if (simplified as &dyn Any).is::<Self>() {
InterfaceFilter::eq(self, other)
} else {
dbg!(simplified.eq(other))
}
InterfaceFilter::eq(self, other)
}
fn cmp(&self, other: &dyn Any) -> std::cmp::Ordering {
let simplified = self.simplify();
if (simplified as &dyn Any).is::<Self>() {
InterfaceFilter::cmp(self, other)
} else {
simplified.cmp(other)
}
InterfaceFilter::cmp(self, other)
}
fn as_any(&self) -> &dyn Any {
let simplified = self.simplify();
if (simplified as &dyn Any).is::<Self>() {
InterfaceFilter::as_any(self)
} else {
simplified.as_any()
}
InterfaceFilter::as_any(self)
}
}
#[test]
fn test_interface_filter_eq() {
let dyn_t = true.into_dyn();
assert!(DynInterfaceFilterT::eq(
&dyn_t,
DynInterfaceFilterT::as_any(&true),
))
}
#[derive(Clone, Debug)]
pub struct DynInterfaceFilter(Arc<dyn DynInterfaceFilterT>);
impl InterfaceFilter for DynInterfaceFilter {
@@ -1582,7 +1464,7 @@ impl NetworkInterfaceListener {
filter: &impl InterfaceFilter,
) -> Poll<Result<Accepted, Error>> {
while self.ip_info.poll_changed(cx).is_ready()
|| !InterfaceFilter::eq(&self.listeners.prev_filter, filter)
|| !DynInterfaceFilterT::eq(&self.listeners.prev_filter, filter.as_any())
{
self.ip_info
.peek_and_mark_seen(|ip_info| self.listeners.update(ip_info, filter))?;

View File

@@ -2,6 +2,7 @@ pub mod eq_map;
pub mod eq_set;
use std::marker::PhantomData;
use std::ops::Bound;
pub use eq_map::EqMap;
pub use eq_set::EqSet;
@@ -26,15 +27,15 @@ impl<'a, K: Ord + Clone, V: Clone> Iterator for OrdMapIterMut<'a, K, V> {
fn next(&mut self) -> Option<Self::Item> {
unsafe {
let map: &'a mut OrdMap<K, V> = self.map.as_mut().unwrap();
let res = if let Some(k) = self.prev.take() {
map.get_next_mut(k)
let Some((k, _)) = (if let Some(k) = self.prev.take() {
map.range((Bound::Excluded(k), Bound::Unbounded)).next()
} else {
let Some((k, _)) = map.get_min() else {
return None;
};
let k = k.clone(); // hate that I have to do this but whatev
map.get_key_value_mut(&k)
map.get_min().map(|(k, v)| (k, v))
}) else {
return None;
};
let k = k.clone(); // hate that I have to do this but whatev
let res = map.get_key_value_mut(&k);
if let Some((k, _)) = &res {
self.prev = Some(*k);
}