Bugfixes for alpha.12 (#3049)

* squashfs-wip

* sdk fixes

* misc fixes

* bump sdk

* Include StartTunnel installation command

Added installation instructions for StartTunnel.

* CA instead of leaf for StartTunnel (#3046)

* updated docs for CA instead of cert

* generate ca instead of self-signed in start-tunnel

* Fix formatting in START-TUNNEL.md installation instructions

* Fix formatting in START-TUNNEL.md

* fix infinite loop

* add success message to install

* hide loopback and bridge gateways

---------

Co-authored-by: Aiden McClelland <me@drbonez.dev>
Co-authored-by: Aiden McClelland <3732071+dr-bonez@users.noreply.github.com>

* prevent gateways from getting stuck empty

* fix set-password

* misc networking fixes

* build and efi fixes

* efi fixes

* alpha.13

* remove cross

* fix tests

* provide path to upgrade

* fix networkmanager issues

* remove squashfs before creating

---------

Co-authored-by: Matt Hill <MattDHill@users.noreply.github.com>
This commit is contained in:
Aiden McClelland
2025-11-15 22:33:03 -07:00
committed by GitHub
parent edb916338c
commit 2fbaaebf44
61 changed files with 856 additions and 693 deletions

View File

@@ -15,7 +15,7 @@ license = "MIT"
name = "start-os"
readme = "README.md"
repository = "https://github.com/Start9Labs/start-os"
version = "0.4.0-alpha.12" # VERSION_BUMP
version = "0.4.0-alpha.13" # VERSION_BUMP
[lib]
name = "startos"
@@ -93,7 +93,6 @@ async-compression = { version = "0.4.32", features = [
] }
async-stream = "0.3.5"
async-trait = "0.1.74"
aws-lc-sys = { version = "0.32", features = ["bindgen"] }
axum = { version = "0.8.4", features = ["ws"] }
backtrace-on-stack-overflow = { version = "0.3.0", optional = true }
barrage = "0.2.3"
@@ -223,7 +222,7 @@ regex = "1.10.2"
reqwest = { version = "0.12.4", features = ["json", "socks", "stream"] }
reqwest_cookie_store = "0.8.0"
rpassword = "7.2.0"
rpc-toolkit = { git = "https://github.com/Start9Labs/rpc-toolkit.git", branch = "master" }
rpc-toolkit = { git = "https://github.com/Start9Labs/rpc-toolkit.git", rev = "068db90" }
rust-argon2 = "2.0.0"
safelog = { version = "0.4.8", git = "https://github.com/Start9Labs/arti.git", branch = "patch/disable-exit", optional = true }
semver = { version = "1.0.20", features = ["serde"] }
@@ -252,7 +251,7 @@ termion = "4.0.5"
textwrap = "0.16.1"
thiserror = "2.0.12"
tokio = { version = "1.38.1", features = ["full"] }
tokio-rustls = "0.26.0"
tokio-rustls = "0.26.4"
tokio-stream = { version = "0.1.14", features = ["io-util", "net", "sync"] }
tokio-tar = { git = "https://github.com/dr-bonez/tokio-tar.git" }
tokio-tungstenite = { version = "0.26.2", features = ["native-tls", "url"] }

View File

@@ -22,7 +22,7 @@ use crate::tunnel::tunnel_router;
use crate::tunnel::web::TunnelCertHandler;
use crate::util::logger::LOGGER;
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
enum WebserverListener {
Http,
Https(SocketAddr),

View File

@@ -48,7 +48,6 @@ pub async fn bind<P0: AsRef<Path>, P1: AsRef<Path>>(
pub async fn unmount<P: AsRef<Path>>(mountpoint: P, lazy: bool) -> Result<(), Error> {
tracing::debug!("Unmounting {}.", mountpoint.as_ref().display());
let mut cmd = tokio::process::Command::new("umount");
cmd.arg("-R");
if lazy {
cmd.arg("-l");
}

View File

@@ -280,6 +280,9 @@ pub async fn list(os: &OsPartitionInfo) -> Result<Vec<DiskInfo>, Error> {
.try_fold(
BTreeMap::<PathBuf, DiskIndex>::new(),
|mut disks, dir_entry| async move {
if dir_entry.file_type().await?.is_dir() {
return Ok(disks);
}
if let Some(disk_path) = dir_entry.path().file_name().and_then(|s| s.to_str()) {
let (disk_path, part_path) = if let Some(end) = PARTITION_REGEX.find(disk_path) {
(

View File

@@ -26,6 +26,7 @@ use crate::context::{CliContext, RpcContext};
use crate::db::model::Database;
use crate::db::model::public::AcmeSettings;
use crate::db::{DbAccess, DbAccessByKey, DbAccessMut};
use crate::net::ssl::should_use_cert;
use crate::net::tls::{SingleCertResolver, TlsHandler};
use crate::net::web_server::Accept;
use crate::prelude::*;
@@ -63,20 +64,27 @@ where
.and_then(|p| p.as_idx(JsonKey::new_ref(san_info)))
{
let cert = cert.de().log_err()?;
return Some(
CertifiedKey::from_der(
cert.fullchain
.into_iter()
.map(|c| Ok(CertificateDer::from(c.to_der()?)))
.collect::<Result<_, Error>>()
.log_err()?,
PrivateKeyDer::from(PrivatePkcs8KeyDer::from(
cert.key.0.private_key_to_pkcs8().log_err()?,
)),
&*self.crypto_provider,
)
.log_err()?,
);
if cert
.fullchain
.get(0)
.and_then(|c| should_use_cert(&c.0).log_err())
.unwrap_or(false)
{
return Some(
CertifiedKey::from_der(
cert.fullchain
.into_iter()
.map(|c| Ok(CertificateDer::from(c.to_der()?)))
.collect::<Result<_, Error>>()
.log_err()?,
PrivateKeyDer::from(PrivatePkcs8KeyDer::from(
cert.key.0.private_key_to_pkcs8().log_err()?,
)),
&*self.crypto_provider,
)
.log_err()?,
);
}
}
if !self.in_progress.send_if_modified(|x| {
@@ -307,6 +315,16 @@ where
return Ok(None);
};
let cert = cert.de()?;
if !cert
.fullchain
.get(0)
.map(|c| should_use_cert(&c.0))
.transpose()
.map_err(Error::from)?
.unwrap_or(false)
{
return Ok(None);
}
Ok(Some((
String::from_utf8(
cert.key

View File

@@ -437,7 +437,8 @@ impl InterfaceForwardState {
for mut entry in self.state.iter_mut() {
entry.gc(ip_info, &self.port_forward).await?;
}
Ok(())
self.port_forward.gc().await
}
}
@@ -537,7 +538,6 @@ impl InterfacePortForwardController {
_ = ip_info.changed() => {
interfaces = ip_info.read();
state.sync(&interfaces).await.log_err();
state.port_forward.gc().await.log_err();
}
}
}

View File

@@ -1,5 +1,6 @@
use std::any::Any;
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::fmt;
use std::future::Future;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV6};
use std::sync::{Arc, Weak};
@@ -130,7 +131,6 @@ async fn list_interfaces(
}
#[derive(Debug, Clone, Deserialize, Serialize, Parser, TS)]
#[ts(export)]
struct NetworkInterfaceSetPublicParams {
gateway: GatewayId,
public: Option<bool>,
@@ -147,7 +147,6 @@ async fn set_public(
}
#[derive(Debug, Clone, Deserialize, Serialize, Parser, TS)]
#[ts(export)]
struct UnsetPublicParams {
gateway: GatewayId,
}
@@ -163,7 +162,6 @@ async fn unset_public(
}
#[derive(Debug, Clone, Deserialize, Serialize, Parser, TS)]
#[ts(export)]
struct ForgetGatewayParams {
gateway: GatewayId,
}
@@ -176,7 +174,6 @@ async fn forget_iface(
}
#[derive(Debug, Clone, Deserialize, Serialize, Parser, TS)]
#[ts(export)]
struct RenameGatewayParams {
id: GatewayId,
name: InternedString,
@@ -404,6 +401,12 @@ async fn watcher(
) {
loop {
let res: Result<(), Error> = async {
Command::new("systemctl")
.arg("start")
.arg("NetworkManager")
.invoke(ErrorKind::Network)
.await?;
let connection = Connection::system().await?;
let netman_proxy = NetworkManagerProxy::new(&connection).await?;
@@ -436,6 +439,11 @@ async fn watcher(
until
.run(async {
let devices = netman_proxy.all_devices().await?;
ensure_code!(
!devices.is_empty(),
ErrorKind::Network,
"NetworkManager returned no devices. Trying again..."
);
let mut ifaces = BTreeSet::new();
let mut jobs = Vec::new();
for device in devices {
@@ -1538,6 +1546,14 @@ pub struct NetworkInterfaceListenerAcceptMetadata<B: Bind> {
pub inner: <B::Accept as Accept>::Metadata,
pub info: GatewayInfo,
}
// Manual `Debug` impl: `#[derive(Debug)]` would add a `B: Debug` bound on the
// whole `Bind` type, but only the two fields need to be printable
// (`<B::Accept as Accept>::Metadata` and `GatewayInfo`).
// NOTE(review): assumes both field types implement `Debug` — the accept
// metadata via the `type Metadata: fmt::Debug` bound introduced on `Accept`
// in this same commit; confirm `GatewayInfo: Debug` at the definition site.
impl<B: Bind> fmt::Debug for NetworkInterfaceListenerAcceptMetadata<B> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("NetworkInterfaceListenerAcceptMetadata")
.field("inner", &self.inner)
.field("info", &self.info)
.finish()
}
}
impl<B: Bind> Clone for NetworkInterfaceListenerAcceptMetadata<B>
where
<B::Accept as Accept>::Metadata: Clone,
@@ -1614,3 +1630,39 @@ where
Self::new(Some(Either::Left(listener)))
}
}
// Regression test for interface filtering: a gateway listed in
// `private_disabled` must be rejected by the combined filter even though the
// id filter matches and the binding is non-public.
// NOTE(review): the semantics of `private_disabled` are inferred from the
// field name and the expected `false` result — confirm against `NetInfo`.
#[test]
fn test_filter() {
use crate::net::host::binding::NetInfo;
let wg1 = "wg1".parse::<GatewayId>().unwrap();
// Expect the filter to return false (hence the outer `!`): wg1 is
// explicitly disabled for private bindings in the NetInfo below.
assert!(!InterfaceFilter::filter(
&AndFilter(
NetInfo {
private_disabled: [wg1.clone()].into_iter().collect(),
public_enabled: Default::default(),
assigned_port: None,
assigned_ssl_port: None,
},
AndFilter(IdFilter(wg1.clone()), PublicFilter { public: false }),
)
.into_dyn(),
&wg1,
// Minimal wireguard-style interface description; only the fields the
// filter actually inspects are meaningful here.
&NetworkInterfaceInfo {
name: None,
public: None,
secure: None,
ip_info: Some(Arc::new(IpInfo {
name: "".into(),
scope_id: 3,
device_type: Some(NetworkInterfaceType::Wireguard),
subnets: ["10.59.0.2/24".parse::<IpNet>().unwrap()]
.into_iter()
.collect(),
lan_ip: Default::default(),
wan_ip: None,
ntp_servers: Default::default(),
dns_servers: Default::default(),
})),
},
));
}

View File

@@ -19,7 +19,7 @@ use openssl::x509::extension::{
AuthorityKeyIdentifier, BasicConstraints, KeyUsage, SubjectAlternativeName,
SubjectKeyIdentifier,
};
use openssl::x509::{X509, X509Builder, X509NameBuilder};
use openssl::x509::{X509, X509Builder, X509NameBuilder, X509Ref};
use openssl::*;
use patch_db::HasModel;
use serde::{Deserialize, Serialize};
@@ -48,6 +48,17 @@ pub fn gen_nistp256() -> Result<PKey<Private>, ErrorStack> {
)?)?)
}
/// Returns `true` when `cert` is usable: its validity period has already
/// begun (`notBefore` is in the past) and it will not expire within the next
/// 30 days — i.e. it is outside the renewal window.
///
/// # Errors
/// Propagates any OpenSSL error from building the comparison timestamps or
/// comparing ASN.1 times.
pub fn should_use_cert(cert: &X509Ref) -> Result<bool, ErrorStack> {
    let now = Asn1Time::days_from_now(0)?;
    // Not yet valid: notBefore must be strictly before "now".
    if cert.not_before().compare(now.as_ref())? != Ordering::Less {
        return Ok(false);
    }
    // Expiring soon: notAfter must be strictly after now + 30 days.
    // (Horizon is built lazily, mirroring the original short-circuit.)
    let renewal_horizon = Asn1Time::days_from_now(30)?;
    Ok(cert.not_after().compare(renewal_horizon.as_ref())? == Ordering::Greater)
}
#[derive(Debug, Deserialize, Serialize, HasModel)]
#[model = "Model<Self>"]
#[serde(rename_all = "camelCase")]
@@ -83,30 +94,8 @@ impl Model<CertStore> {
.map(|m| m.de())
.transpose()?
{
if cert_data
.certs
.ed25519
.not_before()
.compare(Asn1Time::days_from_now(0)?.as_ref())?
== Ordering::Less
&& cert_data
.certs
.ed25519
.not_after()
.compare(Asn1Time::days_from_now(30)?.as_ref())?
== Ordering::Greater
&& cert_data
.certs
.nistp256
.not_before()
.compare(Asn1Time::days_from_now(0)?.as_ref())?
== Ordering::Less
&& cert_data
.certs
.nistp256
.not_after()
.compare(Asn1Time::days_from_now(30)?.as_ref())?
== Ordering::Greater
if should_use_cert(&cert_data.certs.ed25519)?
&& should_use_cert(&cert_data.certs.nistp256)?
{
return Ok(FullchainCertData {
root: self.as_root_cert().de()?.0,

View File

@@ -889,7 +889,8 @@ async fn torctl(
}
}
}
Err(Error::new(eyre!("Log stream terminated"), ErrorKind::Tor))
// Err(Error::new(eyre!("Log stream terminated"), ErrorKind::Tor))
Ok(())
};
let health_checker = async {
let mut last_success = Instant::now();

View File

@@ -1,5 +1,6 @@
use std::any::Any;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::net::{IpAddr, SocketAddr};
use std::sync::{Arc, Weak};
use std::task::{Poll, ready};
@@ -41,6 +42,7 @@ use crate::net::tls::{
use crate::net::web_server::{Accept, AcceptStream, ExtractVisitor, TcpMetadata, extract};
use crate::prelude::*;
use crate::util::collections::EqSet;
use crate::util::future::WeakFuture;
use crate::util::serde::{HandlerExtSerde, MaybeUtf8String, display_serializable};
use crate::util::sync::{SyncMutex, Watch};
@@ -134,7 +136,6 @@ impl VHostController {
pub fn dump_table(
&self,
) -> BTreeMap<JsonKey<u16>, BTreeMap<JsonKey<Option<InternedString>>, EqSet<String>>> {
let ip_info = self.interfaces.watcher.ip_info();
self.servers.peek(|s| {
s.iter()
.map(|(k, v)| {
@@ -187,7 +188,7 @@ pub trait VHostTarget<A: Accept>: std::fmt::Debug + Eq {
hello: &'a ClientHello<'a>,
metadata: &'a <A as Accept>::Metadata,
) -> impl Future<Output = Option<(ServerConfig, Self::PreprocessRes)>> + Send + 'a;
fn handle_stream(&self, stream: AcceptStream, prev: Self::PreprocessRes);
fn handle_stream(&self, stream: AcceptStream, prev: Self::PreprocessRes, rc: Weak<()>);
}
pub trait DynVHostTargetT<A: Accept>: std::fmt::Debug + Any {
@@ -199,7 +200,7 @@ pub trait DynVHostTargetT<A: Accept>: std::fmt::Debug + Any {
hello: &'a ClientHello<'a>,
metadata: &'a <A as Accept>::Metadata,
) -> BoxFuture<'a, Option<(ServerConfig, Box<dyn Any + Send>)>>;
fn handle_stream(&self, stream: AcceptStream, prev: Box<dyn Any + Send>);
fn handle_stream(&self, stream: AcceptStream, prev: Box<dyn Any + Send>, rc: Weak<()>);
fn eq(&self, other: &dyn DynVHostTargetT<A>) -> bool;
}
impl<A: Accept, T: VHostTarget<A> + 'static> DynVHostTargetT<A> for T {
@@ -219,9 +220,9 @@ impl<A: Accept, T: VHostTarget<A> + 'static> DynVHostTargetT<A> for T {
.map(|o| o.map(|(cfg, res)| (cfg, Box::new(res) as Box<dyn Any + Send>)))
.boxed()
}
fn handle_stream(&self, stream: AcceptStream, prev: Box<dyn Any + Send>) {
fn handle_stream(&self, stream: AcceptStream, prev: Box<dyn Any + Send>, rc: Weak<()>) {
if let Ok(prev) = prev.downcast() {
VHostTarget::handle_stream(self, stream, *prev);
VHostTarget::handle_stream(self, stream, *prev, rc);
}
}
fn eq(&self, other: &dyn DynVHostTargetT<A>) -> bool {
@@ -251,21 +252,27 @@ impl<A: Accept + 'static> PartialEq for DynVHostTarget<A> {
}
}
impl<A: Accept + 'static> Eq for DynVHostTarget<A> {}
struct Preprocessed<A: Accept>(DynVHostTarget<A>, Box<dyn Any + Send>);
struct Preprocessed<A: Accept>(DynVHostTarget<A>, Weak<()>, Box<dyn Any + Send>);
impl<A: Accept> fmt::Debug for Preprocessed<A> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
(self.0).0.fmt(f)
}
}
impl<A: Accept + 'static> DynVHostTarget<A> {
async fn into_preprocessed(
self,
rc: Weak<()>,
prev: ServerConfig,
hello: &ClientHello<'_>,
metadata: &<A as Accept>::Metadata,
) -> Option<(ServerConfig, Preprocessed<A>)> {
let (cfg, res) = self.0.preprocess(prev, hello, metadata).await?;
Some((cfg, Preprocessed(self, res)))
Some((cfg, Preprocessed(self, rc, res)))
}
}
impl<A: Accept + 'static> Preprocessed<A> {
fn finish(self, stream: AcceptStream) {
(self.0).0.handle_stream(stream, self.1);
(self.0).0.handle_stream(stream, self.2, self.1);
}
}
@@ -279,6 +286,7 @@ pub struct ProxyTarget {
impl PartialEq for ProxyTarget {
fn eq(&self, other: &Self) -> bool {
self.filter == other.filter
&& self.acme == other.acme
&& self.addr == other.addr
&& self.connect_ssl.as_ref().map(Arc::as_ptr)
== other.connect_ssl.as_ref().map(Arc::as_ptr)
@@ -294,6 +302,9 @@ where
type PreprocessRes = AcceptStream;
fn filter(&self, metadata: &<A as Accept>::Metadata) -> bool {
let info = extract::<GatewayInfo, _>(metadata);
if info.is_none() {
tracing::warn!("No GatewayInfo on metadata");
}
info.as_ref()
.map_or(true, |i| self.filter.filter(&i.id, &i.info))
}
@@ -304,7 +315,7 @@ where
&'a self,
mut prev: ServerConfig,
hello: &'a ClientHello<'a>,
metadata: &'a <A as Accept>::Metadata,
_: &'a <A as Accept>::Metadata,
) -> Option<(ServerConfig, Self::PreprocessRes)> {
let tcp_stream = TcpStream::connect(self.addr)
.await
@@ -345,8 +356,10 @@ where
}
Some((prev, Box::pin(tcp_stream)))
}
fn handle_stream(&self, mut stream: AcceptStream, mut prev: Self::PreprocessRes) {
tokio::spawn(async move { tokio::io::copy_bidirectional(&mut stream, &mut prev).await });
fn handle_stream(&self, mut stream: AcceptStream, mut prev: Self::PreprocessRes, rc: Weak<()>) {
tokio::spawn(async move {
WeakFuture::new(rc, tokio::io::copy_bidirectional(&mut stream, &mut prev)).await
});
}
}
@@ -436,16 +449,16 @@ where
return Some(prev);
}
let target = self.0.peek(|m| {
let (target, rc) = self.0.peek(|m| {
m.get(&hello.server_name().map(InternedString::from))
.into_iter()
.flatten()
.filter(|(_, rc)| rc.strong_count() > 0)
.find(|(t, _)| t.0.filter(metadata))
.map(|(e, _)| e.clone())
.map(|(t, rc)| (t.clone(), rc.clone()))
})?;
let (prev, store) = target.into_preprocessed(prev, hello, metadata).await?;
let (prev, store) = target.into_preprocessed(rc, prev, hello, metadata).await?;
self.1 = Some(store);
@@ -480,6 +493,14 @@ struct VHostListenerMetadata<A: Accept> {
inner: TlsMetadata<A::Metadata>,
preprocessed: Preprocessed<A>,
}
// Manual `Debug` impl: deriving would require `A: Debug`; only the wrapped
// TLS metadata and the preprocessed target (which itself has a manual Debug
// impl delegating to its inner target) need to be printable.
impl<A: Accept> fmt::Debug for VHostListenerMetadata<A> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("VHostListenerMetadata")
.field("inner", &self.inner)
.field("preprocessed", &self.preprocessed)
.finish()
}
}
impl<M, A> Accept for VHostListener<M, A>
where
for<'a> M: HasModel<Model = Model<M>>
@@ -637,6 +658,7 @@ impl<A: Accept> VHostServer<A> {
changed = true;
Arc::new(())
};
targets.retain(|_, rc| rc.strong_count() > 0);
targets.insert(target, Arc::downgrade(&rc));
writable.insert(hostname, targets);
res = Ok(rc);

View File

@@ -1,3 +1,4 @@
use core::fmt;
use std::any::Any;
use std::collections::BTreeMap;
use std::future::Future;
@@ -68,7 +69,7 @@ pub fn extract<
metadata: &M,
) -> Option<T> {
let mut visitor = ExtractVisitor(None);
visitor.visit(metadata);
metadata.visit(&mut visitor);
visitor.0
}
@@ -84,7 +85,7 @@ impl<V: MetadataVisitor> Visit<V> for TcpMetadata {
}
pub trait Accept {
type Metadata;
type Metadata: fmt::Debug;
fn poll_accept(
&mut self,
cx: &mut std::task::Context<'_>,
@@ -144,7 +145,7 @@ where
}
}
#[derive(Clone, VisitFields)]
#[derive(Debug, Clone, VisitFields)]
pub struct MapListenerMetadata<K, M> {
pub inner: M,
pub key: K,
@@ -162,7 +163,7 @@ where
impl<K, A> Accept for BTreeMap<K, A>
where
K: Clone,
K: Clone + fmt::Debug,
A: Accept,
{
type Metadata = MapListenerMetadata<K, A::Metadata>;
@@ -218,40 +219,38 @@ trait DynAcceptT: Send + Sync {
fn poll_accept(
&mut self,
cx: &mut std::task::Context<'_>,
) -> Poll<
Result<
(
Box<dyn for<'a> Visit<ExtensionVisitor<'a>> + Send + Sync>,
AcceptStream,
),
Error,
>,
>;
) -> Poll<Result<(DynMetadata, AcceptStream), Error>>;
}
impl<A> DynAcceptT for A
where
A: Accept + Send + Sync,
for<'a> <A as Accept>::Metadata: Visit<ExtensionVisitor<'a>> + Send + Sync + 'static,
<A as Accept>::Metadata: DynMetadataT + 'static,
{
fn poll_accept(
&mut self,
cx: &mut std::task::Context<'_>,
) -> Poll<
Result<
(
Box<dyn for<'a> Visit<ExtensionVisitor<'a>> + Send + Sync>,
AcceptStream,
),
Error,
>,
> {
) -> Poll<Result<(DynMetadata, AcceptStream), Error>> {
let (metadata, stream) = ready!(Accept::poll_accept(self, cx)?);
Poll::Ready(Ok((Box::new(metadata), stream)))
Poll::Ready(Ok((DynMetadata(Box::new(metadata)), stream)))
}
}
pub struct DynAccept(Box<dyn DynAcceptT>);
trait DynMetadataT: for<'a> Visit<ExtensionVisitor<'a>> + fmt::Debug + Send + Sync {}
impl<T> DynMetadataT for T where for<'a> T: Visit<ExtensionVisitor<'a>> + fmt::Debug + Send + Sync {}
#[derive(Debug)]
pub struct DynMetadata(Box<dyn DynMetadataT>);
// Forward visitation through the type-erased box, so `extract` works on a
// `DynMetadata` exactly as it would on the concrete metadata it wraps.
impl<'a> Visit<ExtensionVisitor<'a>> for DynMetadata {
fn visit(
&self,
visitor: &mut ExtensionVisitor<'a>,
) -> <ExtensionVisitor<'a> as Visitor>::Result {
self.0.visit(visitor)
}
}
impl Accept for DynAccept {
type Metadata = Box<dyn for<'a> Visit<ExtensionVisitor<'a>> + Send + Sync>;
type Metadata = DynMetadata;
fn poll_accept(
&mut self,
cx: &mut std::task::Context<'_>,
@@ -325,7 +324,7 @@ impl Acceptor<Vec<DynAccept>> {
}
impl<K> Acceptor<BTreeMap<K, TcpListener>>
where
K: Ord + Clone + Send + Sync + 'static,
K: Ord + Clone + fmt::Debug + Send + Sync + 'static,
{
pub async fn bind_map(
listen: impl IntoIterator<Item = (K, SocketAddr)>,
@@ -347,7 +346,7 @@ where
}
impl<K> Acceptor<BTreeMap<K, DynAccept>>
where
K: Ord + Clone + Send + Sync + 'static,
K: Ord + Clone + fmt::Debug + Send + Sync + 'static,
{
pub async fn bind_map_dyn(
listen: impl IntoIterator<Item = (K, SocketAddr)>,

View File

@@ -356,7 +356,10 @@ pub async fn execute<C: Context>(
let mut install = Command::new("chroot");
install.arg(overlay.path()).arg("grub-install");
if tokio::fs::metadata("/sys/firmware/efi").await.is_err() {
install.arg("--target=i386-pc");
match ARCH {
"x86_64" => install.arg("--target=i386-pc"),
_ => &mut install,
};
} else {
match ARCH {
"x86_64" => install.arg("--target=x86_64-efi"),
@@ -372,7 +375,7 @@ pub async fn execute<C: Context>(
Command::new("chroot")
.arg(overlay.path())
.arg("update-grub2")
.arg("update-grub")
.invoke(crate::ErrorKind::Grub)
.await?;
dev.unmount(false).await?;

View File

@@ -150,31 +150,39 @@ impl ExecParams {
cmd.env(k, v);
}
if let Some(uid) = user.as_deref().and_then(|u| u.parse::<u32>().ok()) {
cmd.uid(uid);
} else if let Some(user) = user {
let passwd = std::fs::read_to_string("/etc/passwd")
.with_ctx(|_| (ErrorKind::Filesystem, "read /etc/passwd"));
if passwd.is_err() && user == "root" {
cmd.uid(0);
cmd.gid(0);
if let Some((uid, gid)) =
if let Some(uid) = user.as_deref().and_then(|u| u.parse::<u32>().ok()) {
Some((uid, uid))
} else if let Some(user) = user {
let passwd = std::fs::read_to_string("/etc/passwd")
.with_ctx(|_| (ErrorKind::Filesystem, "read /etc/passwd"));
Some(if passwd.is_err() && user == "root" {
(0, 0)
} else {
let (uid, gid) = passwd?
.lines()
.find_map(|l| {
let mut split = l.trim().split(":");
if user != split.next()? {
return None;
}
split.next(); // throw away x
Some((split.next()?.parse().ok()?, split.next()?.parse().ok()?))
// uid gid
})
.or_not_found(lazy_format!("{user} in /etc/passwd"))?;
(uid, gid)
})
} else {
let (uid, gid) = passwd?
.lines()
.find_map(|l| {
let mut split = l.trim().split(":");
if user != split.next()? {
return None;
}
split.next(); // throw away x
Some((split.next()?.parse().ok()?, split.next()?.parse().ok()?))
// uid gid
})
.or_not_found(lazy_format!("{user} in /etc/passwd"))?;
cmd.uid(uid);
cmd.gid(gid);
None
}
};
{
std::os::unix::fs::chown("/proc/self/fd/0", Some(uid), Some(gid)).log_err();
std::os::unix::fs::chown("/proc/self/fd/1", Some(uid), Some(gid)).log_err();
std::os::unix::fs::chown("/proc/self/fd/2", Some(uid), Some(gid)).log_err();
cmd.uid(uid);
cmd.gid(gid);
}
if let Some(workdir) = workdir {
cmd.current_dir(workdir);
} else {

View File

@@ -725,6 +725,8 @@ pub struct AttachParams {
name: Option<InternedString>,
#[ts(type = "string | null")]
image_id: Option<ImageId>,
#[ts(type = "string | null")]
user: Option<InternedString>,
}
pub async fn attach(
ctx: RpcContext,
@@ -738,6 +740,7 @@ pub async fn attach(
subcontainer,
image_id,
name,
user,
}: AttachParams,
) -> Result<Guid, Error> {
let (container_id, subcontainer_id, image_id, workdir, root_command) = {
@@ -814,9 +817,26 @@ pub async fn attach(
.join("etc")
.join("passwd");
let root_command = get_passwd_root_command(passwd).await;
let image_meta = serde_json::from_str::<Value>(
&tokio::fs::read_to_string(
root_dir
.join("media/startos/images/")
.join(&image_id)
.with_extension("json"),
)
.await?,
)
.with_kind(ErrorKind::Deserialization)?;
let workdir = attach_workdir(&image_id, &root_dir).await?;
let root_command = get_passwd_command(
passwd,
user.as_deref()
.or_else(|| image_meta["user"].as_str())
.unwrap_or("root"),
)
.await;
let workdir = image_meta["workdir"].as_str().map(|s| s.to_owned());
if subcontainer_ids.len() > 1 {
let subcontainer_ids = subcontainer_ids
@@ -849,6 +869,7 @@ pub async fn attach(
pty_size: Option<TermSize>,
image_id: ImageId,
workdir: Option<String>,
user: Option<InternedString>,
root_command: &RootCommand,
) -> Result<(), Error> {
use axum::extract::ws::Message;
@@ -871,6 +892,10 @@ pub async fn attach(
.with_extension("env"),
);
if let Some(user) = user {
cmd.arg("--user").arg(&*user);
}
if let Some(workdir) = workdir {
cmd.arg("--workdir").arg(workdir);
}
@@ -1032,6 +1057,7 @@ pub async fn attach(
pty_size,
image_id,
workdir,
user,
&root_command,
)
.await
@@ -1051,19 +1077,46 @@ pub async fn attach(
Ok(guid)
}
async fn attach_workdir(image_id: &ImageId, root_dir: &Path) -> Result<Option<String>, Error> {
let path_str = root_dir.join("media/startos/images/");
let mut subcontainer_json =
tokio::fs::File::open(path_str.join(image_id).with_extension("json")).await?;
let mut contents = vec![];
subcontainer_json.read_to_end(&mut contents).await?;
let subcontainer_json: serde_json::Value =
serde_json::from_slice(&contents).with_kind(ErrorKind::Filesystem)?;
Ok(subcontainer_json["workdir"].as_str().map(|x| x.to_string()))
/// RPC parameters for `list_subcontainers`.
#[derive(Deserialize, Serialize, TS)]
#[serde(rename_all = "camelCase")]
pub struct ListSubcontainersParams {
// Package whose subcontainers should be listed.
pub id: PackageId,
}
async fn get_passwd_root_command(etc_passwd_path: PathBuf) -> RootCommand {
/// Summary of a single subcontainer, as returned by `list_subcontainers`.
#[derive(Clone, Debug, Serialize, Deserialize, TS)]
#[serde(rename_all = "camelCase")]
pub struct SubcontainerInfo {
// Human-readable subcontainer name.
pub name: InternedString,
// Image the subcontainer was launched from.
pub image_id: ImageId,
}
/// RPC handler: list the subcontainers of package `id`, keyed by their Guid,
/// returning each subcontainer's name and image id.
///
/// # Errors
/// Returns a not-found error when no service with the given package id is
/// currently registered.
pub async fn list_subcontainers(
    ctx: RpcContext,
    ListSubcontainersParams { id }: ListSubcontainersParams,
) -> Result<BTreeMap<Guid, SubcontainerInfo>, Error> {
    let service = ctx.services.get(&id).await;
    let service_ref = service.as_ref().or_not_found(&id)?;
    // Hold the subcontainer map lock only long enough to snapshot it.
    let subcontainers = service_ref
        .seed
        .persistent_container
        .subcontainers
        .lock()
        .await;
    let mut result = BTreeMap::new();
    for (guid, subcontainer) in subcontainers.iter() {
        result.insert(
            guid.clone(),
            SubcontainerInfo {
                name: subcontainer.name.clone(),
                image_id: subcontainer.image_id.clone(),
            },
        );
    }
    Ok(result)
}
async fn get_passwd_command(etc_passwd_path: PathBuf, user: &str) -> RootCommand {
async {
let mut file = tokio::fs::File::open(etc_passwd_path).await?;
@@ -1074,8 +1127,8 @@ async fn get_passwd_root_command(etc_passwd_path: PathBuf) -> RootCommand {
for line in contents.split('\n') {
let line_information = line.split(':').collect::<Vec<_>>();
if let (Some(&"root"), Some(shell)) =
(line_information.first(), line_information.last())
if let (Some(&u), Some(shell)) = (line_information.first(), line_information.last())
&& u == user
{
return Ok(shell.to_string());
}
@@ -1106,6 +1159,8 @@ pub struct CliAttachParams {
#[arg(long, short)]
name: Option<InternedString>,
#[arg(long, short)]
user: Option<InternedString>,
#[arg(long, short)]
image_id: Option<ImageId>,
}
#[instrument[skip_all]]
@@ -1147,6 +1202,7 @@ pub async fn cli_attach(
"subcontainer": params.subcontainer,
"imageId": params.image_id,
"name": params.name,
"user": params.user,
}),
)
.await?,

View File

@@ -353,6 +353,7 @@ pub async fn show_config(
Ok(client
.client_config(
ip,
subnet,
wg.as_key().de()?.verifying_key(),
(wan_addr, wg.as_port().de()?).into(),
)

View File

@@ -293,14 +293,7 @@ pub async fn set_password_cli(
Ok(())
}
pub async fn reset_password(
HandlerArgs {
context,
parent_method,
method,
..
}: HandlerArgs<CliContext>,
) -> Result<(), Error> {
pub async fn reset_password(ctx: CliContext) -> Result<(), Error> {
println!("Generating a random password...");
let params = SetPasswordParams {
password: base32::encode(
@@ -309,11 +302,7 @@ pub async fn reset_password(
),
};
context
.call_remote::<TunnelContext>(
&parent_method.iter().chain(method.iter()).join("."),
to_value(&params)?,
)
ctx.call_remote::<TunnelContext>("auth.set-password", to_value(&params)?)
.await?;
println!("Your new password is:");

View File

@@ -7,6 +7,6 @@ PrivateKey = {privkey}
[Peer]
PublicKey = {server_pubkey}
PresharedKey = {psk}
AllowedIPs = 0.0.0.0/0,::/0
AllowedIPs = {subnet}
Endpoint = {server_addr}
PersistentKeepalive = 25

View File

@@ -170,12 +170,14 @@ impl WgConfig {
pub fn client_config(
self,
addr: Ipv4Addr,
subnet: Ipv4Net,
server_pubkey: Base64<PublicKey>,
server_addr: SocketAddr,
) -> ClientConfig {
ClientConfig {
client_config: self,
client_addr: addr,
subnet,
server_pubkey,
server_addr,
}
@@ -213,6 +215,7 @@ where
pub struct ClientConfig {
client_config: WgConfig,
client_addr: Ipv4Addr,
subnet: Ipv4Net,
#[serde(deserialize_with = "deserialize_verifying_key")]
server_pubkey: Base64<PublicKey>,
server_addr: SocketAddr,
@@ -226,6 +229,7 @@ impl std::fmt::Display for ClientConfig {
privkey = self.client_config.key.to_padded_string(),
psk = self.client_config.psk.to_padded_string(),
addr = self.client_addr,
subnet = self.subnet,
server_pubkey = self.server_pubkey.to_padded_string(),
server_addr = self.server_addr,
)

View File

@@ -19,12 +19,6 @@ use ts_rs::TS;
use crate::PLATFORM;
use crate::context::{CliContext, RpcContext};
use crate::disk::mount::filesystem::MountType;
use crate::disk::mount::filesystem::bind::Bind;
use crate::disk::mount::filesystem::block_dev::BlockDev;
use crate::disk::mount::filesystem::efivarfs::EfiVarFs;
use crate::disk::mount::filesystem::overlayfs::OverlayGuard;
use crate::disk::mount::guard::{GenericMountGuard, MountGuard, TmpMountGuard};
use crate::notifications::{NotificationLevel, notify};
use crate::prelude::*;
use crate::progress::{
@@ -275,7 +269,6 @@ async fn maybe_do_update(
download_phase.set_total(asset.commitment.size);
download_phase.set_units(Some(ProgressUnits::Bytes));
let reverify_phase = progress.add_phase("Reverifying File".into(), Some(10));
let sync_boot_phase = progress.add_phase("Syncing Boot Files".into(), Some(1));
let finalize_phase = progress.add_phase("Finalizing Update".into(), Some(1));
let start_progress = progress.snapshot();
@@ -331,7 +324,6 @@ async fn maybe_do_update(
prune_phase,
download_phase,
reverify_phase,
sync_boot_phase,
finalize_phase,
},
)
@@ -388,7 +380,6 @@ struct UpdateProgressHandles {
prune_phase: PhaseProgressTrackerHandle,
download_phase: PhaseProgressTrackerHandle,
reverify_phase: PhaseProgressTrackerHandle,
sync_boot_phase: PhaseProgressTrackerHandle,
finalize_phase: PhaseProgressTrackerHandle,
}
@@ -401,7 +392,6 @@ async fn do_update(
mut prune_phase,
mut download_phase,
mut reverify_phase,
mut sync_boot_phase,
mut finalize_phase,
}: UpdateProgressHandles,
) -> Result<(), Error> {
@@ -416,9 +406,7 @@ async fn do_update(
prune_phase.complete();
download_phase.start();
let path = Path::new("/media/startos/images")
.join(hex::encode(&asset.commitment.hash[..16]))
.with_extension("rootfs");
let path = Path::new("/media/startos/images/next.squashfs");
let mut dst = AtomicFile::new(&path, None::<&Path>)
.await
.with_kind(ErrorKind::Filesystem)?;
@@ -438,92 +426,24 @@ async fn do_update(
dst.save().await.with_kind(ErrorKind::Filesystem)?;
reverify_phase.complete();
sync_boot_phase.start();
finalize_phase.start();
Command::new("unsquashfs")
.arg("-n")
.arg("-f")
.arg("-d")
.arg("/")
.arg(&path)
.arg("boot")
.arg("/usr/lib/startos/scripts/upgrade")
.invoke(crate::ErrorKind::Filesystem)
.await?;
if &*PLATFORM != "raspberrypi" {
let mountpoint = "/media/startos/next";
let root_guard = OverlayGuard::mount(
TmpMountGuard::mount(&BlockDev::new(&path), MountType::ReadOnly).await?,
mountpoint,
)
.await?;
let startos = MountGuard::mount(
&Bind::new("/media/startos/root"),
root_guard.path().join("media/startos/root"),
MountType::ReadOnly,
)
.await?;
let boot_guard = MountGuard::mount(
&Bind::new("/boot"),
root_guard.path().join("boot"),
MountType::ReadWrite,
)
.await?;
let dev = MountGuard::mount(
&Bind::new("/dev"),
root_guard.path().join("dev"),
MountType::ReadWrite,
)
.await?;
let proc = MountGuard::mount(
&Bind::new("/proc"),
root_guard.path().join("proc"),
MountType::ReadWrite,
)
.await?;
let sys = MountGuard::mount(
&Bind::new("/sys"),
root_guard.path().join("sys"),
MountType::ReadWrite,
)
.await?;
let efivarfs = if tokio::fs::metadata("/sys/firmware/efi").await.is_ok() {
Some(
MountGuard::mount(
&EfiVarFs,
root_guard.path().join("sys/firmware/efi/efivars"),
MountType::ReadWrite,
)
.await?,
)
} else {
None
};
Command::new("chroot")
.arg(root_guard.path())
.arg("update-grub2")
.invoke(ErrorKind::Grub)
.await?;
let checksum = hex::encode(&asset.commitment.hash[..16]);
if let Some(efivarfs) = efivarfs {
efivarfs.unmount(false).await?;
}
sys.unmount(false).await?;
proc.unmount(false).await?;
dev.unmount(false).await?;
boot_guard.unmount(false).await?;
startos.unmount(false).await?;
root_guard.unmount(false).await?;
}
sync_boot_phase.complete();
finalize_phase.start();
Command::new("ln")
.arg("-rsf")
Command::new("/usr/lib/startos/scripts/upgrade")
.env("CHECKSUM", &checksum)
.arg(&path)
.arg("/media/startos/config/current.rootfs")
.invoke(crate::ErrorKind::Filesystem)
.invoke(ErrorKind::Grub)
.await?;
Command::new("sync").invoke(ErrorKind::Filesystem).await?;
finalize_phase.complete();
progress.complete();

View File

@@ -1,11 +1,10 @@
use std::pin::Pin;
use std::sync::Weak;
use std::task::{Context, Poll};
use axum::middleware::FromFn;
use futures::future::{BoxFuture, FusedFuture, abortable, pending};
use futures::stream::{AbortHandle, Abortable, BoxStream};
use futures::{Future, FutureExt, Stream, StreamExt};
use rpc_toolkit::from_fn_blocking;
use tokio::sync::watch;
use tokio::task::LocalSet;
@@ -201,3 +200,26 @@ async fn test_cancellable() {
handle.cancel_and_wait().await;
assert!(weak.strong_count() == 0);
}
/// A future whose lifetime is tied to an `Arc<()>` liveness token: it polls
/// the inner future only while the paired `Weak<()>` can still observe a
/// strong reference, and resolves to `None` once every strong reference has
/// been dropped — effectively a cooperative cancellation handle.
#[pin_project::pin_project]
pub struct WeakFuture<Fut> {
// Liveness token; when strong_count reaches 0 the future short-circuits.
rc: Weak<()>,
#[pin]
fut: Fut,
}
impl<Fut> WeakFuture<Fut> {
/// Wrap `fut` so it only continues running while `rc` has live strong refs.
pub fn new(rc: Weak<()>, fut: Fut) -> Self {
Self { rc, fut }
}
}
impl<Fut: Future> Future for WeakFuture<Fut> {
// `None` means "cancelled" (token dropped), `Some(out)` is completion.
type Output = Option<Fut::Output>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
// NOTE(review): the liveness check happens per-poll, so cancellation
// takes effect on the next wakeup, not instantaneously.
if this.rc.strong_count() > 0 {
this.fut.poll(cx).map(Some)
} else {
Poll::Ready(None)
}
}
}

View File

@@ -49,7 +49,7 @@ pub mod net;
pub mod rpc;
pub mod rpc_client;
pub mod serde;
//pub mod squashfs;
// pub mod squashfs;
pub mod sync;
pub mod tui;

View File

@@ -98,8 +98,7 @@ impl<W: Write> Visit<SquashfsSerializer<W>> for Superblock {
#[pin_project::pin_project]
pub struct MetadataBlocksWriter<W> {
input: [u8; 8192],
size: usize,
input: PartialBuffer<[u8; 8192]>,
size_addr: Option<u64>,
output: PartialBuffer<[u8; 8192]>,
output_flushed: usize,
@@ -123,25 +122,29 @@ enum WriteState {
WritingSizeHeader(u16),
WritingOutput(Box<Self>),
EncodingInput,
FinishingCompression,
WritingFinalSizeHeader(u64, u64),
SeekingToEnd(u64),
}
fn poll_seek_helper<W: AsyncSeek>(
writer: std::pin::Pin<&mut W>,
mut writer: std::pin::Pin<&mut W>,
seek_state: &mut SeekState,
cx: &mut std::task::Context<'_>,
pos: u64,
) -> std::task::Poll<std::io::Result<u64>> {
match *seek_state {
SeekState::Idle => {
writer.start_seek(std::io::SeekFrom::Start(pos))?;
writer.as_mut().start_seek(std::io::SeekFrom::Start(pos))?;
*seek_state = SeekState::Seeking(pos);
Poll::Pending
match writer.as_mut().poll_complete(cx)? {
Poll::Ready(result) => {
*seek_state = SeekState::Idle;
Poll::Ready(Ok(result))
}
Poll::Pending => Poll::Pending,
}
}
SeekState::Seeking(target) if target == pos => {
let result = ready!(writer.poll_complete(cx))?;
let result = ready!(writer.as_mut().poll_complete(cx))?;
*seek_state = SeekState::Idle;
Poll::Ready(Ok(result))
}
@@ -151,35 +154,53 @@ fn poll_seek_helper<W: AsyncSeek>(
pos,
old_target
);
writer.start_seek(std::io::SeekFrom::Start(pos))?;
writer.as_mut().start_seek(std::io::SeekFrom::Start(pos))?;
*seek_state = SeekState::Seeking(pos);
Poll::Pending
match writer.as_mut().poll_complete(cx)? {
Poll::Ready(result) => {
*seek_state = SeekState::Idle;
Poll::Ready(Ok(result))
}
Poll::Pending => Poll::Pending,
}
}
SeekState::GettingPosition => {
tracing::warn!(
"poll_seek({}) called while getting stream position, canceling",
pos
);
writer.start_seek(std::io::SeekFrom::Start(pos))?;
writer.as_mut().start_seek(std::io::SeekFrom::Start(pos))?;
*seek_state = SeekState::Seeking(pos);
Poll::Pending
match writer.as_mut().poll_complete(cx)? {
Poll::Ready(result) => {
*seek_state = SeekState::Idle;
Poll::Ready(Ok(result))
}
Poll::Pending => Poll::Pending,
}
}
}
}
fn poll_stream_position_helper<W: AsyncSeek>(
writer: std::pin::Pin<&mut W>,
mut writer: std::pin::Pin<&mut W>,
seek_state: &mut SeekState,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<u64>> {
match *seek_state {
SeekState::Idle => {
writer.start_seek(std::io::SeekFrom::Current(0))?;
writer.as_mut().start_seek(std::io::SeekFrom::Current(0))?;
*seek_state = SeekState::GettingPosition;
Poll::Pending
match writer.as_mut().poll_complete(cx)? {
Poll::Ready(result) => {
*seek_state = SeekState::Idle;
Poll::Ready(Ok(result))
}
Poll::Pending => Poll::Pending,
}
}
SeekState::GettingPosition => {
let result = ready!(writer.poll_complete(cx))?;
let result = ready!(writer.as_mut().poll_complete(cx))?;
*seek_state = SeekState::Idle;
Poll::Ready(Ok(result))
}
@@ -188,18 +209,22 @@ fn poll_stream_position_helper<W: AsyncSeek>(
"poll_stream_position called while seeking to {}, canceling",
target
);
writer.start_seek(std::io::SeekFrom::Current(0))?;
writer.as_mut().start_seek(std::io::SeekFrom::Current(0))?;
*seek_state = SeekState::GettingPosition;
Poll::Pending
match writer.as_mut().poll_complete(cx)? {
Poll::Ready(result) => {
*seek_state = SeekState::Idle;
Poll::Ready(Ok(result))
}
Poll::Pending => Poll::Pending,
}
}
}
}
impl<W: Write + Seek> Write for MetadataBlocksWriter<W> {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let n = buf.len().min(self.input.len() - self.size);
self.input[self.size..self.size + n].copy_from_slice(&buf[..n]);
self.size += n;
let n = self.input.copy_unwritten_from(&mut PartialBuffer::new(buf));
if n < buf.len() {
self.flush()?;
}
@@ -207,9 +232,9 @@ impl<W: Write + Seek> Write for MetadataBlocksWriter<W> {
}
fn flush(&mut self) -> std::io::Result<()> {
loop {
match self.write_state {
match &self.write_state {
WriteState::Idle => {
if self.size == 0 {
if self.input.written().is_empty() {
return Ok(());
}
self.write_state = WriteState::WritingSizeHeader(0);
@@ -218,12 +243,12 @@ impl<W: Write + Seek> Write for MetadataBlocksWriter<W> {
WriteState::WritingSizeHeader(size) => {
let done = if let Some(size_addr) = self.size_addr {
self.writer.seek(SeekFrom::Start(size_addr))?;
Some(size_addr + size as u64)
Some(size_addr + 2 + *size as u64)
} else {
self.size_addr = Some(self.writer.stream_position()?);
None
};
self.output.unwritten_mut()[..2].copy_from_slice(&u16::to_le_bytes(size)[..]);
self.output.unwritten_mut()[..2].copy_from_slice(&u16::to_le_bytes(*size)[..]);
self.output.advance(2);
self.write_state =
WriteState::WritingOutput(Box::new(if let Some(end) = done {
@@ -242,80 +267,33 @@ impl<W: Write + Seek> Write for MetadataBlocksWriter<W> {
} else {
self.output.reset();
self.output_flushed = 0;
self.write_state = *next;
self.write_state = *next.clone();
}
}
WriteState::EncodingInput => {
let encoder = self.zstd.get_or_insert_with(|| ZstdEncoder::new(22));
let mut input = PartialBuffer::new(&self.input[..self.size]);
while !self.output.unwritten().is_empty() && !input.unwritten().is_empty() {
encoder.encode(&mut input, &mut self.output)?;
}
while !encoder.flush(&mut self.output)? {}
while !encoder.finish(&mut self.output)? {}
if !self.output.unwritten().is_empty() {
let mut input =
PartialBuffer::new(&self.input[self.input_flushed..self.size]);
encoder.encode(&mut input, &mut self.output)?;
self.input_flushed += input.written().len();
}
self.write_state = WriteState::WritingOutput(Box::new());
continue;
}
WriteState::FinishingCompression => {
if !self.output.unwritten().is_empty() {
if self.zstd.as_mut().unwrap().finish(&mut self.output)? {
self.zstd = None;
}
}
if self.output.written().len() > self.output_flushed {
self.write_state = WriteState::WritingOutput;
continue;
}
if self.zstd.is_none() && self.output.written().len() == self.output_flushed {
self.output_flushed = 0;
self.output.reset();
let end_addr = self.writer.stream_position()?;
let size_addr = self.size_addr.ok_or_else(|| {
std::io::Error::new(
std::io::ErrorKind::InvalidData,
"size_addr not set when finishing compression",
)
})?;
self.write_state = WriteState::WritingFinalSizeHeader(size_addr, end_addr);
continue;
}
return Ok(());
}
WriteState::WritingFinalSizeHeader(size_addr, end_addr) => {
if self.output.written().len() > self.output_flushed {
let n = self
.writer
.write(&self.output.written()[self.output_flushed..])?;
self.output_flushed += n;
continue;
}
self.writer.seek(std::io::SeekFrom::Start(size_addr))?;
self.output.unwritten_mut()[..2]
.copy_from_slice(&((end_addr - size_addr - 2) as u16).to_le_bytes());
self.output.advance(2);
let n = self.writer.write(&self.output.written())?;
self.output_flushed = n;
if n == 2 {
self.output_flushed = 0;
self.output.reset();
self.write_state = WriteState::SeekingToEnd(end_addr);
}
continue;
encoder.encode(
&mut PartialBuffer::new(&self.input.written()),
&mut self.output,
)?;
let compressed = if !encoder.finish(&mut self.output)? {
std::mem::swap(&mut self.output, &mut self.input);
false
} else {
true
};
self.zstd = None;
self.input.reset();
self.write_state =
WriteState::WritingOutput(Box::new(WriteState::WritingSizeHeader(
self.output.written().len() as u16
| if compressed { 0 } else { 0x8000 },
)));
}
WriteState::SeekingToEnd(end_addr) => {
self.writer.seek(std::io::SeekFrom::Start(end_addr))?;
self.input_flushed = 0;
self.size = 0;
self.writer.seek(std::io::SeekFrom::Start(*end_addr))?;
self.size_addr = None;
self.write_state = WriteState::Idle;
return Ok(());
@@ -332,11 +310,9 @@ impl<W: AsyncWrite + AsyncSeek> AsyncWrite for MetadataBlocksWriter<W> {
buf: &[u8],
) -> std::task::Poll<std::io::Result<usize>> {
let this = self.as_mut().project();
let n = buf.len().min(this.input.len() - *this.size);
this.input[*this.size..*this.size + n].copy_from_slice(&buf[..n]);
*this.size += n;
let n = this.input.copy_unwritten_from(&mut PartialBuffer::new(buf));
if n < buf.len() {
ready!(self.poll_flush(cx)?);
ready!(self.poll_flush(cx))?;
}
Poll::Ready(Ok(n))
}
@@ -347,115 +323,76 @@ impl<W: AsyncWrite + AsyncSeek> AsyncWrite for MetadataBlocksWriter<W> {
) -> std::task::Poll<std::io::Result<()>> {
loop {
let mut this = self.as_mut().project();
match *this.write_state {
match this.write_state.clone() {
WriteState::Idle => {
if *this.size == 0 {
if this.input.written().is_empty() {
return Poll::Ready(Ok(()));
}
if this.size_addr.is_none() {
*this.write_state = WriteState::WritingSizeHeader(0);
}
WriteState::WritingSizeHeader(size) => {
let done = if let Some(size_addr) = *this.size_addr {
ready!(poll_seek_helper(
this.writer.as_mut(),
this.seek_state,
cx,
size_addr
))?;
Some(size_addr + 2 + size as u64)
} else {
let pos = ready!(poll_stream_position_helper(
this.writer.as_mut(),
this.seek_state,
cx
))?;
*this.size_addr = Some(pos);
this.output.unwritten_mut()[..2].copy_from_slice(&[0; 2]);
this.output.advance(2);
}
*this.write_state = WriteState::WritingOutput;
continue;
}
WriteState::WritingOutput => {
if this.output.written().len() > *this.output_flushed {
let n = ready!(
this.writer
.as_mut()
.poll_write(cx, &this.output.written()[*this.output_flushed..])
)?;
*this.output_flushed += n;
continue;
}
if this.output.written().len() == *this.output_flushed {
*this.output_flushed = 0;
this.output.reset();
}
if *this.input_flushed < *this.size {
if !this.output.unwritten().is_empty() {
let mut input =
PartialBuffer::new(&this.input[*this.input_flushed..*this.size]);
this.zstd
.get_or_insert_with(|| ZstdEncoder::new(22))
.encode(&mut input, this.output)?;
*this.input_flushed += input.written().len();
}
continue;
None
};
this.output.unwritten_mut()[..2]
.copy_from_slice(&u16::to_le_bytes(size)[..]);
this.output.advance(2);
*this.write_state = WriteState::WritingOutput(Box::new(if let Some(end) = done {
WriteState::SeekingToEnd(end)
} else {
if !this.output.unwritten().is_empty() {
if this.zstd.as_mut().unwrap().finish(this.output)? {
*this.zstd = None;
}
continue;
}
if this.zstd.is_none()
&& this.output.written().len() == *this.output_flushed
{
*this.output_flushed = 0;
this.output.reset();
if let Some(size_addr) = *this.size_addr {
let end_addr = ready!(poll_stream_position_helper(
this.writer.as_mut(),
this.seek_state,
cx
))?;
*this.write_state =
WriteState::WritingFinalSizeHeader(size_addr, end_addr);
ready!(poll_seek_helper(
this.writer.as_mut(),
this.seek_state,
cx,
size_addr
))?;
this.output.unwritten_mut()[..2].copy_from_slice(
&((end_addr - size_addr - 2) as u16).to_le_bytes(),
);
this.output.advance(2);
continue;
}
}
}
return Poll::Ready(Ok(()));
WriteState::EncodingInput
}));
}
WriteState::WritingSizeHeader(_size_addr) => {
*this.write_state = WriteState::WritingOutput;
continue;
WriteState::WritingOutput(next) => {
if this.output.written().len() > *this.output_flushed {
let n = ready!(this
.writer
.as_mut()
.poll_write(cx, &this.output.written()[*this.output_flushed..]))?;
*this.output_flushed += n;
} else {
this.output.reset();
*this.output_flushed = 0;
*this.write_state = *next;
}
}
WriteState::EncodingInput => {
*this.write_state = WriteState::WritingOutput;
continue;
}
WriteState::FinishingCompression => {
*this.write_state = WriteState::WritingOutput;
continue;
}
WriteState::WritingFinalSizeHeader(_size_addr, end_addr) => {
if this.output.written().len() > *this.output_flushed {
let n = ready!(
this.writer
.as_mut()
.poll_write(cx, &this.output.written()[*this.output_flushed..])
)?;
*this.output_flushed += n;
continue;
}
*this.output_flushed = 0;
this.output.reset();
*this.write_state = WriteState::SeekingToEnd(end_addr);
continue;
let encoder = this.zstd.get_or_insert_with(|| ZstdEncoder::new(22));
encoder.encode(
&mut PartialBuffer::new(this.input.written()),
this.output,
)?;
let compressed = if !encoder.finish(this.output)? {
std::mem::swap(this.output, this.input);
false
} else {
true
};
*this.zstd = None;
this.input.reset();
*this.write_state = WriteState::WritingOutput(Box::new(
WriteState::WritingSizeHeader(
this.output.written().len() as u16
| if compressed { 0 } else { 0x8000 },
),
));
}
WriteState::SeekingToEnd(end_addr) => {
@@ -466,8 +403,6 @@ impl<W: AsyncWrite + AsyncSeek> AsyncWrite for MetadataBlocksWriter<W> {
end_addr
))?;
*this.size_addr = None;
*this.input_flushed = 0;
*this.size = 0;
*this.write_state = WriteState::Idle;
return Poll::Ready(Ok(()));
}
@@ -486,11 +421,9 @@ impl<W: AsyncWrite + AsyncSeek> AsyncWrite for MetadataBlocksWriter<W> {
impl<W> MetadataBlocksWriter<W> {
pub fn new(writer: W) -> Self {
Self {
input: [0; 8192],
input_flushed: 0,
size: 0,
input: PartialBuffer::new([0; 8192]),
size_addr: None,
output: PartialBuffer::new([0; 4096]),
output: PartialBuffer::new([0; 8192]),
output_flushed: 0,
zstd: None,
seek_state: SeekState::Idle,
@@ -507,11 +440,10 @@ use tokio::io::AsyncRead;
pub struct MetadataBlocksReader<R> {
#[pin]
reader: R,
size_buf: [u8; 2],
size_bytes_read: usize,
compressed: [u8; 8192],
size_buf: PartialBuffer<[u8; 2]>,
compressed: PartialBuffer<[u8; 8192]>,
compressed_size: usize,
compressed_pos: usize,
is_compressed: bool,
output: PartialBuffer<[u8; 8192]>,
output_pos: usize,
zstd: Option<ZstdDecoder>,
@@ -531,11 +463,10 @@ impl<R> MetadataBlocksReader<R> {
pub fn new(reader: R) -> Self {
Self {
reader,
size_buf: [0; 2],
size_bytes_read: 0,
compressed: [0; 8192],
size_buf: PartialBuffer::new([0; 2]),
compressed: PartialBuffer::new([0; 8192]),
compressed_size: 0,
compressed_pos: 0,
is_compressed: false,
output: PartialBuffer::new([0; 8192]),
output_pos: 0,
zstd: None,
@@ -551,11 +482,9 @@ impl<R: Read> Read for MetadataBlocksReader<R> {
loop {
match self.state {
ReadState::ReadingSize => {
let n = self
.reader
.read(&mut self.size_buf[self.size_bytes_read..])?;
let n = self.reader.read(self.size_buf.unwritten_mut())?;
if n == 0 {
if self.size_bytes_read == 0 {
if self.size_buf.written().is_empty() {
self.state = ReadState::Eof;
return Ok(0);
} else {
@@ -566,56 +495,57 @@ impl<R: Read> Read for MetadataBlocksReader<R> {
}
}
self.size_bytes_read += n;
if self.size_bytes_read < 2 {
self.size_buf.advance(n);
if self.size_buf.written().len() < 2 {
continue;
}
let size_header = u16::from_le_bytes(self.size_buf);
let is_compressed = (size_header & 0x8000) == 0;
let size = (size_header & 0x7FFF) as usize;
let size_header = u16::from_le_bytes([
self.size_buf.written()[0],
self.size_buf.written()[1],
]);
self.is_compressed = (size_header & 0x8000) == 0;
self.compressed_size = (size_header & 0x7FFF) as usize;
if !is_compressed {
if self.compressed_size == 0 || self.compressed_size > 8192 {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"Uncompressed metadata blocks not supported",
format!("Invalid metadata block size: {}", self.compressed_size),
));
}
if size == 0 || size > 8192 {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("Invalid metadata block size: {}", size),
));
}
self.compressed_size = size;
self.compressed_pos = 0;
self.size_bytes_read = 0;
self.compressed.reset();
self.size_buf.reset();
self.state = ReadState::ReadingData;
continue;
}
ReadState::ReadingData => {
let n = self
.reader
.read(&mut self.compressed[self.compressed_pos..self.compressed_size])?;
let n = self.reader.read(self.compressed.unwritten_mut())?;
if n == 0 {
return Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"Unexpected EOF reading compressed data",
"Unexpected EOF reading data",
));
}
self.compressed_pos += n;
if self.compressed_pos < self.compressed_size {
self.compressed.advance(n);
if !self.compressed.unwritten().is_empty() {
continue;
}
self.zstd = Some(ZstdDecoder::new());
self.output_pos = 0;
self.output.reset();
self.state = ReadState::Decompressing;
if self.is_compressed {
self.zstd = Some(ZstdDecoder::new());
self.state = ReadState::Decompressing;
} else {
self.output
.copy_unwritten_from(&mut PartialBuffer::new(self.compressed.written()));
self.state = ReadState::Outputting;
}
continue;
}
@@ -625,7 +555,7 @@ impl<R: Read> Read for MetadataBlocksReader<R> {
continue;
}
let mut input = PartialBuffer::new(&self.compressed[..self.compressed_size]);
let mut input = PartialBuffer::new(self.compressed.written());
let decoder = self.zstd.as_mut().unwrap();
if decoder.decode(&mut input, &mut self.output)? {
@@ -676,13 +606,13 @@ impl<R: AsyncRead> AsyncRead for MetadataBlocksReader<R> {
match *this.state {
ReadState::ReadingSize => {
let mut read_buf =
tokio::io::ReadBuf::new(&mut this.size_buf[*this.size_bytes_read..]);
let mut read_buf = tokio::io::ReadBuf::new(this.size_buf.unwritten_mut());
let before = read_buf.filled().len();
ready!(this.reader.as_mut().poll_read(cx, &mut read_buf))?;
let n = read_buf.filled().len() - before;
let n = read_buf.filled().len();
if n == 0 {
if *this.size_bytes_read == 0 {
if this.size_buf.written().is_empty() {
*this.state = ReadState::Eof;
return Poll::Ready(Ok(()));
} else {
@@ -693,22 +623,16 @@ impl<R: AsyncRead> AsyncRead for MetadataBlocksReader<R> {
}
}
*this.size_bytes_read += n;
if *this.size_bytes_read < 2 {
this.size_buf.advance(n);
if this.size_buf.written().len() < 2 {
continue;
}
let size_header = u16::from_le_bytes(*this.size_buf);
let is_compressed = (size_header & 0x8000) == 0;
let size_header = u16::from_le_bytes(*this.size_buf.written());
*this.is_compressed = (size_header & 0x8000) == 0;
let size = (size_header & 0x7FFF) as usize;
if !is_compressed {
return Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"Uncompressed metadata blocks not supported",
)));
}
if size == 0 || size > 8192 {
return Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
@@ -716,36 +640,42 @@ impl<R: AsyncRead> AsyncRead for MetadataBlocksReader<R> {
)));
}
*this.compressed_size = size;
*this.compressed_pos = 0;
*this.size_bytes_read = 0;
this.compressed.reset();
this.compressed.reserve(size);
this.size_buf.reset();
*this.state = ReadState::ReadingData;
continue;
}
ReadState::ReadingData => {
let mut read_buf = tokio::io::ReadBuf::new(
&mut this.compressed[*this.compressed_pos..*this.compressed_size],
);
let mut read_buf = tokio::io::ReadBuf::new(this.compressed.unwritten_mut());
let before = read_buf.filled().len();
ready!(this.reader.as_mut().poll_read(cx, &mut read_buf))?;
let n = read_buf.filled().len() - before;
let n = read_buf.filled().len();
if n == 0 {
return Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"Unexpected EOF reading compressed data",
"Unexpected EOF reading data",
)));
}
*this.compressed_pos += n;
if *this.compressed_pos < *this.compressed_size {
this.compressed.advance(n);
if !this.compressed.unwritten().is_empty() {
continue;
}
*this.zstd = Some(ZstdDecoder::new());
*this.output_pos = 0;
this.output.reset();
*this.state = ReadState::Decompressing;
if *this.is_compressed {
*this.zstd = Some(ZstdDecoder::new());
*this.state = ReadState::Decompressing;
} else {
this.output
.copy_unwritten_from(&mut PartialBuffer::new(this.compressed.written()));
*this.state = ReadState::Outputting;
}
continue;
}
@@ -755,7 +685,7 @@ impl<R: AsyncRead> AsyncRead for MetadataBlocksReader<R> {
continue;
}
let mut input = PartialBuffer::new(&this.compressed[..*this.compressed_size]);
let mut input = PartialBuffer::new(this.compressed.written());
let decoder = this.zstd.as_mut().unwrap();
if decoder.decode(&mut input, this.output)? {

View File

@@ -52,8 +52,9 @@ mod v0_4_0_alpha_9;
mod v0_4_0_alpha_10;
mod v0_4_0_alpha_11;
mod v0_4_0_alpha_12;
mod v0_4_0_alpha_13;
pub type Current = v0_4_0_alpha_12::Version; // VERSION_BUMP
pub type Current = v0_4_0_alpha_13::Version; // VERSION_BUMP
impl Current {
#[instrument(skip(self, db))]
@@ -167,7 +168,8 @@ enum Version {
V0_4_0_alpha_9(Wrapper<v0_4_0_alpha_9::Version>),
V0_4_0_alpha_10(Wrapper<v0_4_0_alpha_10::Version>),
V0_4_0_alpha_11(Wrapper<v0_4_0_alpha_11::Version>),
V0_4_0_alpha_12(Wrapper<v0_4_0_alpha_12::Version>), // VERSION_BUMP
V0_4_0_alpha_12(Wrapper<v0_4_0_alpha_12::Version>),
V0_4_0_alpha_13(Wrapper<v0_4_0_alpha_13::Version>), // VERSION_BUMP
Other(exver::Version),
}
@@ -222,7 +224,8 @@ impl Version {
Self::V0_4_0_alpha_9(v) => DynVersion(Box::new(v.0)),
Self::V0_4_0_alpha_10(v) => DynVersion(Box::new(v.0)),
Self::V0_4_0_alpha_11(v) => DynVersion(Box::new(v.0)),
Self::V0_4_0_alpha_12(v) => DynVersion(Box::new(v.0)), // VERSION_BUMP
Self::V0_4_0_alpha_12(v) => DynVersion(Box::new(v.0)),
Self::V0_4_0_alpha_13(v) => DynVersion(Box::new(v.0)), // VERSION_BUMP
Self::Other(v) => {
return Err(Error::new(
eyre!("unknown version {v}"),
@@ -269,7 +272,8 @@ impl Version {
Version::V0_4_0_alpha_9(Wrapper(x)) => x.semver(),
Version::V0_4_0_alpha_10(Wrapper(x)) => x.semver(),
Version::V0_4_0_alpha_11(Wrapper(x)) => x.semver(),
Version::V0_4_0_alpha_12(Wrapper(x)) => x.semver(), // VERSION_BUMP
Version::V0_4_0_alpha_12(Wrapper(x)) => x.semver(),
Version::V0_4_0_alpha_13(Wrapper(x)) => x.semver(), // VERSION_BUMP
Version::Other(x) => x.clone(),
}
}

View File

@@ -0,0 +1,37 @@
use exver::{PreReleaseSegment, VersionRange};
use super::v0_3_5::V0_3_0_COMPAT;
use super::{VersionT, v0_4_0_alpha_12};
use crate::prelude::*;
lazy_static::lazy_static! {
static ref V0_4_0_alpha_13: exver::Version = exver::Version::new(
[0, 4, 0],
[PreReleaseSegment::String("alpha".into()), 13.into()]
);
}
#[derive(Clone, Copy, Debug, Default)]
pub struct Version;
impl VersionT for Version {
type Previous = v0_4_0_alpha_12::Version;
type PreUpRes = ();
async fn pre_up(self) -> Result<Self::PreUpRes, Error> {
Ok(())
}
fn semver(self) -> exver::Version {
V0_4_0_alpha_13.clone()
}
fn compat(self) -> &'static VersionRange {
&V0_3_0_COMPAT
}
#[instrument(skip_all)]
fn up(self, _db: &mut Value, _: Self::PreUpRes) -> Result<Value, Error> {
Ok(Value::Null)
}
fn down(self, _db: &mut Value) -> Result<(), Error> {
Ok(())
}
}